Poll/Notify Thread Synchronization Model in Multi-Thread Environment

In many cases we need a thread synchronization mechanism of the following form. There are W Workers, and the Workers communicate with each other through message passing. Each Worker contains a Coordinator, which is in charge of sending and dispatching messages. Specifically, the Sender and the Receiver inside the Coordinator handle message sending and receiving, respectively. When a message arrives, the Receiver triggers an event according to the content of the message and passes this event to the Worker, which performs some user-defined event handling.

Each Receiver maintains a channel (message buffer) for each Sender. Since there are W Workers, each Receiver has W channels. On the Receiver side, a listening thread is needed to poll these channels. We usually do not want to waste CPU cycles in the listening thread, so we need a Poll/Notify mechanism that wakes up the Receiver's listening thread when a channel is ready to be read; i.e., when no channel is ready, the listening thread should be blocked.

This model is useful in many cases, such as user-space network protocol implementations and message passing between NUMA nodes (as done in the paper "GraM: Scaling Graph Computation to the Trillions").

Condition variable vs. eventfd vs. sockets

pthread provides a condition-variable wait mechanism: thread A waits for some event, and thread B notifies the threads that are waiting on that event. This is Wait/Notify thread synchronization: one thread waits and another thread notifies. The limitation of condition variables is that one thread cannot wait on multiple condition variables at the same time.

eventfd has been available in the Linux kernel since version 2.6.22.

#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);

eventfd() creates an “eventfd object” that can be used as an event wait/notify mechanism by user-space applications, and by the kernel to notify user-space applications of events. The object contains an unsigned 64-bit integer (uint64_t) counter that is maintained by the kernel. This counter is initialized with the value specified in the argument initval.

eventfd creates a file descriptor for event notification. The file descriptor can be used in epoll/read/write calls.

More information can be found in the man page.

Sockets are designed for communication in a distributed environment; using them in our case would bring in the overhead of the network communication stack.

Poll/Notify Implementation with eventfd and epoll

The Poll/Notify mechanism can be implemented with eventfd and epoll. The Receiver only needs to create an eventfd for each channel and add each of them to an epoll file descriptor. The listening thread then calls epoll_wait on the epoll file descriptor.

The code can be found on GitHub.

The Coordinator Implementation:

//
// Created by pgplus1628 on 2/16/16.
//

#pragma once

#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/time.h>
#include <unistd.h>

#include <vector>

#include <glog/logging.h>


// Timeout handed to epoll_wait() by Receiver::listen(). epoll_wait() takes
// this argument in milliseconds, so the listener wakes up at least once per
// second even when no channel is signalled.
#define EPOLL_WAIT_TIMEOUT (1000) // 1.0 second

// Forward declaration: RXArgs below stores a Receiver pointer before the
// class itself is defined.
class Receiver;

// Argument bundle handed to the listener thread entry point
// (Receiver::listen_fn) through pthread_create()'s void* parameter.
struct RXArgs
{
	// Receiver whose listen() loop the new thread runs.
	Receiver* rx;
};


class Receiver {
private :
	int kclient_;
	int id_;

public :
	int * chan_fds; // event fds of this channel
	pthread_t listener;

	Receiver(int kclients, int id) : kclient_(kclients), id_(id) {
		chan_fds = new int [kclient_];
		for(int i = 0;i < kclient_;i ++) {
			chan_fds[i] = eventfd(0, EFD_NONBLOCK);
			CHECK_NE(chan_fds[i], -1) << " Create eventfd for channel " << i << " failed.";
		}
	}

	// TODO
	void register_read_handler() { }

	void listen() {
		int ret;

		// create epoll event
		struct epoll_event events[kclient_];
		int ep_fd = epoll_create(kclient_);
		CHECK_GE(ep_fd, 0) << "RX::epoll_create failed.";

		for(int i = 0;i < kclient_;i ++) {
			struct epoll_event read_event;
			int efd = chan_fds[i];
			read_event.events = EPOLLHUP | EPOLLERR | EPOLLIN | EPOLLET;
			read_event.data.u32 = i; // store index
			ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, efd, &read_event);
			CHECK_GE(ret, 0) << "RX " << id_ << " ::epoll_ctl failed.";
		}

		LOG(INFO) << "RX " << id_ << " ::begin listening.";
		// while loop
		uint64_t val;
		while(1) {
			bool ok = true;
			int nfds = epoll_wait(ep_fd, &events[0], kclient_, EPOLL_WAIT_TIMEOUT);
			if (nfds > 0) {
				// handle each ready fd.
				for(int i = 0;i < nfds;i ++) {
					int cid = static_cast<int>(events[i].data.u32);
					if ( events[i].events & EPOLLHUP) {
						LOG(ERROR) << "RX " << id_<< " ::epoll eventfd has epoll hup for " << cid;
						ok = false;
					} else if (events[i].events & EPOLLERR) {
						LOG(ERROR) << "RX " << id_ << " ::epoll eventfd has epoll error for " << cid;
						ok = false;
					} else if (events[i].events & EPOLLIN) {
						int event_fd = chan_fds[cid];
						ret = read(event_fd, &val, sizeof(val));
						CHECK_GE(ret, 0) << "RX" << id_ << " ::read event_fd failed.";
						// TODO read handler
						LOG(INFO) << "RX" << id_ << " ::read from " << cid << " val " << val << " with size " << ret << " bytes.";
					}
					if (!ok) { break; }
				}
				if (!ok) { break; }
			} else if (nfds == 0) {
				LOG(INFO) << "RX " << id_ << " ::epoll wait time out.";
			} else {
				LOG(ERROR) << "RX " << id_ << " ::epoll wait error.";
			}
		}

