Implementing Event Polling in Multi-Thread Environment with eventfd and epoll
Poll/Notify Thread Synchronization Model in Multi-Thread Environment
In many cases, we need to implement a thread synchronization mechanism in the following manner: there are W Workers, and the workers communicate with each other through message passing. Each Worker has a Coordinator, which is in charge of sending and dispatching messages. Specifically, the Sender and the Receiver inside the Coordinator handle message sending and receiving, respectively. When a message arrives, the Receiver should trigger an event according to the content of the message and pass this event to the Worker for user-defined event handling.
Each Receiver maintains a channel (message buffer) for each Sender. Since there are W Workers, each Receiver has W channels. On the Receiver side, a listening thread is needed to poll these channels. We do not want to waste CPU cycles in the listening thread, so we need a Poll/Notify mechanism that wakes up the Receiver's listening thread when a channel becomes ready to read; i.e., when no channel is ready, the listening thread should be blocked.
This model is useful in many cases, such as user-space network protocol implementations and message passing between NUMA nodes (as done in the paper "GraM: Scaling Graph Computation to the Trillions").
Condition variables vs. eventfd vs. sockets
pthread provides a condition-wait mechanism (condition variables): thread A waits for some event, and thread B notifies the threads that are waiting on that event. This is Wait/Notify thread synchronization: one thread waits and the other thread notifies. The limitation of condition wait is that one thread cannot wait on multiple condition variables at the same time.
eventfd has been available in the Linux kernel since version 2.6.22.
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
eventfd() creates an “eventfd object” that can be used as an event wait/notify mechanism by user-space applications, and by the kernel to notify user-space applications of events. The object contains an unsigned 64-bit integer (uint64_t) counter that is maintained by the kernel. This counter is initialized with the value specified in the argument initval.
eventfd creates a file descriptor for event notification. The file descriptor can be used in epoll/read/write calls.
More information can be found in the man page.
Sockets are designed for communication in distributed environments; using them in our case would add the overhead of the network communication stack.
Poll/Notify Implementation with eventfd and epoll
The Poll/Notify mechanism can be implemented with eventfd and epoll. The Receiver only needs to create an eventfd for each channel and add them to an epoll file descriptor. The listening thread then blocks in epoll_wait on the epoll file descriptor.
The code can be found on GitHub.
The Coordinator Implementation:
//
// Created by pgplus1628 on 2/16/16.
//
#pragma once
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/time.h>
#include <unistd.h>
#include <vector>
#include <glog/logging.h>
// Timeout handed to every epoll_wait() call in Receiver::listen(), in
// milliseconds (1000 ms = 1.0 second).
#define EPOLL_WAIT_TIMEOUT (1000)

class Receiver;

