Thrift source code analysis: server

2023-10-29


Preface

This article introduces the server types that Thrift supports: TSimpleServer, TNonblockingServer, TThreadedServer, and TThreadPoolServer. It also introduces one additional server mechanism: the EventloopServer.

Thrift article series:
1. Introduction to Thrift
2. Thrift source code analysis: compiler
3. Thrift source code analysis: processor
4. Thrift source code analysis: protocol
5. Thrift source code analysis: transport
6. Thrift source code analysis: server

Related network-programming references:
Network programming: the linger struct
socketpair
Network programming: addrinfo
Using poll for socket multiplexing
How thread pools work
The Reactor design pattern


Overview

Thrift's threads, mutexes, and condition variables have already been covered in "Thrift source code analysis: concurrency".

TSimpleServer

serve

On the server side, everything boils down to calling serve(). TSimpleServer inherits from TServerFramework, and the following is TServerFramework's implementation of serve() (some code omitted):

void TServerFramework::serve() {
  shared_ptr<TTransport> client;
  shared_ptr<TTransport> inputTransport;
  shared_ptr<TTransport> outputTransport;
  shared_ptr<TProtocol> inputProtocol;
  shared_ptr<TProtocol> outputProtocol;

  // Start listening
  serverTransport_->listen();

  // Preparation before serving:
  /* Run the preServe event to indicate the server is listening and can safely be connected to. */
  if (eventHandler_) {
    eventHandler_->preServe();  // no-op by default
  }

  // Accept client connections
  for (;;) {
    try {
      // Release the previous client connection's resources
      outputProtocol.reset();
      inputProtocol.reset();
      outputTransport.reset();
      inputTransport.reset();
      client.reset();

      // If the concurrent-client limit has been reached, wait until the count drops below limit_; mon_ acts like a condition variable here
      {
        Synchronized sync(mon_);
        while (clients_ >= limit_) {
          mon_.wait();
        }
      }
	  
      // Wait for a client connection; opening another terminal and running ./client triggers this call
      client = serverTransport_->accept();

      inputTransport = inputTransportFactory_->getTransport(client);
      outputTransport = outputTransportFactory_->getTransport(client);
      if (!outputProtocolFactory_) {
        inputProtocol = inputProtocolFactory_->getProtocol(inputTransport, outputTransport);
        outputProtocol = inputProtocol;
      } else {
        inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
      }
	  
      // Hand the connection over for processing
      newlyConnectedClient(shared_ptr<TConnectedClient>(
          new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
                               inputProtocol,
                               outputProtocol,
                               eventHandler_,
                               client),
          bind(&TServerFramework::disposeConnectedClient, this, std::placeholders::_1)));

    }
    ... ...
  }

  releaseOneDescriptor("serverTransport", serverTransport_);
}

The key calls are listen, accept, and newlyConnectedClient; accept returns when a client calls transport->open().
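For orientation, here is a minimal sketch of how a TSimpleServer is typically wired up so that this serve() loop runs. CalculatorHandler and CalculatorProcessor are hypothetical stand-ins for thrift-generated and user code; the other classes are standard Thrift library types:

#include <memory>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TServerSocket.h>

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::server;
using namespace apache::thrift::transport;

int main() {
  // CalculatorHandler / CalculatorProcessor are hypothetical generated + user code
  auto handler   = std::make_shared<CalculatorHandler>();
  auto processor = std::make_shared<CalculatorProcessor>(handler);

  TSimpleServer server(processor,
                       std::make_shared<TServerSocket>(9090),          // listen()/accept() happen here
                       std::make_shared<TBufferedTransportFactory>(),  // inputTransportFactory_/outputTransportFactory_
                       std::make_shared<TBinaryProtocolFactory>());    // inputProtocolFactory_/outputProtocolFactory_
  server.serve();  // enters the TServerFramework::serve() loop shown above
  return 0;
}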

1.listen

void TServerSocket::listen() {
  listening_ = true;
#ifdef _WIN32
  TWinsockSingleton::create();
#endif // _WIN32
  THRIFT_SOCKET sv[2];
  // Create the socket pair used to interrupt
  if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
    GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt", THRIFT_GET_SOCKET_ERROR);
    interruptSockWriter_ = THRIFT_INVALID_SOCKET;
    interruptSockReader_ = THRIFT_INVALID_SOCKET;
  } else {
    interruptSockWriter_ = sv[1];
    interruptSockReader_ = sv[0];
  }

  // Create the socket pair used to interrupt all clients
  if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
    GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt",
                        THRIFT_GET_SOCKET_ERROR);
    childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
    pChildInterruptSockReader_.reset();
  } else {
    childInterruptSockWriter_ = sv[1];
    pChildInterruptSockReader_
        = std::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets);
  }

  // Validate port number
  if (port_ < 0 || port_ > 0xFFFF) {	// validate the port number
    throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
  }

  const struct addrinfo *res;
  int error;
  char port[sizeof("65535")];
  THRIFT_SNPRINTF(port, sizeof(port), "%d", port_);

  struct addrinfo hints;
  std::memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;		// address family: IPv4, IPv6, or unspecified
  hints.ai_socktype = SOCK_STREAM;	// byte-stream socket type
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;	// AI_PASSIVE: address suitable for bind(); AI_ADDRCONFIG: only return address families configured on this host

  // If address is not specified use wildcard address (NULL)
  TGetAddrInfoWrapper info(address_.empty() ? nullptr : &address_[0], port, &hints);

  error = info.init();

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = info.res(); res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == nullptr)
      break;
  }

  if (!path_.empty()) {
    serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
  } else if (res != nullptr) {
    serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  }

  if (serverSocket_ == THRIFT_INVALID_SOCKET) {
  	... ...
  }

  // Set THRIFT_NO_SOCKET_CACHING to prevent the 2MSL (maximum segment lifetime) delay on accept
  int one = 1;
  if (-1 == setsockopt(serverSocket_,
                       SOL_SOCKET,
                       THRIFT_NO_SOCKET_CACHING,	// allow binding to an address that is already in use
                       cast_sockopt(&one),
                       sizeof(one))) {
  	... ...
  }

  // Set TCP buffer sizes
  if (tcpSendBuffer_ > 0) {
    if (-1 == setsockopt(serverSocket_,
                         SOL_SOCKET,		// SO_SNDBUF sets the send buffer size
                         SO_SNDBUF,
                         cast_sockopt(&tcpSendBuffer_),
                         sizeof(tcpSendBuffer_))) {
     	... ...
    }
  }

  if (tcpRecvBuffer_ > 0) {
    if (-1 == setsockopt(serverSocket_,
                         SOL_SOCKET,		// SO_RCVBUF sets the receive buffer size
                         SO_RCVBUF,
                         cast_sockopt(&tcpRecvBuffer_),
                         sizeof(tcpRecvBuffer_))) {
     	... ...
    }
  }

// Defer accept
#ifdef TCP_DEFER_ACCEPT
  if (path_.empty()) {
    if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_DEFER_ACCEPT, &one, sizeof(one))) {	// IPPROTO_TCP is the protocol level of this option; supported levels include SOL_SOCKET (the socket itself), IPPROTO_TCP, IPPROTO_IP, and IPPROTO_IPV6
       	... ...
    }
  }
#endif // #ifdef TCP_DEFER_ACCEPT

#ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6 && path_.empty()) {
    int zero = 0;
    if (-1 == setsockopt(serverSocket_,
                         IPPROTO_IPV6,
                         IPV6_V6ONLY,
                         cast_sockopt(&zero),
                         sizeof(zero))) {
      GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
    }
  }
#endif // #ifdef IPV6_V6ONLY

  // Turn linger off, don't want to block on calls to close
  struct linger ling = {0, 0};	// the linger settings decide whether close() blocks for a graceful shutdown; {0, 0} disables lingering
  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) {
     	... ...
  }

  // Unix Sockets do not need that
  if (path_.empty()) {
    // TCP Nodelay, speed over bandwidth
    if (-1
        == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
       	... ...
    }
  }

  // Set NONBLOCK on the accept socket
  int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
  if (flags == -1) {
     	... ...
  }

  if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    	... ...
  }

  // prepare the port information
  // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
  // always seem to work. The client can configure the retry variables.
  int retries = 0;
  int errno_copy = 0;

  if (!path_.empty()) {		// a non-empty path_ means a Unix domain socket

#ifndef _WIN32

    // Unix Domain Socket
    size_t len = path_.size() + 1;	// path length including the terminating NUL
    if (len > sizeof(((sockaddr_un*)nullptr)->sun_path)) {		// must not exceed the maximum length sockaddr_un supports
        	... ...
    }

    struct sockaddr_un address;
    address.sun_family = AF_UNIX;
    memcpy(address.sun_path, path_.c_str(), len);	// copy the path into sockaddr_un

    auto structlen = static_cast<socklen_t>(sizeof(address));

    if (!address.sun_path[0]) { // abstract namespace socket
#ifdef __linux__
      // sun_path is not null-terminated in this case and structlen determines its length
      structlen -= sizeof(address.sun_path) - len;
#else
      	... ...
#endif
    }

    do {	// bind the Unix domain socket
      if (0 == ::bind(serverSocket_, (struct sockaddr*)&address, structlen)) {
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
#else
    GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
    throw TTransportException(TTransportException::NOT_OPEN,
                              " Unix Domain socket path not supported");
#endif
  } else {	// regular TCP/IP socket
    do {
      if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen))) {
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));

    // retrieve bind info
    if (port_ == 0 && retries <= retryLimit_) {
      struct sockaddr_storage sa;
      socklen_t len = sizeof(sa);
      std::memset(&sa, 0, len);
      if (::getsockname(serverSocket_, reinterpret_cast<struct sockaddr*>(&sa), &len) < 0) {
        errno_copy = THRIFT_GET_SOCKET_ERROR;
        GlobalOutput.perror("TServerSocket::getPort() getsockname() ", errno_copy);
      } else {
        if (sa.ss_family == AF_INET6) {
          const auto* sin = reinterpret_cast<const struct sockaddr_in6*>(&sa);
          port_ = ntohs(sin->sin6_port);
        } else {
          const auto* sin = reinterpret_cast<const struct sockaddr_in*>(&sa);
          port_ = ntohs(sin->sin_port);
        }
      }
    }
  }

   	... ...

  // Call listen
  if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
     	... ...
  }

  // The socket is now listening!
}

The main steps:
1. Set up the socket pairs used for interrupting the server
2. Validate the port number
3. Apply socket options (send/receive buffer sizes, etc.)
4. Turn SO_LINGER off so close() does not block
5. Make the accept socket non-blocking (for fcntl usage see: Linux non-blocking IO)
6. Depending on whether path_ is empty, use a Unix domain socket or a regular TCP socket
7. bind
8. listen

The two most important steps: bind and listen.
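For reference, the non-Thrift-specific part of that sequence boils down to ordinary BSD socket calls. A stripped-down sketch (POSIX and IPv4 only, error handling omitted; assuming THRIFT_NO_SOCKET_CACHING maps to SO_REUSEADDR as it does on POSIX builds):

#include <arpa/inet.h>
#include <cstdint>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

int make_listen_socket(uint16_t port, int backlog) {
  int fd = socket(AF_INET, SOCK_STREAM, 0);

  int one = 1;
  setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); // avoid the 2MSL rebind delay
  struct linger ling = {0, 0};
  setsockopt(fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));  // don't block in close()
  setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); // latency over bandwidth

  fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);      // non-blocking accept socket

  struct sockaddr_in addr = {};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(port);
  ::bind(fd, (struct sockaddr*)&addr, sizeof(addr));
  ::listen(fd, backlog);
  return fd;
}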

2.accept

shared_ptr<TTransport> TServerSocket::acceptImpl() {
	... ...

  struct THRIFT_POLLFD fds[2];

  int maxEintrs = 5;
  int numEintrs = 0;

  while (true) {
    std::memset(fds, 0, sizeof(fds));
    fds[0].fd = serverSocket_;
    fds[0].events = THRIFT_POLLIN;
	... ...
    int ret = THRIFT_POLL(fds, 2, accTimeout_);
    ... ...
      // Check for the actual server socket being ready
      if (fds[0].revents & THRIFT_POLLIN) {
        break;
      }
    ... ...  
  }

  struct sockaddr_storage clientAddress;
  int size = sizeof(clientAddress);
  THRIFT_SOCKET clientSocket
      = ::accept(serverSocket_, (struct sockaddr*)&clientAddress, (socklen_t*)&size);
    ... ...  
  // set clientSocket back to blocking mode
  int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0);
	... ...
  THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK);
	... ...
  // next, wrap clientSocket into Thrift's TSocket type
  shared_ptr<TSocket> client = createSocket(clientSocket);
  if (sendTimeout_ > 0) {
    client->setSendTimeout(sendTimeout_);
  }
  if (recvTimeout_ > 0) {
    client->setRecvTimeout(recvTimeout_);
  }
  if (keepAlive_) {
    client->setKeepAlive(keepAlive_);
  }
  client->setCachedAddress((sockaddr*)&clientAddress, size);

  if (acceptCallback_)
    acceptCallback_(clientSocket);

  return client;
}

1. Use poll to multiplex on serverSocket_ (see the end of the "Linux networking basics" article for using poll to monitor socket events)
2. The while loop blocks until a client arrives (fds[0].revents & THRIFT_POLLIN), then breaks out
3. clientSocket = accept();
4. Set clientSocket to blocking mode
5. Wrap clientSocket into a Thrift TSocket (client) and return it

Then, based on this client and the transport/protocol classes chosen in the main program, the inputTransport, outputTransport, inputProtocol, and outputProtocol are created:

  inputTransport = inputTransportFactory_->getTransport(client);
  outputTransport = outputTransportFactory_->getTransport(client);
  if (!outputProtocolFactory_) {
    inputProtocol = inputProtocolFactory_->getProtocol(inputTransport, outputTransport);
    outputProtocol = inputProtocol;
  } else {
    inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
    outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
  }
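Which concrete classes those factories produce is fixed when the server is constructed. As a hypothetical example (one common configuration, not the only one), with a TBufferedTransportFactory and a TBinaryProtocolFactory the code above effectively wraps each accepted client like this:

#include <memory>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

// Assuming the server was constructed with:
//   inputTransportFactory_/outputTransportFactory_ = TBufferedTransportFactory
//   inputProtocolFactory_/outputProtocolFactory_   = TBinaryProtocolFactory
std::shared_ptr<TProtocol> wrapClient(const std::shared_ptr<TSocket>& client) {
  auto transport = std::make_shared<TBufferedTransport>(client); // getTransport(client)
  return std::make_shared<TBinaryProtocol>(transport);           // getProtocol(transport)
}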

3.newlyConnectedClient

void TServerFramework::newlyConnectedClient(const shared_ptr<TConnectedClient>& pClient) {
  {
    Synchronized sync(mon_);
    ++clients_;
    hwm_ = (std::max)(hwm_, clients_);
  }

  onClientConnected(pClient);
}
void TSimpleServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
  pClient->run();
}
void TConnectedClient::run() {
  if (eventHandler_) {
    opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_);
  }

  for (bool done = false; !done;) {
    if (eventHandler_) {
      eventHandler_->processContext(opaqueContext_, client_);
    }

    try {
      if (!processor_->process(inputProtocol_, outputProtocol_, opaqueContext_)) {
        break;
      }
    } catch (const TTransportException& ttx) {
      switch (ttx.getType()) {
        case TTransportException::END_OF_FILE:
        case TTransportException::INTERRUPTED:
        case TTransportException::TIMED_OUT:
          // Client disconnected or was interrupted or did not respond within the receive timeout.
          // No logging needed.  Done.
          done = true;
          break;

        default: {
          // All other transport exceptions are logged.
          // State of connection is unknown.  Done.
          string errStr = string("TConnectedClient died: ") + ttx.what();
          GlobalOutput(errStr.c_str());
          done = true;
          break;
        }
      }
    } catch (const TException& tex) {
      string errStr = string("TConnectedClient processing exception: ") + tex.what();
      GlobalOutput(errStr.c_str());
      // Disconnect from client, because we could not process the message.
      done = true;
    }
  }

  cleanup();
}

Here, processor_->process(inputProtocol_, outputProtocol_, opaqueContext_) executes the process() of the following class:

class TDispatchProcessor : public TProcessor {
public:
  bool process(std::shared_ptr<protocol::TProtocol> in,
                       std::shared_ptr<protocol::TProtocol> out,
                       void* connectionContext) override {
    std::string fname;
    protocol::TMessageType mtype;
    int32_t seqid;
    in->readMessageBegin(fname, mtype, seqid);

    if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) {
      GlobalOutput.printf("received invalid message type %d from client", mtype);
      return false;
    }

    return dispatchCall(in.get(), out.get(), fname, seqid, connectionContext);
  }

protected:
  virtual bool dispatchCall(apache::thrift::protocol::TProtocol* in,
                            apache::thrift::protocol::TProtocol* out,
                            const std::string& fname,
                            int32_t seqid,
                            void* callContext) = 0;
};

What dispatchCall actually does was covered in detail in "Thrift source code analysis: processor": it invokes the user-defined handler function and finally writes the return value back to the client, which completes one call.
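As a reminder of its shape, the generated dispatchCall is essentially a lookup from the method name read in readMessageBegin() to a process_<method> member function that decodes the arguments, calls the user handler, and writes the reply. A minimal, self-contained analogue (not the generated code itself):

#include <functional>
#include <iostream>
#include <map>
#include <string>

int main() {
  // Analogue of the generated processor's method map: method name -> handler stub
  std::map<std::string, std::function<void()>> processMap = {
      {"add",  [] { std::cout << "process_add: read args, call handler->add(), write reply\n"; }},
      {"ping", [] { std::cout << "process_ping: call handler->ping(), write reply\n"; }},
  };

  std::string fname = "add";     // in real code: from in->readMessageBegin()
  auto it = processMap.find(fname);
  if (it != processMap.end()) {
    it->second();                // what dispatchCall() forwards to
  } else {
    std::cout << "unknown method: reply with a TApplicationException\n";
  }
  return 0;
}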


TNonblockingServer

This is a non-blocking server, written in C++, that operates a set of IO threads for high performance. It assumes that all incoming requests are framed with a 4-byte length indicator and writes out responses using the same framing.
It is built on libevent.
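Because of that framing assumption, clients of a TNonblockingServer must wrap their transport in TFramedTransport, which prepends the 4-byte length header. A minimal client-side sketch (CalculatorClient is a hypothetical stand-in for generated code):

#include <memory>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>   // TFramedTransport
#include <thrift/transport/TSocket.h>

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

int main() {
  auto socket    = std::make_shared<TSocket>("localhost", 9090);
  auto transport = std::make_shared<TFramedTransport>(socket);  // adds the 4-byte frame header
  auto protocol  = std::make_shared<TBinaryProtocol>(transport);

  transport->open();                    // triggers the server-side accept()/handleEvent path
  // CalculatorClient client(protocol); // hypothetical generated client
  // client.ping();
  transport->close();
  return 0;
}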

serve

void TNonblockingServer::serve() {
  // 1. Register events
  if (ioThreads_.empty())
    registerEvents(nullptr);

  // 2. Run the main (listener) IO thread loop in our main thread; this only returns when the server shuts down.
  ioThreads_[0]->run();

  // 3. Make sure all threads have finished before exiting serve()
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}

serve() does the three things noted in the comments above: register the events, run the main (listener) IO thread loop, and join the remaining IO threads.
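Note that registerEvents() takes a user-supplied event_base, so a single-IO-thread server can also be embedded in an event loop that the caller owns instead of calling serve(); this is presumably the "EventloopServer" mechanism mentioned in the preface. A hedged sketch (assuming a libevent 2.x build and numIOThreads == 1):

#include <event2/event.h>
#include <thrift/server/TNonblockingServer.h>

using apache::thrift::server::TNonblockingServer;

// `server` is an already-constructed TNonblockingServer with a single IO thread.
void runInExternalLoop(TNonblockingServer& server) {
  event_base* base = event_base_new();
  server.registerEvents(base);   // registers the listen + notification events on our base
  event_base_dispatch(base);     // the caller drives the loop; serve() is never called
  event_base_free(base);
}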

1.registerEvents

void TNonblockingServer::registerEvents(event_base* user_event_base) {
  // 1. Store the (possibly null) user-provided event base in userEventBase_
  userEventBase_ = user_event_base;

  // 2. Initialize the listening socket
  if (serverSocket_ == THRIFT_INVALID_SOCKET)
    createAndListenOnSocket();

  // 3. Set up the IO threads
  // ioThreads_ is a vector of shared_ptr<TNonblockingIOThread>
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;	// if numIOThreads_ is 0, default it to 1
  }
  // User-provided event-base doesn't works for multi-threaded servers
  assert(numIOThreads_ == 1 || !userEventBase_);
  // create numIOThreads_ threads and push them onto ioThreads_ (a vector, as noted above)
  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
	//
    shared_ptr<TNonblockingIOThread> thread(new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }

  // 4. Pre-serve hook
  if (eventHandler_) {
    eventHandler_->preServe();
  }

  // 5. Start all of our helper IO threads. Note that the threads run forever and only terminate when stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);

  GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
                      ioThreads_.size());

  // Start all helper IO threads in separate threads
  if (ioThreads_.size() > 1) {
    ioThreadFactory_.reset(new ThreadFactory(
        false // detached
        ));

    assert(ioThreadFactory_.get());

    // intentionally start from thread 1, not 0
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }

  // 6. Register events for the main (listener) IO thread
  ioThreads_[0]->registerEvents();
}
(1). Store userEventBase_
(2). Initialize the listening socket

createAndListenOnSocket:

void TNonblockingServer::createAndListenOnSocket() {
  serverTransport_->listen();
  serverSocket_ = serverTransport_->getSocketFD();
}

TNonblockingServerSocket's listen is similar to TServerSocket's, so the code is not repeated; it initializes the listening socket.

(3). Set up the IO threads

Create numIOThreads_ threads and push them onto ioThreads_ (a vector, as noted above):

  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
	//
    shared_ptr<TNonblockingIOThread> thread(new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }
TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
                                           int number,
                                           THRIFT_SOCKET listenSocket,
                                           bool useHighPriority)
  : server_(server),
    number_(number),
    threadId_{},
    listenSocket_(listenSocket),
    useHighPriority_(useHighPriority),	// whether to use high scheduling priority
    eventBase_(nullptr),
    ownEventBase_(false),
    serverEvent_{},
    notificationEvent_{} {
  notificationPipeFDs_[0] = -1;
  notificationPipeFDs_[1] = -1;
}
(4). Pre-serve hook (omitted)
(5). Start all helper IO threads
if (ioThreads_.size() > 1) {
    ioThreadFactory_.reset(new ThreadFactory(false));
    assert(ioThreadFactory_.get());
    // intentionally start from thread 1, not 0
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }
void start() {
    if (getState() != uninitialized) {
      return;
    }
    std::shared_ptr<Thread> selfRef = shared_from_this();
    setState(starting);
    Synchronized sync(monitor_);
    thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef));

    if (detached_)
      thread_->detach();
    monitor_.wait();
  }
(6). Register events for the main (listener) IO thread
 void TNonblockingIOThread::registerEvents() {
  threadId_ = Thread::get_current();

  assert(eventBase_ == nullptr);
  eventBase_ = getServer()->getUserEventBase();
  if (eventBase_ == nullptr) {
    eventBase_ = event_base_new();
    ownEventBase_ = true;
  }

  // Print some libevent stats
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }

  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    // Register the server event
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, nullptr)) {
      throw TException(
          "TNonblockingServer::serve(): "
          "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&notificationEvent_, nullptr)) {
    throw TException(
        "TNonblockingServer::serve(): "
        "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}

The listen event's callback function:

 static void listenHandler(evutil_socket_t fd, short which, void* v) {
    ((TNonblockingServer*)v)->handleEvent(fd, which);
  }

handleEvent:

void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void)which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Going to accept a new client socket
  std::shared_ptr<TSocket> clientSocket;

  clientSocket = serverTransport_->accept();
  if (clientSocket) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        clientSocket->close();
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          clientSocket->close();
          return;
        }
      }
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection = createConnection(clientSocket);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == nullptr) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      clientSocket->close();
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      if (!clientConnection->notifyIOThread()) {
        GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
        clientConnection->close();
      }
    }
  }
}
void TNonblockingIOThread::createNotificationPipe() {
  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
      || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
  }
  for (auto notificationPipeFD : notificationPipeFDs_) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
        || THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
#endif
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
      throw TException(
          "TNonblockingServer::createNotificationPipe() "
          "FD_CLOEXEC");
    }
  }
}


bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
  auto fd = getNotificationSendFD();
  if (fd < 0) {
    return false;
  }

  int ret = -1;
  long kSize = sizeof(conn);
  const char * pos = (const char *)const_cast_sockopt(&conn);

#if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
  struct pollfd pfd = {fd, POLLOUT, 0};

  while (kSize > 0) {
    pfd.revents = 0;
    ret = poll(&pfd, 1, -1);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (pfd.revents & POLLHUP || pfd.revents & POLLERR) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (pfd.revents & POLLOUT) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      kSize -= ret;
      pos += ret;
    }
  }
#else
  fd_set wfds, efds;

  while (kSize > 0) {
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    FD_SET(fd, &wfds);
    FD_SET(fd, &efds);
    ret = select(static_cast<int>(fd + 1), NULL, &wfds, &efds, NULL);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (FD_ISSET(fd, &efds)) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (FD_ISSET(fd, &wfds)) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      kSize -= ret;
      pos += ret;
    }
  }
#endif

  return true;
}

/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
  auto* ioThread = (TNonblockingIOThread*)v;
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = nullptr;
    const int kSize = sizeof(connection);
    long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
    if (nBytes == kSize) {
      if (connection == nullptr) {
        // this is the command to stop our thread, exit the handler!
        ioThread->breakLoop(false);
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // throw away these bytes and hope that next time we get a solid read
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      ioThread->breakLoop(false);
      // exit the loop
      break;
    } else { // nBytes < 0
      if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
          && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
        GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
        ioThread->breakLoop(true);
        return;
      }
      // exit the loop
      break;
    }
  }
}

void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // If we're running in the same thread, we can't use the notify(0)
  // mechanism to stop the thread, but happily if we're running in the
  // same thread, this means the thread can't be blocking in the event
  // loop either.
  if (!Thread::is_current(threadId_)) {
    notify(nullptr);
  } else {
    // cause the loop to stop ASAP - even if it has things to do in it
    event_base_loopbreak(eventBase_);
  }
}

void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
  // Start out with a standard, low-priority setup for the sched params.
  struct sched_param sp;
  bzero((void*)&sp, sizeof(sp));
  int policy = SCHED_OTHER;

  // If desired, set up high-priority sched params structure.
  if (value) {
    // FIFO scheduler, ranked above default SCHED_OTHER queue
    policy = SCHED_FIFO;
    // The priority only compares us to other SCHED_FIFO threads, so we
    // just pick a random priority halfway between min & max.
    const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;

    sp.sched_priority = priority;
  }

  // Actually set the sched params for the current thread.
  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
    GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
  } else {
    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
  }
#else
  THRIFT_UNUSED_VARIABLE(value);
#endif
}

void TNonblockingIOThread::run() {
  if (eventBase_ == nullptr) {
    registerEvents();
  }
  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  if (eventBase_ != nullptr)
  {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
    // Run libevent engine, never returns, invokes calls to eventHandler
    event_base_loop(eventBase_, 0);

    if (useHighPriority_) {
      setCurrentThreadHighPriority(false);
    }

    // cleans up our registered events
    cleanupEvents();
  }

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}

void TNonblockingIOThread::cleanupEvents() {
  // stop the listen socket, if any
  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    if (event_del(&serverEvent_) == -1) {
      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
    }
  }

  event_del(&notificationEvent_);
}

void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}

void TNonblockingIOThread::join() {
  // If this was a thread created by a factory (not the thread that called
  // serve()), we join() it to make sure we shut down fully.
  if (thread_) {
    try {
      // Note that it is safe to both join() ourselves twice, as well as join
      // the current thread as the pthread implementation checks for deadlock.
      thread_->join();
    } catch (...) {
      // swallow everything
    }
  }
}

The notification event's callback, notifyHandler (shown above), reads a TConnection pointer off the notification pipe and calls connection->transition():

void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    if (server_->getHeaderTransport()) {
      inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
      outputTransport_->resetBuffer();
    } else {
      // We saved room for the framing size in case header transport needed it,
      // but just skip it for the non-header case
      inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
      outputTransport_->resetBuffer();

      // Prepend four bytes of blank space to the buffer so we can
      // write the frame size there later.
      outputTransport_->getWritePtr(4);
      outputTransport_->wroteBytes(4);
    }

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      std::shared_ptr<Runnable> task = std::shared_ptr<Runnable>(
          new Task(processor_, inputProtocol_, outputProtocol_, this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();

      try {
        server_->addTask(task);
      } catch (IllegalStateException& ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        server_->decrementActiveProcessors();
        close();
      } catch (TimedOutException& to) {
        GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
        server_->decrementActiveProcessors();
        close();
      }

      return;
    } else {
      try {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
      } catch (const TTransportException& ttx) {
        GlobalOutput.printf(
            "TNonblockingServer transport error in "
            "process(): %s",
            ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception& x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(),
                            x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }
    // fallthrough

  // Intentionally fall through here, the call to process has written into
  // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      auto frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }
    // fallthrough

  // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = nullptr;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    return;

  case APP_READ_FRAME_SIZE:
    readWant_ += 4;

    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      auto* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == nullptr) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_ = 4;
    *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

2.run

void TNonblockingIOThread::run() {
  if (eventBase_ == nullptr) {
    registerEvents();
  }
  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  if (eventBase_ != nullptr)
  {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
    // Run libevent engine, never returns, invokes calls to eventHandler
    event_base_loop(eventBase_, 0);		// start the event loop

    if (useHighPriority_) {
      setCurrentThreadHighPriority(false);
    }

    // cleans up our registered events
    cleanupEvents();
  }

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}

3.join

void TNonblockingIOThread::join() {
  if (thread_) {
      thread_->join();
  }
}
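Putting the pieces together, a typical TNonblockingServer setup pairs the IO threads with a ThreadManager worker pool. A minimal sketch, using the constructor shown in the class definition below (CalculatorHandler and CalculatorProcessor are hypothetical stand-ins for generated and user code):

#include <memory>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TNonblockingServerSocket.h>

using namespace apache::thrift;
using namespace apache::thrift::concurrency;
using namespace apache::thrift::protocol;
using namespace apache::thrift::server;
using namespace apache::thrift::transport;

int main() {
  // CalculatorHandler / CalculatorProcessor are hypothetical generated + user code
  auto handler   = std::make_shared<CalculatorHandler>();
  auto processor = std::make_shared<CalculatorProcessor>(handler);

  auto threadManager = ThreadManager::newSimpleThreadManager(4); // 4 worker threads
  threadManager->threadFactory(std::make_shared<ThreadFactory>());
  threadManager->start();

  TNonblockingServer server(processor,
                            std::make_shared<TBinaryProtocolFactory>(),
                            std::make_shared<TNonblockingServerSocket>(9090),
                            threadManager);
  server.setNumIOThreads(2);  // IO threads run the libevent loops shown above
  server.serve();             // registerEvents + run + join
  return 0;
}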



This is fairly involved, but that is roughly how the functions execute. To understand the class's responsibilities and members, let's look at the class itself:

The TNonblockingServer class

class TNonblockingServer : public TServer {
private:
  class TConnection;
  friend class TNonblockingIOThread;
private:
//limits and default values
  static const int LISTEN_BACKLOG = 1024;
  static const size_t CONNECTION_STACK_LIMIT = 1024;
  static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
  static const int MAX_CONNECTIONS = INT_MAX;
  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
  static const int IDLE_READ_BUFFER_LIMIT = 1024;
  static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
  static const int RESIZE_BUFFER_EVERY_N = 512;
  static const int DEFAULT_IO_THREADS = 1;
  
  size_t numIOThreads_;
  bool useHighPriorityIOThreads_;
  THRIFT_SOCKET serverSocket_;

  /// The optional user-provided event-base (for single-thread servers)
  event_base* userEventBase_;

  /// For processing via thread pool, may be NULL
  std::shared_ptr<ThreadManager> threadManager_;

  /// Is thread pool processing?
  bool threadPoolProcessing_;

  // Factory to create the IO threads
  std::shared_ptr<ThreadFactory> ioThreadFactory_;

  // Vector of IOThread objects that will handle our IO
  std::vector<std::shared_ptr<TNonblockingIOThread> > ioThreads_;

  // Index of next IO Thread to be used (for round-robin)
  uint32_t nextIOThread_;

  // Synchronizes access to connection stack and similar data
  Mutex connMutex_;

  /// Number of TConnection object we've created
  size_t numTConnections_;

  /// Number of Connections processing or waiting to process
  size_t numActiveProcessors_;

  /// Limit for how many TConnection objects to cache
  size_t connectionStackLimit_;

  /// Limit for number of connections processing or waiting to process
  size_t maxActiveProcessors_;

  /// Limit for number of open connections
  size_t maxConnections_;

  /// Limit for frame size
  size_t maxFrameSize_;

  /// Time in milliseconds before an unperformed task expires (0 == infinite).
  int64_t taskExpireTime_;

  /**
   * Hysteresis for overload state.  This is the fraction of the overload
   * value that needs to be reached before the overload state is cleared;
   * must be <= 1.0.
   */
  double overloadHysteresis_;

  /// Action to take when we're overloaded.
  TOverloadAction overloadAction_;

  /**
   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
   * and found to be exceeded, reinitialized) to this size.
   */
  size_t writeBufferDefaultSize_;

  /**
   * Max read buffer size for an idle TConnection.  When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we will free the buffer (such that it will be reinitialized by the next
   * received frame) if it has exceeded this limit.  0 disables this check.
   */
  size_t idleReadBufferLimit_;

  /**
   * Max write buffer size for an idle connection.  When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we insure that its write buffer is <= to this size; otherwise we
   * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
   * idle connections don't hog memory. 0 disables this check.
   */
  size_t idleWriteBufferLimit_;

  /**
   * Every N calls we check the buffer size limits on a connected TConnection.
   * 0 disables (i.e. the checks are only done when a connection closes).
   */
  int32_t resizeBufferEveryN_;

  /// Set if we are currently in an overloaded state.
  bool overloaded_;

  /// Count of connections dropped since overload started
  uint32_t nConnectionsDropped_;

  /// Count of connections dropped on overload since server started
  uint64_t nTotalConnectionsDropped_;

  /**
   * This is a stack of all the objects that have been created but that
   * are NOT currently in use. When we close a connection, we place it on this
   * stack so that the object can be reused later, rather than freeing the
   * memory and reallocating a new object later.
   */
  std::stack<TConnection*> connectionStack_;

  /**
   * This container holds pointers to all active connections. This container
   * allows the server to clean up unlcosed connection objects at destruction,
   * which in turn allows their transports, protocols, processors and handlers
   * to deallocate and clean up correctly.
   */
  std::vector<TConnection*> activeConnections_;

  /*
  */
  std::shared_ptr<TNonblockingServerTransport> serverTransport_;

  /**
   * Called when server socket had something happen.  We accept all waiting
   * client connections on listen socket fd and assign TConnection objects
   * to handle those requests.
   *
   * @param which the event flag that triggered the handler.
   */
  void handleEvent(THRIFT_SOCKET fd, short which);

  void init() {
    serverSocket_ = THRIFT_INVALID_SOCKET;
    numIOThreads_ = DEFAULT_IO_THREADS;
    nextIOThread_ = 0;
    useHighPriorityIOThreads_ = false;
    userEventBase_ = nullptr;
    threadPoolProcessing_ = false;
    numTConnections_ = 0;
    numActiveProcessors_ = 0;
    connectionStackLimit_ = CONNECTION_STACK_LIMIT;
    maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
    maxConnections_ = MAX_CONNECTIONS;
    maxFrameSize_ = MAX_FRAME_SIZE;
    taskExpireTime_ = 0;
    overloadHysteresis_ = 0.8;
    overloadAction_ = T_OVERLOAD_NO_ACTION;
    writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
    idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
    idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
    resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
    overloaded_ = false;
    nConnectionsDropped_ = 0;
    nTotalConnectionsDropped_ = 0;
  }

public:
  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();
  }

  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processor), serverTransport_(serverTransport) {
    init();
  }


  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<TProtocolFactory>& protocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<TProtocolFactory>& protocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const std::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const std::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  ~TNonblockingServer() override;

  void setThreadManager(std::shared_ptr<ThreadManager> threadManager);

  int getListenPort() { return serverTransport_->getListenPort(); }

  std::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }

  /**
   * Sets the number of IO threads used by this server. Can only be used before
   * the call to serve() and has no effect afterwards.
   */
  void setNumIOThreads(size_t numThreads) {
    numIOThreads_ = numThreads;
    // User-provided event-base doesn't works for multi-threaded servers
    assert(numIOThreads_ <= 1 || !userEventBase_);
  }

  /** Return whether the IO threads will get high scheduling priority */
  bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }

  /** Set whether the IO threads will get high scheduling priority. */
  void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }

  /** Return the number of IO threads used by this server. */
  size_t getNumIOThreads() const { return numIOThreads_; }

  /**
   * Get the maximum number of unused TConnection we will hold in reserve.
   *
   * @return the current limit on TConnection pool size.
   */
  size_t getConnectionStackLimit() const { return connectionStackLimit_; }

  /**
   * Set the maximum number of unused TConnection we will hold in reserve.
   *
   * @param sz the new limit for TConnection pool size.
   */
  void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }

  bool isThreadPoolProcessing() const { return threadPoolProcessing_; }

  void addTask(std::shared_ptr<Runnable> task) {
    threadManager_->add(task, 0LL, taskExpireTime_);
  }

  /**
   * Return the count of sockets currently connected to.
   *
   * @return count of connected sockets.
   */
  size_t getNumConnections() const { return numTConnections_; }

  /**
   * Return the count of sockets currently connected to.
   *
   * @return count of connected sockets.
   */
  size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }

  /**
   * Return the count of connection objects allocated but not in use.
   *
   * @return count of idle connection objects.
   */
  size_t getNumIdleConnections() const { return connectionStack_.size(); }

  /**
   * Return count of number of connections which are currently processing.
   * This is defined as a connection where all data has been received and
   * either assigned a task (when threading) or passed to a handler (when
   * not threading), and where the handler has not yet returned.
   *
   * @return # of connections currently processing.
   */
  size_t getNumActiveProcessors() const { return numActiveProcessors_; }

  /// Increment the count of connections currently processing.
  void incrementActiveProcessors() {
    Guard g(connMutex_);
    ++numActiveProcessors_;
  }

  /// Decrement the count of connections currently processing.
  void decrementActiveProcessors() {
    Guard g(connMutex_);
    if (numActiveProcessors_ > 0) {
      --numActiveProcessors_;
    }
  }

  /**
   * Get the maximum # of connections allowed before overload.
   *
   * @return current setting.
   */
  size_t getMaxConnections() const { return maxConnections_; }

  /**
   * Set the maximum # of connections allowed before overload.
   *
   * @param maxConnections new setting for maximum # of connections.
   */
  void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }

  /**
   * Get the maximum # of connections waiting in handler/task before overload.
   *
   * @return current setting.
   */
  size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }

  /**
   * Set the maximum # of connections waiting in handler/task before overload.
   *
   * @param maxActiveProcessors new setting for maximum # of active processes.
   */
  void setMaxActiveProcessors(size_t maxActiveProcessors) {
    maxActiveProcessors_ = maxActiveProcessors;
  }

  /**
   * Get the maximum allowed frame size.
   *
   * If a client tries to send a message larger than this limit,
   * its connection will be closed.
   *
   * @return Maxium frame size, in bytes.
   */
  size_t getMaxFrameSize() const { return maxFrameSize_; }

  /**
   * Set the maximum allowed frame size.
   *
   * @param maxFrameSize The new maximum frame size.
   */
  void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }

  /**
   * Get fraction of maximum limits before an overload condition is cleared.
   *
   * @return hysteresis fraction
   */
  double getOverloadHysteresis() const { return overloadHysteresis_; }

  /**
   * Set fraction of maximum limits before an overload condition is cleared.
   * A good value would probably be between 0.5 and 0.9.
   *
   * @param hysteresisFraction fraction <= 1.0.
   */
  void setOverloadHysteresis(double hysteresisFraction) {
    if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
      overloadHysteresis_ = hysteresisFraction;
    }
  }

  /**
   * Get the action the server will take on overload.
   *
   * @return a TOverloadAction enum value for the currently set action.
   */
  TOverloadAction getOverloadAction() const { return overloadAction_; }

  /**
   * Set the action the server is to take on overload.
   *
   * @param overloadAction a TOverloadAction enum value for the action.
   */
  void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }

  /**
   * Get the time in milliseconds after which a task expires (0 == infinite).
   *
   * @return a 64-bit time in milliseconds.
   */
  int64_t getTaskExpireTime() const { return taskExpireTime_; }

  /**
   * Set the time in milliseconds after which a task expires (0 == infinite).
   *
   * @param taskExpireTime a 64-bit time in milliseconds.
   */
  void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }

  /**
   * Determine if the server is currently overloaded.
   * This function checks the maximums for open connections and connections
   * currently in processing, and sets an overload condition if they are
   * exceeded.  The overload will persist until both values are below the
   * current hysteresis fraction of their maximums.
   *
   * @return true if an overload condition exists, false if not.
   */
  bool serverOverloaded();

  /** Pop and discard next task on threadpool wait queue.
   *
   * @return true if a task was discarded, false if the wait queue was empty.
   */
  bool drainPendingTask();

  /**
   * Get the starting size of a TConnection object's write buffer.
   *
   * @return # bytes we initialize a TConnection object's write buffer to.
   */
  size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }

  /**
   * Set the starting size of a TConnection object's write buffer.
   *
   * @param size # bytes we initialize a TConnection object's write buffer to.
   */
  void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }

  /**
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }

  /**
   * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }

  /**
   * Set the maximum size read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
   * Set the maximum size read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * Get the maximum size of write buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will reallocate buffers when checked.
   */
  size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }

  /**
   * Set the maximum size write buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its write buffer, we destroy and construct that buffer with
   * writeBufferDefaultSize_ bytes.
   *
   * @param limit of bytes beyond which we will shrink buffers when idle.
   */
  void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }

  /**
   * Get # of calls made between buffer size checks.  0 means disabled.
   *
   * @return # of calls between buffer size checks.
   */
  int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }

  /**
   * Check buffer sizes every "count" calls.  This allows buffer limits
   * to be enforced for persistent connections with a controllable degree
   * of overhead. 0 disables checks except at connection close.
   *
   * @param count the number of calls between checks, or 0 to disable
   */
  void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }

  /**
   * Main workhorse function, starts up the server listening on a port and
   * loops over the libevent handler.
   */
  void serve() override;

  /**
   * Causes the server to terminate gracefully (can be called from any thread).
   */
  void stop() override;

  /// Creates a socket to listen on and binds it to the local port.
  void createAndListenOnSocket();

  /**
   * Register the optional user-provided event-base (for single-thread servers)
   *
   * This method should be used when the server is running in a single-thread
   * mode, and the event base is provided by the user (i.e., the caller).
   *
   * @param user_event_base the user-provided event-base. The user is
   * responsible for freeing the event base memory.
   */
  void registerEvents(event_base* user_event_base);

  /**
   * Returns the optional user-provided event-base (for single-thread servers).
   */
  event_base* getUserEventBase() const { return userEventBase_; }

  /** Some transports, like THeaderTransport, require passing through
   * the framing size instead of stripping it.
   */
  bool getHeaderTransport();

private:
  /**
   * Callback function that the threadmanager calls when a task reaches
   * its expiration time.  It is needed to clean up the expired connection.
   *
   * @param task the runnable associated with the expired task.
   */
  void expireClose(std::shared_ptr<Runnable> task);

  /**
   * Return an initialized connection object.  Creates or recovers a
   * TConnection from the pool and initializes it with the provided socket.
   *
   * @param socket the TSocket wrapping the newly accepted connection.
   * @return pointer to initialized TConnection object.
   */
  TConnection* createConnection(std::shared_ptr<TSocket> socket);

  /**
   * Returns a connection to the pool or deletes it.  If the connection pool
   * (a stack) isn't full, place the connection object on it, otherwise
   * just delete it.
   *
   * @param connection the TConnection being returned.
   */
  void returnConnection(TConnection* connection);
};
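
To see how these tuning knobs are used in practice, here is a minimal usage sketch. It is not from the Thrift source: EchoHandler/EchoProcessor stand in for IDL-generated code, and it assumes a reasonably recent Thrift where TNonblockingServer is constructed from a TNonblockingServerSocket.

#include <memory>

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TNonblockingServerSocket.h>

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::server;
using namespace apache::thrift::transport;

int main() {
  // EchoHandler / EchoProcessor stand in for IDL-generated code (hypothetical names).
  auto handler = std::make_shared<EchoHandler>();
  auto processor = std::make_shared<EchoProcessor>(handler);
  auto socket = std::make_shared<TNonblockingServerSocket>(9090);

  TNonblockingServer server(processor, socket);

  // Overload / resource tuning through the setters shown above.
  server.setNumIOThreads(4);                        // libevent IO threads
  server.setMaxConnections(10000);                  // overload once this many connections are open
  server.setMaxActiveProcessors(1000);              // overload once this many requests are in flight
  server.setOverloadAction(T_OVERLOAD_CLOSE_ON_ACCEPT);
  server.setMaxFrameSize(16 * 1024 * 1024);         // larger frames close the offending client

  server.serve();                                   // enter the libevent loop
  return 0;
}

TNonblockingServer also has constructor overloads that accept a ThreadManager; with one, requests are handed to the worker pool instead of being processed directly on the IO threads.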

Inner class TConnection

class TNonblockingServer::TConnection {
private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  std::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  std::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data needed to read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Transport to read from
  std::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  std::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  std::shared_ptr<TTransport> factoryInputTransport_;
  std::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  std::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  std::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  std::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void* connectionContext_;

  /// Go into read mode
  void setRead() { setFlags(EV_READ | EV_PERSIST); }

  /// Go into write mode
  void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }

  /// Set socket idle
  void setIdle() { setFlags(0); }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen.  Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

public:
  class Task;

  /// Constructor
  TConnection(std::shared_ptr<TSocket> socket,
              TNonblockingIOThread* ioThread) {
    readBuffer_ = nullptr;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports; these only need to be allocated
    // once per TConnection (they don't need to be reallocated on each init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));

    tSocket_ = socket;

    init(ioThread);
  }

  ~TConnection() { std::free(readBuffer_); }

  /// Close this connection and free or reset its resources.
  void close();

  /**
    * Check buffers against any size limits and shrink them if exceeded.
    *
    * @param readLimit we reduce read buffer size to this (if nonzero).
    * @param writeLimit if nonzero and write buffer is larger, replace it.
    */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(TNonblockingIOThread* ioThread);

  /// set socket for connection
  void setSocket(std::shared_ptr<TSocket> socket);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events.  Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() { return ioThread_->notify(this); }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      server_->decrementActiveProcessors();
      close();
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const { return server_; }

  /// get state of connection.
  TAppState getState() const { return appState_; }

  /// return the TSocket transport wrapping this network connection
  std::shared_ptr<TSocket> getTSocket() const { return tSocket_; }

  /// return the server event handler if any
  std::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }

  /// return the Thrift connection context if any
  void* getConnectionContext() { return connectionContext_; }
};
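
TConnection reads framed requests incrementally: readWant_ records how many bytes the current frame still needs, readBufferPos_ how many have already arrived, and only once the frame is complete does the connection transition and hand the buffer to the processor. Below is a rough standalone sketch of that bookkeeping (POSIX sockets, illustration only, not Thrift code; the class and member names are made up here).

#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>

#include <cstdint>
#include <cstring>
#include <vector>

// Incremental framed reader: each call consumes whatever bytes are available on a
// non-blocking socket and reports whether a full frame (4-byte length prefix +
// payload) has been buffered.
class FrameReader {
public:
  bool onReadable(int fd) {
    if (!headerDone_) {
      ssize_t n = recv(fd, header_ + headerPos_, sizeof(header_) - headerPos_, 0);
      if (n <= 0) return false;                 // would block, EOF or error
      headerPos_ += static_cast<size_t>(n);
      if (headerPos_ < sizeof(header_)) return false;
      uint32_t len;
      std::memcpy(&len, header_, sizeof(len));
      want_ = ntohl(len);                       // analogue of readWant_
      buf_.resize(want_);
      pos_ = 0;                                 // analogue of readBufferPos_
      headerDone_ = true;
    }
    while (pos_ < want_) {
      ssize_t n = recv(fd, buf_.data() + pos_, want_ - pos_, 0);
      if (n <= 0) return false;                 // wait for the next readable event
      pos_ += static_cast<size_t>(n);
    }
    headerDone_ = false;                        // ready for the next frame
    headerPos_ = 0;
    return true;                                // full frame buffered; the processor may run
  }

  const std::vector<uint8_t>& frame() const { return buf_; }

private:
  uint8_t header_[4] = {0, 0, 0, 0};
  size_t headerPos_ = 0;
  uint32_t want_ = 0;
  size_t pos_ = 0;
  std::vector<uint8_t> buf_;
};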

Friend class TNonblockingIOThread

class TNonblockingIOThread : public Runnable {
public:
  // Creates an IO thread and sets up the event base.  The listenSocket should
  // be a valid FD on which listen() has already been called.  If the
  // listenSocket is < 0, accepting will not be done.
  TNonblockingIOThread(TNonblockingServer* server,
                       int number,
                       THRIFT_SOCKET listenSocket,
                       bool useHighPriority);

  ~TNonblockingIOThread() override;

  // Returns the event-base for this thread.
  event_base* getEventBase() const { return eventBase_; }

  // Returns the server for this thread.
  TNonblockingServer* getServer() const { return server_; }

  // Returns the number of this IO thread.
  int getThreadNumber() const { return number_; }

  // Returns the thread id associated with this object.  This should
  // only be called after the thread has been started.
  Thread::id_t getThreadId() const { return threadId_; }

  // Returns the send-fd for task complete notifications.
  evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }

  // Returns the read-fd for task complete notifications.
  evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }

  // Returns the actual thread object associated with this IO thread.
  std::shared_ptr<Thread> getThread() const { return thread_; }

  // Sets the actual thread object associated with this IO thread.
  void setThread(const std::shared_ptr<Thread>& t) { thread_ = t; }

  // Used by TConnection objects to indicate processing has finished.
  bool notify(TNonblockingServer::TConnection* conn);

  // Enters the event loop and does not return until a call to stop().
  void run() override;

  // Exits the event loop as soon as possible.
  void stop();

  // Ensures that the event-loop thread is fully finished and shut down.
  void join();

  /// Registers the events for the notification & listen sockets
  void registerEvents();

private:
  /**
   * C-callable event handler for signaling task completion.  Provides a
   * callback that libevent can understand that will read a connection
   * object's address from a pipe and call connection->transition() for
   * that object.
   *
   * @param fd the descriptor the event occurred on.
   */
  static void notifyHandler(evutil_socket_t fd, short which, void* v);

  /**
   * C-callable event handler for listener events.  Provides a callback
   * that libevent can understand which invokes server->handleEvent().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TNonblockingServer's "this".
   */
  static void listenHandler(evutil_socket_t fd, short which, void* v) {
    ((TNonblockingServer*)v)->handleEvent(fd, which);
  }

  /// Exits the loop ASAP in case of shutdown or error.
  void breakLoop(bool error);

  /// Create the pipe used to notify I/O process of task completion.
  void createNotificationPipe();

  /// Unregisters our events for notification and listen sockets.
  void cleanupEvents();

  /// Sets (or clears) high priority scheduling status for the current thread.
  void setCurrentThreadHighPriority(bool value);

private:
  /// associated server
  TNonblockingServer* server_;

  /// thread number (for debugging).
  const int number_;

  /// The actual physical thread id.
  Thread::id_t threadId_;

  /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
  THRIFT_SOCKET listenSocket_;

  /// Sets a high scheduling priority when running
  bool useHighPriority_;

  /// pointer to eventbase to be used for looping
  event_base* eventBase_;

  /// Set to true if this class is responsible for freeing the event base
  /// memory.
  bool ownEventBase_;

  /// Used with eventBase_ for connection events (only in listener thread)
  struct event serverEvent_;

  /// Used with eventBase_ for task completion notification
  struct event notificationEvent_;

  /// File descriptors for pipe used for task completion notification.
  evutil_socket_t notificationPipeFDs_[2];

  /// Actual IO Thread
  std::shared_ptr<Thread> thread_;
};
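
The notify()/notifyHandler() pair is the heart of the IO thread: worker threads must not touch libevent structures directly, so they write the TConnection pointer into a notification pipe, and the IO thread's event loop wakes up, reads the pointer back and resumes that connection. Below is a stripped-down, POSIX-only sketch of that pattern using plain libevent (not Thrift code).

#include <sys/socket.h>

#include <cstdio>
#include <thread>

#include <event2/event.h>
#include <event2/util.h>

// Stand-in for TConnection; only a pointer to it travels through the pipe.
struct FakeConnection { int id; };

// Analogue of TNonblockingIOThread::notifyHandler(): read one pointer and "resume" it.
static void notifyHandler(evutil_socket_t fd, short /*which*/, void* /*arg*/) {
  FakeConnection* conn = nullptr;
  if (recv(fd, reinterpret_cast<char*>(&conn), sizeof(conn), 0) == sizeof(conn) && conn) {
    std::printf("IO thread resumed connection %d\n", conn->id);
  }
}

int main() {
  evutil_socket_t fds[2];
  evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds);     // fds[0] = recv end, fds[1] = send end

  event_base* base = event_base_new();
  event* ev = event_new(base, fds[0], EV_READ | EV_PERSIST, notifyHandler, nullptr);
  event_add(ev, nullptr);

  // A "worker thread" finishing a task: the equivalent of TNonblockingIOThread::notify(conn),
  // which writes the raw TConnection pointer into the pipe.
  std::thread worker([&] {
    static FakeConnection conn{42};
    FakeConnection* p = &conn;
    send(fds[1], reinterpret_cast<const char*>(&p), sizeof(p), 0);
  });

  event_base_loop(base, EVLOOP_ONCE);                  // wake up once and dispatch the notification
  worker.join();
  event_free(ev);
  event_base_free(base);
  return 0;
}

Compile with -levent and -pthread. The real notifyHandler additionally copes with partial pipe reads and error handling; notify() is what TConnection::notifyIOThread() shown above ends up calling.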

TThreadedServer

void TThreadedServer::serve() {
  // 1. run the shared accept/dispatch loop
  TServerFramework::serve();

  // 2. wait on the condition variable until all active clients have finished
  Synchronized s(clientMonitor_);
  while (!activeClientMap_.empty()) {
    clientMonitor_.wait();
  }
  // 3. join and reclaim the finished client threads
  drainDeadClients();
}

Step 1 is the same serve loop as TSimpleServer, so it is not repeated here. Condition variables were already covered in thrift源码解析之concurrency. What remains is how the finished client threads are joined and reclaimed:

void TThreadedServer::drainDeadClients() {
  // we're in a monitor here
  while (!deadClientMap_.empty()) {
    auto it = deadClientMap_.begin();
    it->second->join();
    deadClientMap_.erase(it);
  }
}
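
For completeness, here is a minimal way to stand up a TThreadedServer (EchoHandler/EchoProcessor are hypothetical placeholders for IDL-generated code, not part of the post):

#include <memory>

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TServerSocket.h>

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::server;
using namespace apache::thrift::transport;

int main() {
  // EchoHandler / EchoProcessor stand in for IDL-generated code (hypothetical names).
  TThreadedServer server(std::make_shared<EchoProcessor>(std::make_shared<EchoHandler>()),
                         std::make_shared<TServerSocket>(9090),
                         std::make_shared<TBufferedTransportFactory>(),
                         std::make_shared<TBinaryProtocolFactory>());

  // serve() blocks: it runs TServerFramework::serve(), then waits for active
  // clients to finish, then joins the dead client threads via drainDeadClients().
  server.serve();
  return 0;
}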

TThreadPoolServer

void TThreadPoolServer::serve() {
  TServerFramework::serve();
  threadManager_->stop();
}

void ThreadManager::Impl::stop() {
  Guard g(mutex_);
  bool doStop = false;

  if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
      && state_ != ThreadManager::STOPPED) {
    doStop = true;
    state_ = ThreadManager::JOINING;
  }

  if (doStop) {
    removeWorkersUnderLock(workerCount_);
  }

  state_ = ThreadManager::STOPPED;
}
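
TThreadPoolServer differs from TThreadedServer mainly in that it needs a pre-started ThreadManager; serve() then hands each accepted client to the pool as a task. A minimal sketch (EchoHandler/EchoProcessor are again hypothetical placeholders; in older Thrift releases the factory class is PlatformThreadFactory rather than ThreadFactory):

#include <memory>

#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TServerSocket.h>

using namespace apache::thrift;
using namespace apache::thrift::concurrency;
using namespace apache::thrift::protocol;
using namespace apache::thrift::server;
using namespace apache::thrift::transport;

int main() {
  // 8 worker threads; additional accepted clients queue up until a worker is free.
  auto threadManager = ThreadManager::newSimpleThreadManager(8);
  threadManager->threadFactory(std::make_shared<ThreadFactory>());
  threadManager->start();

  // EchoHandler / EchoProcessor stand in for IDL-generated code (hypothetical names).
  TThreadPoolServer server(std::make_shared<EchoProcessor>(std::make_shared<EchoHandler>()),
                           std::make_shared<TServerSocket>(9090),
                           std::make_shared<TBufferedTransportFactory>(),
                           std::make_shared<TBinaryProtocolFactory>(),
                           threadManager);

  // When serve() returns, it calls threadManager_->stop(), which drives
  // ThreadManager::Impl::stop() shown above.
  server.serve();
  return 0;
}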

TEventloopServer

There are generally two architectures for handling network requests.

Thread-based architecture
A separate thread handles each incoming request, started on demand or taken from a pool. This is simple and intuitive, but it only suits modest levels of concurrency: every thread occupies memory, the operating system pays a cost for each context switch between threads, and a thread blocked on I/O sits idle while waiting, wasting CPU. With too many threads, server performance clearly degrades.

Event-driven architecture
This is the approach in wide use today. A set of event handlers is defined to respond to events, and accepting connections is decoupled from processing them; an event itself is just a change of state. In an event-driven application, service requests from one or more clients are demultiplexed and dispatched to the application. This is the Reactor pattern; a minimal sketch follows the reference link below.

Reactor 反应堆设计模式
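
To make the Reactor idea concrete before looking at TEventloopServer, here is a deliberately tiny single-threaded reactor built on poll() (illustration only, neither Thrift nor the blog's code): the loop demultiplexes readable file descriptors and dispatches to per-fd handlers, with the listening socket's handler playing the Accepter role.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <functional>
#include <map>
#include <vector>

// Tiny single-threaded Reactor: register a handler per fd, poll() demultiplexes,
// run() dispatches to whichever handlers are readable.
class Reactor {
public:
  using Handler = std::function<void(int fd)>;

  void add(int fd, Handler h) { handlers_[fd] = std::move(h); }
  void remove(int fd) { handlers_.erase(fd); }

  void run() {
    while (!handlers_.empty()) {
      std::vector<pollfd> pfds;
      for (const auto& kv : handlers_) pfds.push_back({kv.first, POLLIN, 0});
      if (poll(pfds.data(), pfds.size(), -1) <= 0) continue;      // demultiplex
      for (const auto& p : pfds) {
        if (!(p.revents & POLLIN)) continue;
        auto it = handlers_.find(p.fd);
        if (it == handlers_.end()) continue;
        Handler h = it->second;   // copy so a handler may safely remove itself
        h(p.fd);                  // dispatch
      }
    }
  }

private:
  std::map<int, Handler> handlers_;
};

int main() {
  // Listening socket: its handler plays the "Accepter" role.
  int listenFd = socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(9090);
  bind(listenFd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
  listen(listenFd, 16);

  Reactor reactor;
  Reactor* r = &reactor;                     // captured by value inside the handlers
  reactor.add(listenFd, [r](int fd) {
    int clientFd = accept(fd, nullptr, nullptr);
    if (clientFd < 0) return;
    // Per-connection handler: echo whatever arrives, close on EOF/error.
    r->add(clientFd, [r](int cfd) {
      char buf[1024];
      ssize_t n = read(cfd, buf, sizeof(buf));
      if (n <= 0) { r->remove(cfd); close(cfd); return; }
      write(cfd, buf, n);
    });
  });
  reactor.run();
  return 0;
}

The TEventloopServer below appears to follow the same shape, except that the loop (IEventLoop) is owned by the caller and the Accepter/Notifier classes are the handlers registered with it.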

void TEventloopServer::serve() {
  // init listen socket
  if (serverSocket_ == THRIFT_INVALID_SOCKET)
    createAndListenOnSocket();

  accepter_ = new Accepter(eventloop_, this, serverSocket_);
  notify_ = new Notifier(eventloop_);
}
class Accepter : public IFdEventHandler {
public:
    Accepter(IEventLoop& eventloop,
             TEventloopServer* server,
             THRIFT_SOCKET listenSocket);

    ~Accepter();

private:
    virtual void HandleEvent(int fd, TEventType type) override;

    IEventLoop& eventloop_;

    TEventloopServer* server_;

    THRIFT_SOCKET listenSocket_;
};
class Notifier : IFdEventHandler {
public:
    Notifier(IEventLoop& eventloop);

    ~Notifier();

    bool push(TEventloopServer::TConnection* connection);

private:
    virtual void HandleEvent(int fd, TEventType type) override;

    TEventFd TriggerEvent;

    IEventLoop& eventloop_;

    std::list<TEventloopServer::TConnection*> connection_list_;

    /// Synchronizes access to connection stack and similar data
    Mutex connMutex_;
};
class IFdEventHandler {
public:
  enum TEventType {
    EVENT_READ = 1 << 0,
    EVENT_WRITE = 1 << 1
  };

  enum TEventPriority {
    EVENT_PRIORITY_HIGH,
    EVENT_PRIORITY_NORMAL,
    EVENT_PRIORITY_LOW
  };

  virtual ~IFdEventHandler() {} /** Empty */

  virtual void HandleEvent(int fd, TEventType type) = 0;
};