我正在将 curl 集成到基于 kqueue 的异步 I/O 事件循环中。
libcurl 有一个优秀的 API 用于集成到应用程序事件循环中。
您为 libcurl 提供了两个回调,一个用于设置计时器(用于限制请求/连接时间),另一个用于为读/写/错误事件注册 libcurl 的文件描述符。
用于执行 FD 注册的回调的文档在这里:CURLMOPT_SOCKETFUNCTION
通知回调 libcurl 感兴趣的事件的参数有四个枚举值:
CURL_POLL_IN
Wait for incoming data. For the socket to become readable.
CURL_POLL_OUT
Wait for outgoing data. For the socket to become writable.
CURL_POLL_INOUT
Wait for incoming and outgoing data. For the socket to become readable or writable.
CURL_POLL_REMOVE
The specified socket/file descriptor is no longer used by libcurl.
尽管没有明确记录,libcurl 期望在后续调用回调时,更新事件循环的过滤器状态以匹配它传递的内容。即,如果在第一次调用时它通过了CURL_POLL_IN
(EVFILT_READ
),而在随后的调用中它通过了CURL_POLL_OUT
(EVFILT_WRITE
),那么原始EVFILT_READ
过滤器将被删除。
我更新了 FD 注册码来处理这个问题。
int fr_event_fd_insert(fr_event_list_t *el, int fd,
fr_event_fd_handler_t read,
fr_event_fd_handler_t write,
fr_event_fd_handler_t error,
void *ctx)
{
int filter = 0;
struct kevent evset[2];
struct kevent *ev_p = evset;
fr_event_fd_t *ef, find;
if (!el) {
fr_strerror_printf("Invalid argument: NULL event list");
return -1;
}
if (!read && !write) {
fr_strerror_printf("Invalid arguments: NULL read and write callbacks");
return -1;
}
if (fd < 0) {
fr_strerror_printf("Invalid arguments: Bad FD %i", fd);
return -1;
}
if (el->exit) {
fr_strerror_printf("Event loop exiting");
return -1;
}
memset(&find, 0, sizeof(find));
/*
* Get the existing fr_event_fd_t if it exists.
*/
find.fd = fd;
ef = rbtree_finddata(el->fds, &find);
if (!ef) {
ef = talloc_zero(el, fr_event_fd_t);
if (!ef) {
fr_strerror_printf("Out of memory");
return -1;
}
talloc_set_destructor(ef, _fr_event_fd_free);
el->num_fds++;
ef->fd = fd;
rbtree_insert(el->fds, ef);
/*
* Existing filters will be overwritten if there's
* a new filter which takes their place. If there
* is no new filter however, we need to delete the
* existing one.
*/
} else {
if (ef->read && !read) filter |= EVFILT_READ;
if (ef->write && !write) filter |= EVFILT_WRITE;
if (filter) {
EV_SET(ev_p++, ef->fd, filter, EV_DELETE, 0, 0, 0);
filter = 0;
}
/*
* I/O handler may delete an event, then
* re-add it. To avoid deleting modified
* events we unset the do_delete flag.
*/
ef->do_delete = false;
}
ef->ctx = ctx;
if (read) {
ef->read = read;
filter |= EVFILT_READ;
}
if (write) {
ef->write = write;
filter |= EVFILT_WRITE;
}
ef->error = error;
EV_SET(ev_p++, fd, filter, EV_ADD | EV_ENABLE, 0, 0, ef);
if (kevent(el->kq, evset, ev_p - evset, NULL, 0, NULL) < 0) {
fr_strerror_printf("Failed inserting event for FD %i: %s", fd, fr_syserror(errno));
talloc_free(ef);
return -1;
}
ef->is_registered = true;
return 0;
}
不幸的是,它不起作用。kevent 似乎没有删除旧的过滤器(我们继续收到他们的通知)。
更奇怪的是,如果我在两个单独的调用中应用这两个操作,它工作得很好。
if (filter) {
EV_SET(&evset, ef->fd, filter, EV_DELETE, 0, 0, 0);
kevent(el->kq, evset, ev_p - evset, NULL, 0, NULL);
filter = 0;
}
这是 Sierra 的 kevent 实现中的错误,还是我误解了 kevent 应该如何工作?