What happens when you put a epoll fd (File descriptor) inside another epoll fd? Just what you’d expect: Any event in the epoll fd inside the epoll fd will trigger the inner epoll fd and the outer epoll fd. This might not sound like much but it fixed a lot of bad patters in my API design.
Note that although only epoll is mentioned here, I believe the same handling can be extended to poll also.
Before I realized you don’t have to have only a single epoll fd for a process or thread, whenever an fd needed to be added to the event loop, it was added with callbacks. Imagine managing someone, but every time the underling had to contact an outside party you’d have to be interrupted to inform the underling whenever the outside party had a reply ready. This would cause API internal logic to spill over to the caller side.
struct module_initialization_options
{
...
int (*register_fd)(void *context, int fd, uint32_t events);
int (*unregister_fd)(void *context, int fd);
...
};
In addition to file descriptors spilling over, there’s added error handling on the callee side for the rare cases when these fail. In addition the registered fd might differ on what should be called when it’s active. It might be different on a global timer, a client connection or file system. Although rarely should one component encompass many interfaces.
Adding timers for reconnects or maintenance walks was painful as yet more callbacks were reguested of the callee for setting and unsetting a timer.
Instead if the API creates its own epoll fd, it can register any of it’s own components to it and abstract away all fd worries, not bothering the callee with its internals. Lets take an example of an API sending messages to an arbitrary number of end-points. For the sake of brevity the number of end-points is determined on initialization.
include <netinet/in.h>
/** @brief Connection module context */
typedef struct conn_module conn_module_t;
/**
* @brief Initializes a connection module with given end-points.
*
* @param n_end_points Number of end-points.
* @param end_points Array of end-points.
*
* @return == NULL Failure.
* @return != NULL Connection module context. */
conn_module_t *conn_module_create(unsigned int n_end_points,
struct sockaddr_storage *end_points);
/**
* @brief Returns the event fd for the connection module.
*
* @return == Pollable event fd.
*/
int conn_module_efd(conn_module_t *ctx);
/**
* @brief Process available data in connection module
*
* @param ctx Connection module context.
* @param timeout Timeout for processing, see man 2 epoll_wait
*
* @return >= 0 Number of events processed.
* @return >= -1 Internal error.
*/
int conn_module_process(conn_module_t *ctx, int timeout);
/**
* @brief Sends data to all connected peers.
*
* @param ctx Connection module context.
* @param buf Buffer of data to send.
* @param buf_size Length of data to send.
*
* @return Number of peers data was sent to.
*/
int conn_module_send(conn_module_t *ctx, uint8_t *buf, size_t buf_size);
/**
* @brief Deinitalize connection module
*
* @param ctx Connection module context.
*/
void conn_module_destroy(conn_module_t *ctx);
This boils down to the initialization consisting of the main context creation and the per connection creation function:
/** @brief Structure per connection */
struct conn_module_connection
{
int fd; //!< If =! -1, active connection to end-point
unsigned int retry_counter; //!< If > 0, retry counter for reconnect.
struct sockaddr_storage end_point; //!< Address of end-point.
bool connection_complete; //!< TCP connection completed.
};
struct conn_module
{
int efd; //!< Module event fd.
int timer_fd; //!< Timer fd.
unsigned int n_connections; // Number of structs in \p connections.
struct conn_module_connection connections[]; // Array of connections.
};
/** @brief Create socket and connect */
static void conn_module_socket(conn_module_t *ctx,
struct conn_module_connection *conn)
{
conn->fd = socket(conn->end_point.ss_family,
SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
if (conn->fd != -1)
{
struct epoll_event ev = {.events = EPOLLIN, .data = { .ptr = conn }};
// Add new socked to efd.
if (!epoll_ctl(ctx->efd, EPOLL_CTL_ADD, conn->fd, &ev))
{
// Connect the socket.
int ret = connect(conn->fd, (struct sockaddr *)&conn->end_point,
sizeof(conn->end_point));
if (!ret || errno == EINPROGRESS)
{
// Success.
conn->connection_complete = !ret;
return;
}
epoll_ctl(ctx->efd, EPOLL_CTL_DEL, conn->fd, &ev);
}
close(conn->fd);
conn->fd = -1;
}
}
conn_module_t *conn_module_create(unsigned int n_end_points,
struct sockaddr_storage *end_points)
{
conn_module_t *ctx = calloc(
1, sizeof(*ctx) + sizeof(struct conn_module_connection) * n_end_points);
if (ctx)
{
// Create our internal epoll fd.
ctx->efd = epoll_create1(EPOLL_CLOEXEC);
if (ctx->efd != -1)
{
// Create a timer fd for reconnecting etc.
ctx->timer_fd =
timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (ctx->timer_fd != -1)
{
struct epoll_event ev = {.events = EPOLLIN, .data = {.ptr = ctx}};
// Add timer to efd.
if (!epoll_ctl(ctx->efd, EPOLL_CTL_ADD, ctx->timer_fd, &ev))
{
unsigned int i;
// Copy connection data.
ctx->n_connections = n_end_points;
for (i = 0; i < n_end_points; i++)
{
ctx->connections[i].fd = -1;
memcpy(&ctx->connections[i].end_point, &end_points[i],
sizeof(*end_points));
conn_module_socket(ctx, &ctx->connections[i]);
}
return ctx;
}
close(ctx->timer_fd);
}
close(ctx->efd);
}
free(ctx);
}
return NULL;
}
An arbitrary number of fd are now properly encapsulated in the module itself, instead of being the callee’s problem or even having the same polling mechanism as the callee.
In this case the data added to efd is a pointer. It could also be an fd, if for example the clients were arranged in a search structure by their fd. Lets look at the processing function anyway:
int conn_module_process(conn_module_t *ctx, int timeout)
{
const unsigned int max_events = 128;
struct epoll_event ev[max_events];
int ret;
ret = epoll_wait(ctx->efd, ev, max_events, timeout);
if (ret >= 0)
{
const unsigned int n_events = ret;
unsigned int i;
for (i = 0; i < n_events; i++)
{
if (ev[i].data.ptr == ctx)
{
conn_module_process_timer(ctx);
}
else
{
struct conn_module_connection *conn = ev[i].data.ptr;
conn_module_process_connection(ctx, conn);
}
}
}
else
{
// Error handling.
}
return ret;
}
Now if you suddenly have a submodule for the module that also generated fd events, then this starts to get tricky. You could still register just the main fd for this and if the submodule has a similar timeout mechanism, it isn’t that bad to call it’s processing function with a 0 timeout.
But let’s say there’s other fds that are similar to conn_module_connection, in that they are a multitude with different structure than conn_module_connection. In such a case it’s probably better to start adding the fd and using the search structure to locate it in the process-loop.
I shun away from allocating a dedicated context per fd for epoll. This would mean an extra allocation per fd and would probably lead to memory leaks is some branches when the fd is closed.
Although when not allocating a context per fd insert to epoll, one could just close the fd, I don’t recommend this. See especially man 7 epoll Q6/A6.
Writing the process loop for each module I wrote became quite tiresome fast so now it always boils down to a single function and a function pointer I use to make process as simple as possible:
/**
* @brief Callback invoked on each fd active.
*
* @param context Context given to \p sukat_epoll_wait
* @param events Events in fd.
* @param data Data registered to fd.
*
* @return == 0 Continue to next step.
* @return != 0 Stop and return this value from wait.
*/
typedef int (*sukat_epoll_cb)(void *context, uint32_t events,
union epoll_data *data);
/**
* @brief Waits for epoll events.
*
* @param efd Epoll fd.
* @param cb Callback to invoke per fd.
* @param context Context to pass to callback.
* @param timeout Timeout to wait for events.
*/
int sukat_epoll_wait(int efd, sukat_epoll_cb cb, void *context, int timeout)
{
const unsigned int max_events = 128;
struct epoll_event ev[max_events];
int ret;
ret = epoll_wait(efd, ev, max_events, timeout);
if (ret >= 0)
{
unsigned int n_events = (unsigned int)ret, i;
for (i = 0; i < n_events; i++)
{
ret = cb(context, ev[i].events, &ev[i].data);
if (ret)
{
break;
}
}
}
else
{
ERR("Failed to wait for events on fd %d: %s", efd, strerror(errno));
}
return ret;
}
This is from the sukat_dlts module, but it’s the same everywhere.