0

我正在将 curl 集成到基于 kqueue 的异步 I/O 事件循环中。

libcurl 有一个优秀的 API 用于集成到应用程序事件循环中。

您为 libcurl 提供了两个回调,一个用于设置计时器(用于限制请求/连接时间),另一个用于为读/写/错误事件注册 libcurl 的文件描述符。

用于执行 FD 注册的回调的文档在这里:CURLMOPT_SOCKETFUNCTION

通知回调 libcurl 感兴趣的事件的参数有四个枚举值:

CURL_POLL_IN

Wait for incoming data. For the socket to become readable.

CURL_POLL_OUT

Wait for outgoing data. For the socket to become writable.

CURL_POLL_INOUT

Wait for incoming and outgoing data. For the socket to become readable or writable.

CURL_POLL_REMOVE

The specified socket/file descriptor is no longer used by libcurl.

尽管没有明确记录,libcurl 期望在后续调用回调时,更新事件循环的过滤器状态以匹配它传递的内容。即,如果在第一次调用时它通过了CURL_POLL_INEVFILT_READ),而在随后的调用中它通过了CURL_POLL_OUTEVFILT_WRITE),那么原始EVFILT_READ过滤器将被删除。

我更新了 FD 注册码来处理这个问题。

int fr_event_fd_insert(fr_event_list_t *el, int fd,
               fr_event_fd_handler_t read,
               fr_event_fd_handler_t write,
               fr_event_fd_handler_t error,
               void *ctx)
{
    int       filter = 0;
    struct kevent evset[2];
    struct kevent *ev_p = evset;
    fr_event_fd_t *ef, find;

    if (!el) {
        fr_strerror_printf("Invalid argument: NULL event list");
        return -1;
    }

    if (!read && !write) {
        fr_strerror_printf("Invalid arguments: NULL read and write callbacks");
        return -1;
    }

    if (fd < 0) {
        fr_strerror_printf("Invalid arguments: Bad FD %i", fd);
        return -1;
    }

    if (el->exit) {
        fr_strerror_printf("Event loop exiting");
        return -1;
    }

    memset(&find, 0, sizeof(find));

    /*
     *  Get the existing fr_event_fd_t if it exists.
     */
    find.fd = fd;
    ef = rbtree_finddata(el->fds, &find);
    if (!ef) {
        ef = talloc_zero(el, fr_event_fd_t);
        if (!ef) {
            fr_strerror_printf("Out of memory");
            return -1;
        }
        talloc_set_destructor(ef, _fr_event_fd_free);
        el->num_fds++;
        ef->fd = fd;
        rbtree_insert(el->fds, ef);
    /*
     *  Existing filters will be overwritten if there's
     *  a new filter which takes their place.  If there
     *  is no new filter however, we need to delete the
     *  existing one.
     */
    } else {
        if (ef->read && !read) filter |= EVFILT_READ;
        if (ef->write && !write) filter |= EVFILT_WRITE;

        if (filter) {
            EV_SET(ev_p++, ef->fd, filter, EV_DELETE, 0, 0, 0);
            filter = 0;
        }

        /*
         *  I/O handler may delete an event, then
         *  re-add it.  To avoid deleting modified
         *  events we unset the do_delete flag.
         */
        ef->do_delete = false;
    }

    ef->ctx = ctx;

    if (read) {
        ef->read = read;
        filter |= EVFILT_READ;
    }

    if (write) {
        ef->write = write;
        filter |= EVFILT_WRITE;
    }
    ef->error = error;

    EV_SET(ev_p++, fd, filter, EV_ADD | EV_ENABLE, 0, 0, ef);
    if (kevent(el->kq, evset, ev_p - evset, NULL, 0, NULL) < 0) {
        fr_strerror_printf("Failed inserting event for FD %i: %s", fd, fr_syserror(errno));
        talloc_free(ef);
        return -1;
    }
    ef->is_registered = true;

    return 0;
 }

不幸的是,它不起作用。kevent 似乎没有删除旧的过滤器(我们继续收到他们的通知)。

更奇怪的是,如果我在两个单独的调用中应用这两个操作,它工作得很好。

if (filter) {
    EV_SET(&evset, ef->fd, filter, EV_DELETE, 0, 0, 0);
    kevent(el->kq, evset, ev_p - evset, NULL, 0, NULL);
    filter = 0;
}

这是 Sierra 的 kevent 实现中的错误,还是我误解了 kevent 应该如何工作?

4

1 回答 1

0

这里的问题是您不能将EVFILT_READandEVFILT_WRITE标志“或”在一起。

启用或禁用多个过滤器时,您需要在多个结构EV_SET()上多次调用。evset

上例中的非功能代码:

struct kevent evset[2];
struct kevent *ev_p = evset;

if (read) {
    ef->read = read;
    filter |= EVFILT_READ;
}

if (write) {
    ef->write = write;
    filter |= EVFILT_WRITE;
}
ef->error = error;

EV_SET(ev_p++, fd, filter, EV_ADD | EV_ENABLE, 0, 0, ef);
event(el->kq, evset, ev_p - evset, NULL, 0, NULL)

变成:

int count = 0;
struct ev_set[2];

if (read) {
    ef->read = read;
    EV_SET(ev_set[count++], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ef);
}

if (write) {
    ef->write = write;
    EV_SET(ev_set[count++], fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, ef);
}
ef->error = error;
kevent(el->kq, ev_set, count, NULL, 0, NULL)

进行此更改后,一切都按预期工作。

于 2017-07-07T23:54:06.260 回答