// Argument bundle carried through pthread_create() into Receiver::listen_fn.
struct RXArgs {
  Receiver* rx;  // receiver whose listen() loop the new thread will run
};
class Receiver {
private :
int kclient_;
int id_;
public :
int * chan_fds; // event fds of this channel
pthread_t listener;
Receiver(int kclients, int id) : kclient_(kclients), id_(id) {
chan_fds = new int [kclient_];
for(int i = 0;i < kclient_;i ++) {
chan_fds[i] = eventfd(0, EFD_NONBLOCK);
CHECK_NE(chan_fds[i], -1) << " Create eventfd for channel " << i << " failed.";
}
}
// TODO
void register_read_handler() { }
void listen() {
int ret;
// create epoll event
struct epoll_event events[kclient_];
int ep_fd = epoll_create(kclient_);
CHECK_GE(ep_fd, 0) << "RX::epoll_create failed.";
for(int i = 0;i < kclient_;i ++) {
struct epoll_event read_event;
int efd = chan_fds[i];
read_event.events = EPOLLHUP | EPOLLERR | EPOLLIN | EPOLLET;
read_event.data.u32 = i; // store index
ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, efd, &read_event);
CHECK_GE(ret, 0) << "RX " << id_ << " ::epoll_ctl failed.";
}
LOG(INFO) << "RX " << id_ << " ::begin listening.";
// while loop
uint64_t val;
while(1) {
bool ok = true;
int nfds = epoll_wait(ep_fd, &events[0], kclient_, EPOLL_WAIT_TIMEOUT);
if (nfds > 0) {
// handle each ready fd.
for(int i = 0;i < nfds;i ++) {
int cid = static_cast<int>(events[i].data.u32);
if ( events[i].events & EPOLLHUP) {
LOG(ERROR) << "RX " << id_<< " ::epoll eventfd has epoll hup for " << cid;
ok = false;
} else if (events[i].events & EPOLLERR) {
LOG(ERROR) << "RX " << id_ << " ::epoll eventfd has epoll error for " << cid;
ok = false;
} else if (events[i].events & EPOLLIN) {
int event_fd = chan_fds[cid];
ret = read(event_fd, &val, sizeof(val));
CHECK_GE(ret, 0) << "RX" << id_ << " ::read event_fd failed.";
// TODO read handler
LOG(INFO) << "RX" << id_ << " ::read from " << cid << " val " << val << " with size " << ret << " bytes.";
}
if (!ok) { break; }
}
if (!ok) { break; }
} else if (nfds == 0) {
LOG(INFO) << "RX " << id_ << " ::epoll wait time out.";
} else {
LOG(ERROR) << "RX " << id_ << " ::epoll wait error.";
}
}
LOG(INFO) << "RX " << id_ << " ::exit listen.";
}
static void * listen_fn(void * args_) {
struct RXArgs * args = static_cast<struct RXArgs*>(args_);
Receiver * rx = args->rx;
rx->listen();
free(args);
pthread_exit(NULL);
}
void start_listener() {
struct RXArgs * args = (struct RXArgs*)malloc(sizeof(struct RXArgs));
args->rx = this;
pthread_create(&listener, NULL, Receiver::listen_fn, (void*)args);
}
int get_id() { return id_; }
~Receiver() {
// join listener
pthread_join(listener, NULL);
// close fds
for(int i = 0;i < kclient_; i ++) {
if(chan_fds[i] >= 0) { close(chan_fds[i]); chan_fds[i] = -1;}
}
// free
delete chan_fds;
}
};
class Sender {
private :
int kserver_;
int id_;
public :
int * chan_fds; // channel eventfds, get from Receiver
Sender(int kserver, int id, Receiver ** servers) : kserver_(kserver), id_(id){
chan_fds = new int [kserver_];
for(int i = 0;i < kserver;i ++) {
chan_fds[i] = servers[i]->chan_fds[id_];
}
}
void signal(int serv_id, uint64_t val) {
ssize_t ret = 0;
do {
ret = write(chan_fds[serv_id], &val, sizeof(val));
} while(ret < 0 && errno == EAGAIN);
CHECK_GE(ret, 0) << "TX " << id_ << " -> " << serv_id << " write fd error : " << strerror(errno);
}
int get_id() { return id_; }
};
Main.cpp
#include "coordinator.hpp"
#include <pthread.h>
#include <glog/logging.h>
// Number of Receiver/Sender pairs created by main(); tx_fn also uses it as the
// number of receivers each sender signals.
#define NNODE (8)

// Argument bundle carried through pthread_create() into tx_fn.
struct TXArgs {
  Sender* tx;  // sender driven by the spawned thread
};
/**
 * Sender thread entry point.
 *
 * `args_` must point to a malloc()-allocated TXArgs; ownership passes to this
 * function, which frees it once the Sender pointer has been extracted.
 * The thread then signals every receiver 0 .. NNODE-1 three times with the
 * value (sender id + 1) and logs each notification.
 *
 * @return always NULL
 */
void * tx_fn(void *args_) {
  struct TXArgs * args = static_cast<struct TXArgs*>(args_);
  Sender * tx = args->tx;
  // The argument block only carries the Sender pointer; release it right away
  // so it is not leaked.
  free(args);

  const int times = 3;
  // id + 1 so that sender 0 still adds a non-zero amount to the eventfd counter.
  const uint64_t val = static_cast<uint64_t>(tx->get_id()) + 1;
  for (int i = 0; i < times; i++) {
    for (int rid = 0; rid < NNODE; rid++) {
      tx->signal(rid, val);
      LOG(INFO) << tx->get_id() << " -> " << rid << " val " << val;
    }
  }
  return NULL;
}
int main(int argc, char **argv) {
Receiver * rxs[NNODE];
Sender *txs[NNODE];
for(int i = 0;i < NNODE; i ++) {
rxs[i] = new Receiver(NNODE, i);
}
for(int i = 0;i < NNODE;i ++) {
txs[i] = new Sender(NNODE, i, rxs);
}
// start listeners
for(int i = 0;i < NNODE;i ++) {
rxs[i]->start_listener();
}
// start senders
pthread_t senders[NNODE];
for(int i = 0;i < NNODE;i ++) {
struct TXArgs * sarg = (struct TXArgs*) malloc(sizeof(TXArgs));
sarg->tx = txs[i];
pthread_create(&(senders[i]), NULL, tx_fn, (void*) sarg);
}
// join threads
for(int i = 0;i < NNODE;i ++) {
pthread_join(senders[i], NULL);
}
for(int i = 0;i < NNODE;i ++) {
delete rxs[i];
rxs[i] = nullptr;
}
return 0;
}