		LOG(INFO) << "RX " << id_ << " ::exit listen.";
	}

	static void * listen_fn(void * args_) {
		struct RXArgs * args = static_cast<struct RXArgs*>(args_);
		Receiver * rx = args->rx;
		rx->listen();
		free(args);
		pthread_exit(NULL);
	}


	void start_listener() {
		struct RXArgs * args = (struct RXArgs*)malloc(sizeof(struct RXArgs));
		args->rx = this;
		pthread_create(&listener, NULL, Receiver::listen_fn, (void*)args);
	}

	int get_id() { return id_; }

	~Receiver() {
		// join listener
		pthread_join(listener, NULL);
		// close fds
		for(int i = 0;i < kclient_; i ++) {
			if(chan_fds[i] >= 0) { close(chan_fds[i]); chan_fds[i] = -1;}
		}
		// free
		delete chan_fds;
	}


};



/**
 * Sending half of a Coordinator.
 *
 * A Sender keeps, for every Receiver, a copy of the eventfd that this
 * Receiver created for the sender's id. The descriptors stay owned by the
 * Receivers: a Sender never closes them and must not be used after the
 * Receivers were destroyed.
 */
class Sender {
private :
	int kserver_; // number of Receivers this sender can signal
	int id_;      // id of this sender; selects the channel inside each Receiver

public :
	int * chan_fds; // channel eventfds, get from Receiver

	/**
	 * @param kserver number of entries in servers (>= 0)
	 * @param id      id of this sender; used as index into each Receiver's chan_fds
	 * @param servers array of kserver Receiver pointers
	 */
	Sender(int kserver, int id, Receiver ** servers) : kserver_(kserver), id_(id), chan_fds(nullptr) {
		CHECK_GE(kserver_, 0) << "TX " << id_ << " invalid number of receivers " << kserver_;
		chan_fds = new int [kserver_];
		for(int i = 0;i < kserver_;i ++) {
			chan_fds[i] = servers[i]->chan_fds[id_];
			CHECK_GE(chan_fds[i], 0) << "TX " << id_ << " got an invalid channel fd from receiver " << i;
		}
	}

	// The fd table is owned by this object; a shallow copy would free it twice.
	Sender(const Sender &) = delete;
	Sender & operator=(const Sender &) = delete;

	// Releases only the table; the eventfds themselves belong to the Receivers.
	~Sender() {
		delete [] chan_fds;
	}

	/**
	 * Adds val to the eventfd counter of the channel to receiver serv_id,
	 * which makes that channel readable for the receiver's listener.
	 *
	 * @param serv_id index of the target receiver, 0 <= serv_id < kserver
	 * @param val     value written to the eventfd
	 *
	 * Aborts through glog CHECK on an out-of-range index or a failed write.
	 */
	void signal(int serv_id, uint64_t val) {
		CHECK_GE(serv_id, 0) << "TX " << id_ << " receiver index " << serv_id << " out of range.";
		CHECK_GE(kserver_ - 1, serv_id) << "TX " << id_ << " receiver index " << serv_id << " out of range.";

		ssize_t ret = 0;
		// The fd is non-blocking: EAGAIN means the write cannot complete right
		// now, EINTR means a signal interrupted it. Both are retried.
		do {
			ret = write(chan_fds[serv_id], &val, sizeof(val));
		} while(ret < 0 && (errno == EAGAIN || errno == EINTR));
		CHECK_GE(ret, 0) << "TX " << id_ << " -> " << serv_id << " write fd error : " << strerror(errno);
	}

	int get_id() const { return id_; }

};

Main.cpp

#include "coordinator.hpp"
#include <pthread.h>
#include <glog/logging.h>

// Number of Receiver/Sender pairs created by main(); tx_fn also uses it as
// the number of receivers each sender signals.
#define NNODE (8)

// Argument bundle handed to the sender thread entry point (tx_fn) through
// pthread_create()'s void* parameter.
struct TXArgs
{
	// Sender the new thread signals with.
	Sender* tx;
};

void * tx_fn(void *args_) {
	struct TXArgs * args = static_cast<struct TXArgs*>(args_);
	Sender * tx = args->tx;

	int times = 3;
	for(int i = 0;i < times;i ++) {
		uint64_t val = (uint64_t)(tx->get_id()) + 1;
		for(int rid = 0; rid < NNODE; rid ++) {
			tx->signal(rid, val);
			LOG(INFO) << tx->get_id() << " -> " << rid << " val " << val;
		}
	}

	return NULL;
}


int main(int argc, char **argv) {

	Receiver * rxs[NNODE];
	Sender *txs[NNODE];

	for(int i = 0;i < NNODE; i ++) {
		rxs[i] = new Receiver(NNODE, i);
	}

	for(int i = 0;i < NNODE;i ++) {
		txs[i] = new Sender(NNODE, i, rxs);
	}

	// start listeners
	for(int i = 0;i < NNODE;i ++) {
		rxs[i]->start_listener();
	}


	// start senders
	pthread_t senders[NNODE];
	for(int i = 0;i < NNODE;i ++) {
		struct TXArgs * sarg = (struct TXArgs*) malloc(sizeof(TXArgs));
		sarg->tx = txs[i];
		pthread_create(&(senders[i]), NULL, tx_fn, (void*) sarg);
	}

	// join threads
	for(int i = 0;i < NNODE;i ++) {
		pthread_join(senders[i], NULL);
	}

	for(int i = 0;i < NNODE;i ++) {
		delete rxs[i];
		rxs[i] = nullptr;
	}

	return 0;
}