SourceXR

C/C++ Cross-Reference Tool

Non-Blocking accept()/connect()

A key to achieving high performance in network applications is to avoid blocking operations, especially when the time they take to complete is not deterministic. In this short article we describe how to perform accepts and connects without blocking, using the well-known reactor pattern.

Reactor Pattern

In a few words: a reactor watches several descriptors in one place and dispatches the events it receives to the proper handlers. A watched descriptor can be a socket, a file descriptor, a signal, etc. The events to watch are read events, write events and so on. When the reactor reports a descriptor as ready for an event, the corresponding call will not block. For example, a read notification means that the read() system call will return without blocking.

A network application is basically designed around an event loop that waits in the reactor. Whenever an event arrives, the reactor returns and the handler-specific code is run.

On Linux the reactor pattern is built using select(), poll() or epoll().

The following code shows a network loop using epoll:

const int size = 10; // hint
int epollfd = epoll_create (size);
if (epollfd == -1) {
   std::cerr << strerror (errno) << "\n";
   return 1;
}

while (true) {

   const int infinity = -1;
   // wait for events
   epoll_event events[size];
   int r = epoll_wait (epollfd, events, size, infinity);
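   if (r == -1) {
      if (errno == EINTR) {
         continue; // interrupted by a signal, wait again
      }
      std::cerr << strerror (errno) << "\n";
      close (epollfd); // also close any watched descriptors here
      return 1;
   }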

   // demultiplex events returned in events variable
   // ...
}
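
The "demultiplex events" step above can be sketched as follows. This is only an illustration, not the code from the article's source files: the Reactor class, its add() and run_once() members and the fixed batch size of 16 events are all assumptions made for the example. Each descriptor is registered with one callback, and run_once() dispatches whatever epoll_wait() reports.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cassert>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <utility>

// Hypothetical minimal reactor: one handler per watched descriptor.
class Reactor {
public:
    using Handler = std::function<void (std::uint32_t events)>;

    Reactor () : epollfd_ (epoll_create1 (0)) {}
    ~Reactor () { if (epollfd_ != -1) close (epollfd_); }
    Reactor (const Reactor &) = delete;
    Reactor &operator= (const Reactor &) = delete;

    bool valid () const { return epollfd_ != -1; }

    // Watch fd for the given epoll mask and remember its handler.
    bool add (int fd, std::uint32_t mask, Handler handler) {
        assert (fd >= 0);
        epoll_event event {};
        event.events = mask;
        event.data.fd = fd; // user data, used to find the handler
        if (epoll_ctl (epollfd_, EPOLL_CTL_ADD, fd, &event) == -1) {
            return false;
        }
        handlers_[fd] = std::move (handler);
        return true;
    }

    // Wait up to timeout_ms (-1 means forever) and dispatch ready events.
    // Returns the number of events reported, or -1 on error.
    int run_once (int timeout_ms) {
        epoll_event events[16];
        int n = epoll_wait (epollfd_, events, 16, timeout_ms);
        for (int i = 0; i < n; ++i) {
            auto it = handlers_.find (events[i].data.fd);
            if (it != handlers_.end ()) {
                // copy first, so a handler may safely modify the map
                Handler handler = it->second;
                handler (events[i].events);
            }
        }
        return n;
    }

private:
    int epollfd_;
    std::unordered_map<int, Handler> handlers_;
};
```

With such a class, the event loop reduces to calling run_once (-1) repeatedly, and the connect and accept logic described in the rest of the article would live in the handlers.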

Setup

Both operations share the same setup steps:

  • create the socket, directly in non-blocking mode with the SOCK_NONBLOCK flag where available (Linux kernel 2.6.27 and later)
  • otherwise, put it in non-blocking mode afterwards with fcntl() and the O_NONBLOCK flag
  • add it to the reactor

For the accepting socket, the event to watch is a read event, and conversely for the connecting socket a write event is to be watched.

The following code snippet shows the socket creation and setup:

    int fd = socket (AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (fd == -1) {
        std::cerr << strerror (errno) << "\n";
        return 1;
    }
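
On kernels older than 2.6.27, where SOCK_NONBLOCK is not available, the fcntl() route mentioned in the list above could look like the sketch below. The helper names set_nonblocking and make_nonblocking_tcp_socket are made up for this example and do not come from the original source files.

```cpp
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>

// Hypothetical helper: add O_NONBLOCK to the descriptor's status flags,
// keeping the flags that are already set. Returns true on success.
bool set_nonblocking (int fd) {
    assert (fd >= 0);
    int flags = fcntl (fd, F_GETFL, 0);
    if (flags == -1) {
        return false;
    }
    return fcntl (fd, F_SETFL, flags | O_NONBLOCK) != -1;
}

// Hypothetical helper: create a TCP socket without SOCK_NONBLOCK and
// switch it to non-blocking mode afterwards. Returns -1 on failure.
int make_nonblocking_tcp_socket () {
    int fd = socket (AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        return -1;
    }
    if (!set_nonblocking (fd)) {
        close (fd);
        return -1;
    }
    return fd;
}
```

Reading the flags first with F_GETFL matters: passing only O_NONBLOCK to F_SETFL would clear any other status flag already set on the descriptor.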

Connect

connect() is called and its return code tells us whether the connection was established immediately: it returns 0 if so, or -1 with errno set to EINPROGRESS if the connection attempt is still under way. In the latter case, we add the socket to the reactor, watching for a write event (EPOLLOUT for epoll). Then we wait for events with epoll_wait().

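    // issue connect (may be established immediately)
    const char *ip = argv[1];
    const unsigned short port = atoi (argv[2]);
    sockaddr_in addr = {}; // zero-initialize, including the padding
    addr.sin_family = AF_INET;
    addr.sin_port = htons (port);
    if (inet_aton (ip, &addr.sin_addr) == 0) {
        std::cerr << "invalid address: " << ip << "\n";
        close (fd);
        return 1;
    }

    bool connected = true;
    int i = connect (fd, reinterpret_cast<sockaddr *> (&addr),
                     sizeof (sockaddr_in));
    if (i == -1) {
        if (errno == EINPROGRESS) {
            // connection not yet established, use reactor
            connected = false;
        }
        else {
            // immediate failure: nothing to wait for
            std::cerr << strerror (errno) << "\n";
            close (fd);
            return 1;
        }
    }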
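
The three possible outcomes of a non-blocking connect() can be made explicit with a small wrapper. The sketch below is an illustration only: the ConnectState enum and the start_connect function are names invented for this example. It also treats EINTR like EINPROGRESS, on the assumption (per POSIX) that an interrupted connect() keeps going asynchronously.

```cpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <cassert>
#include <cerrno>

// Hypothetical result type for a non-blocking connect attempt.
enum class ConnectState { Connected, InProgress, Failed };

// Hypothetical helper: start connecting fd (expected to be non-blocking)
// to addr. On Failed, errno holds the error reported by connect().
ConnectState start_connect (int fd, const sockaddr_in &addr) {
    assert (fd >= 0);
    int rc = connect (fd, reinterpret_cast<const sockaddr *> (&addr),
                      sizeof (addr));
    if (rc == 0) {
        return ConnectState::Connected; // established immediately
    }
    if (errno == EINPROGRESS || errno == EINTR) {
        // the attempt continues in the background:
        // watch the socket for a write event
        return ConnectState::InProgress;
    }
    return ConnectState::Failed;
}
```

Only the InProgress case needs the reactor to watch for EPOLLOUT; Connected can go straight to normal read handling and Failed should close the socket.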

If the connection was not immediately established, we add the socket to the reactor and wait for a write notification:

    // add fd to reactor
    epoll_event event;
    event.events = EPOLLIN;
    if (!connected) {
        event.events |= EPOLLOUT; // connection established is a write event
    }
    event.data.fd = fd; // user data
    int r = epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, &event);
    if (r == -1) {
        std::cerr << strerror (errno) << "\n";
        close (epollfd);
        close (fd);
        return 1;
    }

We wait in epoll_wait(); when it returns with a write event for our socket, we check the socket's pending error with getsockopt() and SO_ERROR, as connect() may have failed. The returned error is to be interpreted like the errors returned by connect() in blocking mode.

    while (true) {

        epoll_event events[size];
        const int infinity = -1;
        // and wait for events
        int r = epoll_wait (epollfd, events, size, infinity);
        if (r == -1) {
            close (epollfd);
            close (fd);
            return 1;
        }

        // demultiplex events
        int i = 0;
        while (i < r) {
            if (events[i].data.fd == fd) {
                if (events[i].events & EPOLLOUT) {
                    if (!connected) {
                        // connection established
                        // check pending error if ever
                        int err = 0;
                        socklen_t len = sizeof (err);
                        // named rc so that it does not shadow the loop index i
                        int rc = getsockopt (fd, SOL_SOCKET, SO_ERROR, &err,
                                             &len);
                        if (rc != -1) {
                            if (err == 0) {
                                connected = true;
                                std::cout << "Connection established\n";
                                // disable write event: to reenable when
                                // data are to be sent on the socket
                                epoll_event event;
                                event.events = EPOLLIN;
                                event.data.fd = fd; // user data
                                int r = epoll_ctl (epollfd, EPOLL_CTL_MOD,
                                                   fd, &event);
                                if (r == -1) {
                                    std::cerr << strerror (errno) << "\n";
                                    close (epollfd);
                                    close (fd);
                                    return 1;
                                }
                            }
                            else {
                                // connection failed
                                std::cerr << strerror (err) << "\n";
                                epoll_event event;
                                event.events = 0;
                                event.data.fd = fd; // user data
                                epoll_ctl (epollfd, EPOLL_CTL_DEL,
                                            fd, &event);
                                close (fd);
                                close (epollfd);
                                return 1; // nothing left to watch
                            }
                        }
                    }
                }
            }
            ++i;
        }
    }

If we successfully connected to the peer, we disable the write notification, as we have nothing to send yet. Otherwise epoll would report the socket as writable on every call to epoll_wait(). When we do have data to send, the write event must be enabled again.
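
Switching the write notification on and off can be wrapped in a small helper, sketched below. The names set_write_interest and send_or_watch are hypothetical, and the sketch assumes the descriptor is non-blocking and already registered in the epoll instance with EPOLLIN, as in the code above.

```cpp
#include <sys/epoll.h>
#include <sys/socket.h>
#include <cassert>
#include <cerrno>
#include <cstddef>

// Hypothetical helper: keep EPOLLIN and enable or disable EPOLLOUT
// for an fd that is already registered in epollfd.
bool set_write_interest (int epollfd, int fd, bool want_write) {
    assert (epollfd >= 0 && fd >= 0);
    epoll_event event {};
    event.events = EPOLLIN;
    if (want_write) {
        event.events |= EPOLLOUT;
    }
    event.data.fd = fd; // user data
    return epoll_ctl (epollfd, EPOLL_CTL_MOD, fd, &event) != -1;
}

// Hypothetical helper: try to send right away; if the socket buffer is
// full, ask the reactor for a write notification instead of blocking.
// Returns the number of bytes sent (0 if the send must be retried later),
// or -1 on a real error.
ssize_t send_or_watch (int epollfd, int fd, const void *data,
                       std::size_t len) {
    ssize_t n = send (fd, data, len, MSG_NOSIGNAL);
    if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
        return set_write_interest (epollfd, fd, true) ? 0 : -1;
    }
    return n;
}
```

Once the write handler has flushed the pending data, it would call set_write_interest (epollfd, fd, false) again, so that the loop is not woken up for a socket that has nothing to send.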

Accept

Here is the code to set up the server. Apart from the classic initialization (socket creation, address reuse setting, binding to the local listening port and reactor setup), there is little to add to accept new connections in non-blocking mode.

    int listenfd = socket (AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (listenfd == -1) {
        std::cerr << strerror (errno) << "\n";
        return 1;
    }

    int yes = 1;
    if (setsockopt (listenfd, SOL_SOCKET, SO_REUSEADDR, &yes,
                    sizeof (yes)) == -1 ) {
        std::cerr << strerror (errno) << "\n";
        return 1;
    }

    const unsigned short port = atoi (argv[1]);
    sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons (port);
    addr.sin_addr.s_addr = INADDR_ANY;

    if (bind (listenfd, reinterpret_cast<struct sockaddr *> (&addr),
              sizeof (sockaddr_in)) == -1) {
        std::cerr << strerror (errno) << "\n";
        return 1;
    }

    if (listen (listenfd, 5) == -1) {
        std::cerr << strerror (errno) << "\n";
        return 1;
    }

A new connection is notified by the reactor as a read event, therefore we add the listening socket with the EPOLLIN mask:

    const int size = 10; // hint
    int epollfd = epoll_create (size);
    if (epollfd == -1) {
        std::cerr << strerror (errno) << "\n";
        return 1;
    }

    epoll_event event;

    // add fd to reactor
    event.events = EPOLLIN; // new connection is a read event
    event.data.fd = listenfd; // user data
    int r = epoll_ctl (epollfd, EPOLL_CTL_ADD, listenfd, &event);
    if (r == -1) {
        std::cerr << strerror (errno) << "\n";
        close (epollfd);
        close (listenfd);
        return 1;
    }

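Be careful when accepting the connection: the peer may already have gone away between the notification and the call to accept(), in which case accept() on a non-blocking socket fails with EAGAIN or EWOULDBLOCK instead of blocking: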

    while (true) {

        epoll_event events[size];
        const int infinity = -1;
        // and wait for events
        int r = epoll_wait (epollfd, events, size, infinity);
        if (r == -1) {
            close (epollfd);
            close (listenfd);
            return 1;
        }

        // demultiplex events
        int i = 0;
        while (i < r) {
            if (events[i].data.fd == listenfd) {
                // new connection available
                sockaddr_in remote;
                socklen_t len = sizeof (sockaddr_in);
                int connfd = accept (listenfd,
                                     reinterpret_cast<sockaddr *> (&remote),
                                     &len);
                if (connfd == -1) {
                    if ((errno == EAGAIN)
                        || (errno == EWOULDBLOCK)) {
                        // no connection pending any more: the peer
                        // went away before we could accept it
                    }
                    else {
                        std::cerr << strerror (errno) << "\n";
                    }
                }
                else {
                    // handle new connection:
                    // add it to reactor, etc
                    std::cout << "New connection\n";
                }
            }
            ++i;
        }
    }
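
Several connections may be pending when the read event fires. A possible variant, sketched below, accepts in a loop until the queue is empty. The accept_all function is a hypothetical name, and the sketch assumes Linux accept4() (kernel 2.6.28 and later), which creates the accepted sockets directly in non-blocking mode.

```cpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <cassert>
#include <cerrno>
#include <vector>

// Hypothetical helper: accept every connection currently pending on a
// non-blocking listening socket. The returned descriptors are already
// non-blocking; the caller owns them and must close them.
std::vector<int> accept_all (int listenfd) {
    assert (listenfd >= 0);
    std::vector<int> accepted;
    while (true) {
        sockaddr_in remote;
        socklen_t len = sizeof (remote);
        int connfd = accept4 (listenfd,
                              reinterpret_cast<sockaddr *> (&remote),
                              &len, SOCK_NONBLOCK);
        if (connfd != -1) {
            accepted.push_back (connfd);
            continue;
        }
        if (errno == EINTR || errno == ECONNABORTED) {
            continue; // interrupted, or this peer gave up: try the next one
        }
        // EAGAIN / EWOULDBLOCK: the queue is empty.
        // Any other error also ends the loop in this sketch.
        break;
    }
    return accepted;
}
```

Each returned descriptor would then be added to the reactor with the EPOLLIN mask, like any other connected socket.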

Conclusion

The reactor pattern allows a seamless integration of connecting and accepting sockets within the network application. There is no longer an excuse to perform blocking calls for these two operations. Moreover, many toolkits already implement this mechanism: just provide the callbacks that perform the required work when the connection is established or accepted, and go!

Source Code

The source files of the two programs can be found here (connector) and here (acceptor).
