SourceXR

C/C++ Cross-Reference Tool

Passing Descriptors Between Processes

Passing descriptors (files, sockets, etc.) between processes can be used as a powerful technique to increase parallelism between concurrent processes.

Descriptor passing lets an application keep process separation (as opposed to multithreading) and still run efficiently on multicore architectures, with several handling processes cooperating to perform their tasks.

For example, if you have a network application with a single point of entry (the listening socket) but want to spread the processing load over several processes, descriptor passing allows the listening process to hand each connection (the socket returned by accept()) to another process that takes care of the processing.

Having split processes will simplify application design and maintenance, while allowing provisions for scalability.

The processing flow can be organized as several stages, each performing one basic step of the processing. Each of these basic steps can easily be parallelized by running several processes for it. This constitutes the horizontal partitioning of the workload.

Descriptor passing is a convenient and safe way to distribute the load on these parallelized steps. We describe here an implementation of this technique on Linux.

We assume here an application split into two stages:

  1. the accept stage, responsible for accepting new connections and load balancing them to the second stage,
  2. the processing stage, responsible for data handling.

A single accept stage process runs and distributes work to one or more processing stage processes. Their number does not have to be fixed; it could instead adapt dynamically to the workload.

Accept Stage

This stage is built around a reactive loop that calls accept() when new connections arrive and sends them to the processing stage.

Here is the pseudo code of the loop:

// setup
listSock = socket ();
bind (listSock, address); // address and port the clients connect to

// enable communication with processing stage
// (connect to each processing process, see below)

listen (listSock, backlog);

// put socket in reactor

// and then loop to accept new connections
while (true) {
    // wait for new connections
    select (); // or another reactor (epoll)
    int s = accept (listSock);
    // send socket to other processes (possibly performing some load balancing)
    // and continue
}
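
The load balancing mentioned in the loop above can be as simple as a round-robin choice among the connections to the processing stage. Here is a minimal sketch of that idea. RoundRobin is a hypothetical helper, not part of the original sources, and it assumes the accept stage keeps one UNIX socket descriptor per processing process:

```cpp
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical helper: cycles through the sockets connected to the
// processing processes (one descriptor per process) in round-robin order.
class RoundRobin {
public:
    explicit RoundRobin (std::vector<int> workers)
        : workers_ (std::move (workers)), next_ (0) {}

    // Returns the socket of the processing process that should
    // receive the next accepted connection.
    int pick () {
        if (workers_.empty ())
            throw std::runtime_error ("no processing process available");
        int sock = workers_[next_];
        next_ = (next_ + 1) % workers_.size ();
        return sock;
    }

private:
    std::vector<int> workers_;
    std::size_t next_;
};
```

In the accept loop, pick() would be called once per accepted connection to choose the UNIX socket on which the new descriptor is sent. A real balancer could instead track the number of connections each process currently holds.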

Processing Stage

This stage receives new connections from the previous stage and processes them. It is also built around a reactive loop, so that it can perform non-blocking I/O and handle many connections at the same time.

Here is the pseudo code of the loop:

// setup communication with accept stage
// and then loop to accept connections
while (true) {
    select();
    // get new connection from accept stage
    // handle application work (possibly delegating work to another process)
    // and continue
}

Passing Descriptors

Passing descriptors does not require the processes to be related (parent and child through fork()), but they must be connected through a UNIX domain socket, which means they run on the same host.

There is one UNIX socket connection between each processing stage process and the accept stage process. That is, with 5 processing processes there are 5 connections, each with a socket at both ends.

Socket Creation

The socket is created in the UNIX domain (PF_UNIX) as a reliable, connection-oriented stream socket (SOCK_STREAM):

#include <fcntl.h>
#include <sys/socket.h>
#include <sys/un.h>

int sock = socket (PF_UNIX, SOCK_STREAM, 0);
// set as non blocking
int flags = fcntl (sock, F_GETFL, 0);
fcntl (sock, F_SETFL, flags | O_NONBLOCK);
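
The snippet above ignores failures for brevity. A minimal sketch of the same steps with error checking could look like this; makeUnixSocket is a hypothetical helper name, not taken from the original sources:

```cpp
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: returns a non-blocking UNIX stream socket,
// or -1 if any step fails.
int makeUnixSocket () {
    int sock = socket (PF_UNIX, SOCK_STREAM, 0);
    if (sock == -1)
        return -1;

    // set as non blocking, keeping the other status flags
    int flags = fcntl (sock, F_GETFL, 0);
    if (flags == -1 || fcntl (sock, F_SETFL, flags | O_NONBLOCK) == -1) {
        close (sock);
        return -1;
    }
    return sock;
}
```

On Linux, the same result can be obtained in one call by passing SOCK_STREAM | SOCK_NONBLOCK as the socket type.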

Processes Communication Setup

We assume that the processing stage processes are already created.

They create the UNIX socket using the preceding code snippet and bind it to be able to receive the accept stage connection.

We assume that the accept stage already knows the binding address of each processing process. The processes can use a well-known address, or have some way of communicating the binding address to the accept stage (for example, if the accept stage forks the processing stage, the binding address can be chosen at that moment).

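// address is the filename used for binding; use an absolute path so that
// both processes refer to the same file
const char *address = "/tmp/socket";
unlink (address); // remove any previously created socket
sockaddr_un addr;
memset (&addr, 0, sizeof (addr)); // also keeps sun_path NUL-terminated
addr.sun_family = AF_LOCAL;
strncpy (addr.sun_path, address, sizeof (addr.sun_path) - 1);
if (bind (sock, reinterpret_cast<sockaddr*> (&addr),
          sizeof (sockaddr_un)) == -1) {
    // handle error (see errno)
}
// listen to connections from accept stage
if (listen (sock, 5) == -1) {
    // handle error (see errno)
}
// and add socket to reactor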

When the accept stage connects to a processing stage process, that process accepts the connection and adds it to the reactor. Messages received on this connection carry the passed descriptors.

sockaddr_un addr;
socklen_t ssize = sizeof (sockaddr_un);
int r = accept (sock, reinterpret_cast<sockaddr*> (&addr), &ssize);
if (r != -1) {
    // add new connection to the reactor
}

Connecting Processes

The accept stage performs the connect call to the processing stage:

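sockaddr_un addr;
memset (&addr, 0, sizeof (addr)); // also keeps sun_path NUL-terminated
addr.sun_family = AF_LOCAL;
const char *path = "/tmp/socket";
strncpy (addr.sun_path, path, sizeof (addr.sun_path) - 1);
// sock is the previously created UNIX socket
if (connect (sock, reinterpret_cast<sockaddr*> (&addr),
             sizeof (sockaddr_un)) == -1) {
    // handle error (see errno)
}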

The channel between the two processes is now ready, and we can pass descriptors over it.
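
The bind, listen and connect steps can be gathered into two small functions. This is a sketch under stated assumptions: listenOn, connectTo and fillAddress are hypothetical names, and the sockets are left in blocking mode to keep the example short, unlike the non-blocking socket used in this article:

```cpp
#include <cstring>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

// Fills addr for the given path; returns false if the path does not fit.
static bool fillAddress (sockaddr_un& addr, const char* path) {
    if (std::strlen (path) >= sizeof (addr.sun_path))
        return false;
    std::memset (&addr, 0, sizeof (addr));
    addr.sun_family = AF_LOCAL;
    std::strcpy (addr.sun_path, path); // length checked above
    return true;
}

// Processing stage side: returns a listening socket bound to path, or -1.
int listenOn (const char* path) {
    sockaddr_un addr;
    if (!fillAddress (addr, path))
        return -1;
    int sock = socket (PF_UNIX, SOCK_STREAM, 0);
    if (sock == -1)
        return -1;
    unlink (path); // remove any previously created socket
    if (bind (sock, reinterpret_cast<sockaddr*> (&addr), sizeof (addr)) == -1
        || listen (sock, 5) == -1) {
        close (sock);
        return -1;
    }
    return sock;
}

// Accept stage side: returns a socket connected to path, or -1.
int connectTo (const char* path) {
    sockaddr_un addr;
    if (!fillAddress (addr, path))
        return -1;
    int sock = socket (PF_UNIX, SOCK_STREAM, 0);
    if (sock == -1)
        return -1;
    if (connect (sock, reinterpret_cast<sockaddr*> (&addr), sizeof (addr)) == -1) {
        close (sock);
        return -1;
    }
    return sock;
}
```

The processing process would call listenOn ("/tmp/socket") at startup, and the accept process would call connectTo ("/tmp/socket") once for each processing process it wants to feed.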

Passing Descriptors

Passing a descriptor consists of sending a special message over the connection we have just established. The message is described by a msghdr structure, which can carry ancillary (control) data in addition to ordinary data; the descriptor travels as ancillary data.

Ancillary data can be used to pass UNIX credentials as well.
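
As an illustration of the credentials case, here is a minimal, Linux-specific sketch of the receiving side. It relies on the SO_PASSCRED socket option and the SCM_CREDENTIALS message type; the function names enableCredentials and recvCredentials are hypothetical, and the code assumes struct ucred is visible, which is the case with g++ on glibc because _GNU_SOURCE is defined by default:

```cpp
#include <cstring>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Asks the kernel to attach the sender's credentials to messages
// received on sock (Linux-specific option).
bool enableCredentials (int sock) {
    int on = 1;
    return setsockopt (sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on)) == 0;
}

// Receives one byte of data and the sender's credentials (pid, uid, gid).
// Returns false on error or if no credentials were attached.
bool recvCredentials (int sock, ucred& cred) {
    char c = 0;
    iovec iov;
    iov.iov_base = &c;
    iov.iov_len = 1;

    alignas (cmsghdr) char buf[CMSG_SPACE (sizeof (ucred))];
    msghdr msg = {};
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = buf;
    msg.msg_controllen = sizeof (buf);

    if (recvmsg (sock, &msg, 0) <= 0)
        return false;

    for (cmsghdr* cmsg = CMSG_FIRSTHDR (&msg); cmsg != nullptr;
         cmsg = CMSG_NXTHDR (&msg, cmsg)) {
        if (cmsg->cmsg_level == SOL_SOCKET
            && cmsg->cmsg_type == SCM_CREDENTIALS
            && cmsg->cmsg_len == CMSG_LEN (sizeof (ucred))) {
            std::memcpy (&cred, CMSG_DATA (cmsg), sizeof (ucred));
            return true;
        }
    }
    return false;
}
```

The option has to be enabled on the receiving socket before the peer sends. The sender needs no special code, since the kernel fills in its process ID, user ID and group ID.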

Sender

On the sender side, we fill the required members of the msghdr and cmsghdr structures and send one byte of data (unused here, but it could carry semantic information for the receiving process):

int fdToPass; // descriptor to pass, e.g. the socket returned by accept()
// Data to pass to receiving process (should be at least one byte long)
char c = 0;
void *data = &c;
size_t dataSize = 1;

// Fill data structure to pass descriptor
msghdr msg;
memset (&msg, 0, sizeof (msg));
iovec iov;
iov.iov_base = data;
iov.iov_len = dataSize;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

// control buffer, aligned as required to access it through cmsghdr
alignas (cmsghdr) char buf[CMSG_SPACE (sizeof (int))];
memset (buf, 0, sizeof (buf));
msg.msg_control = buf;
msg.msg_controllen = sizeof (buf);

cmsghdr* cmsg = CMSG_FIRSTHDR (&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS; // indicates we're passing descriptors
cmsg->cmsg_len = CMSG_LEN (sizeof (int));
// CMSG_DATA is not guaranteed to be aligned for an int, so copy the bytes
memcpy (CMSG_DATA (cmsg), &fdToPass, sizeof (int));

// send this message to the chosen process
// sock identifies the connection accept process <-> processing process
if (sendmsg (sock, &msg, 0) == -1) {
    // handle error: the descriptor was not passed
    // (with a non-blocking socket, EAGAIN means "retry later")
}
// and close the descriptor (the descriptor remains open for the receiving process)
close (fdToPass);

Receiver

On the receiver side, we check the contents of the received cmsghdr structure and get the passed descriptor:

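msghdr msg;
memset (&msg, 0, sizeof (msg));

// receive data from the accept stage
char c = 0;
iovec iov;
iov.iov_base = &c;
iov.iov_len = 1;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

// control buffer, aligned as required to access it through cmsghdr
alignas (cmsghdr) char buf[CMSG_SPACE (sizeof (int))];
msg.msg_control = buf;
msg.msg_controllen = sizeof (buf);

ssize_t r = recvmsg (sock, &msg, 0);
if (r <= 0) {
    // r == 0: connection closed by the accept stage
    // r == -1: error (or no message yet on a non-blocking socket)
    // handle error
    return 0;
}

cmsghdr* cmsg = CMSG_FIRSTHDR( &msg );
if ((cmsg == 0)
    || (cmsg->cmsg_level != SOL_SOCKET)
    || (cmsg->cmsg_type != SCM_RIGHTS)
    || (cmsg->cmsg_len != CMSG_LEN (sizeof (int)))) {
    // wrong message
    // handle error
    return 0;
}

// get descriptor from message
// (CMSG_DATA is not guaranteed to be aligned for an int, so copy the bytes)
int fd;
memcpy (&fd, CMSG_DATA (cmsg), sizeof (int));

// do whatever is needed:
// put in reactor, etc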
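
The sender and receiver snippets can be wrapped into a pair of functions. The sketch below is one possible packaging, not the code of accept.cpp or process.cpp: sendFd and recvFd are hypothetical names, and the one-byte tag stands for the data byte that the snippets above send unused:

```cpp
#include <cstring>
#include <sys/socket.h>
#include <unistd.h>

// Sends fdToPass and a one-byte tag over the UNIX socket sock.
// Returns true if the message was sent.
bool sendFd (int sock, int fdToPass, char tag) {
    iovec iov;
    iov.iov_base = &tag;
    iov.iov_len = 1;

    alignas (cmsghdr) char buf[CMSG_SPACE (sizeof (int))];
    std::memset (buf, 0, sizeof (buf));
    msghdr msg = {};
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = buf;
    msg.msg_controllen = sizeof (buf);

    cmsghdr* cmsg = CMSG_FIRSTHDR (&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS; // we are passing descriptors
    cmsg->cmsg_len = CMSG_LEN (sizeof (int));
    std::memcpy (CMSG_DATA (cmsg), &fdToPass, sizeof (int));

    return sendmsg (sock, &msg, 0) == 1;
}

// Receives one descriptor and its tag from sock.
// Returns the new descriptor, or -1 on error or unexpected message.
int recvFd (int sock, char& tag) {
    iovec iov;
    iov.iov_base = &tag;
    iov.iov_len = 1;

    alignas (cmsghdr) char buf[CMSG_SPACE (sizeof (int))];
    msghdr msg = {};
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = buf;
    msg.msg_controllen = sizeof (buf);

    if (recvmsg (sock, &msg, 0) <= 0)
        return -1;

    cmsghdr* cmsg = CMSG_FIRSTHDR (&msg);
    if (cmsg == nullptr
        || cmsg->cmsg_level != SOL_SOCKET
        || cmsg->cmsg_type != SCM_RIGHTS
        || cmsg->cmsg_len != CMSG_LEN (sizeof (int)))
        return -1;

    int fd = -1;
    std::memcpy (&fd, CMSG_DATA (cmsg), sizeof (int));
    return fd;
}
```

The descriptor number returned by recvFd is generally different from the one given to sendFd, but both refer to the same open file or connection. The sender can close its copy once sendFd has returned true.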

Putting Everything Together

Now we have the building blocks for the two processes. Here is the code corresponding to the two processes. Compile them using the following command lines:

g++ -Wall -W -o accept accept.cpp
g++ -Wall -W -o process process.cpp

Accept.cpp

The source code for the acceptor process is available here.

Process.cpp

The source code for the processing process is available here.

To simplify, they use a well-known socket (/tmp/socket) to communicate. Launch them, in this order, with the following command lines:

./process

./accept 3000

Then you can telnet to port 3000 and see what happens.

If you check with netstat, you will see that the connection from telnet to the accept process is now handled by the processing process:

# netstat -ap
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address    Foreign Address  State       PID/Program name
tcp        0      0 *:3000           *:*              LISTEN      31807/accept
tcp        0      0 localhost:3000   localhost:53957  ESTABLISHED 31803/process
tcp        0      0 localhost:53957  localhost:3000   ESTABLISHED 31808/telnet

Conclusion

Passing descriptors is a very powerful technique. We have only had a glimpse of how to implement it here, but the level of indirection it adds to the application architecture opens many more possibilities, such as increased parallelism, load balancing, reliability (if one process crashes, the others remain), easier upgrades (a running process can be shut down and restarted while the application keeps working on the remaining processes), etc.
