4

我目前正在测试我的网络代码。这涉及通过 IPv4 环回地址 (127.0.0.1) 建立连接。不幸的是,程序经常(并非总是)在发送数据时给出 EPIPE 错误。

我正在使用伯克利网络套接字和 libevent。我通过以下方式制作了一个非阻塞套接字:

CBSocketReturn CBNewSocket(uint64_t * socketID,bool IPv6){
    *socketID = socket(IPv6 ? PF_INET6 : PF_INET, SOCK_STREAM, 0);
    if (*socketID == -1) {
        if (errno == EAFNOSUPPORT || errno == EPROTONOSUPPORT) {
            return CB_SOCKET_NO_SUPPORT;
        }
        return CB_SOCKET_BAD;
    }
    // Stop SIGPIPE annoying us.
    if (CB_NOSIGPIPE) {
        int i = 1;
        setsockopt(*socketID, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i));
    }
    // Make socket non-blocking
    evutil_make_socket_nonblocking((evutil_socket_t)*socketID);
    return CB_SOCKET_OK;
}

我通过以下方式进行连接事件:

bool CBSocketDidConnectEvent(uint64_t * eventID,uint64_t loopID,uint64_t socketID,void (*onDidConnect)(void *,void *),void * node){
    CBEvent * event = malloc(sizeof(*event));
    event->loop = (CBEventLoop *)loopID;
    event->onEvent.ptr = onDidConnect;
    event->node = node;
    event->event = event_new(((CBEventLoop *)loopID)->base, (evutil_socket_t)socketID, EV_TIMEOUT|EV_WRITE, CBDidConnect, event);
    if (NOT event->event) {
        free(event);
        event = 0;
    }
    *eventID = (uint64_t)event;
    return event;
}
void CBDidConnect(evutil_socket_t socketID,short eventNum,void * arg){
    CBEvent * event = arg;
    if (eventNum & EV_TIMEOUT) {
        // Timeout for the connection
        event->loop->onTimeOut(event->loop->communicator,event->node,CB_TIMEOUT_CONNECT);
    }else{
        // Connection successful
        event->onEvent.ptr(event->loop->communicator,event->node);
    }
}

并通过以下方式添加:

bool CBSocketAddEvent(uint64_t eventID,uint16_t timeout){
    CBEvent * event = (CBEvent *)eventID;
    int res;
    if (timeout) {
        struct timeval time = {timeout,0};
        res = event_add(event->event, &time);
    }else
        res = event_add(event->event, NULL);
    return NOT res;
}

连接:

bool CBSocketConnect(uint64_t socketID,uint8_t * IP,bool IPv6,uint16_t port){
    // Create sockaddr_in6 information for a IPv6 address
    int res;
    if (IPv6) {
        struct sockaddr_in6 address;
        memset(&address, 0, sizeof(address)); // Clear structure.
        address.sin6_family = AF_INET6;
        memcpy(&address.sin6_addr, IP, 16); // Move IP address into place.
        address.sin6_port = htons(port); // Port number to network order
        res = connect((evutil_socket_t)socketID, (struct sockaddr *)&address, sizeof(address));
    }else{
        struct sockaddr_in address;
        memset(&address, 0, sizeof(address)); // Clear structure.
        address.sin_family = AF_INET;
        memcpy(&address.sin_addr, IP + 12, 4); // Move IP address into place. Last 4 bytes for IPv4.
        address.sin_port = htons(port); // Port number to network order
        res = connect((evutil_socket_t)socketID, (struct sockaddr *)&address, sizeof(address));
    }
    if (NOT res || errno == EINPROGRESS)
        return true;
    return false;
}

连接后,会发生 canSend 事件:

bool CBSocketCanSendEvent(uint64_t * eventID,uint64_t loopID,uint64_t socketID,void (*onCanSend)(void *,void *),void * node){
    CBEvent * event = malloc(sizeof(*event));
    event->loop = (CBEventLoop *)loopID;
    event->onEvent.ptr = onCanSend;
    event->node = node;
    event->event = event_new(((CBEventLoop *)loopID)->base, (evutil_socket_t)socketID, EV_TIMEOUT|EV_WRITE|EV_PERSIST, CBCanSend, event);
    if (NOT event->event) {
        free(event);
        event = 0;
    }
    *eventID = (uint64_t)event;
    return event;
}
void CBCanSend(evutil_socket_t socketID,short eventNum,void * arg){
    CBEvent * event = arg;
    if (eventNum & EV_TIMEOUT) {
        // Timeout when waiting to write.
        event->loop->onTimeOut(event->loop->communicator,event->node,CB_TIMEOUT_SEND);
    }else{
        // Can send
        event->onEvent.ptr(event->loop->communicator,event->node);
    }
}

但是发送经常会出现 EPIPE 错误:

int32_t CBSocketSend(uint64_t socketID,uint8_t * data,uint32_t len){
    ssize_t res = send((evutil_socket_t)socketID, data, len, CB_SEND_FLAGS);
    printf("SENT (%li): ",res);
    for (uint32_t x = 0; x < res; x++) {
        printf("%c",data[x]);
    }
    printf("\n");
    if (res >= 0)
        return (int32_t)res;
    if (errno == EAGAIN)
        return 0; // False event. Wait again.
    return CB_SOCKET_FAILURE; // Failure
}

它登陆return CB_SOCKET_FAILURE;并且 errno 设置为 EPIPE。现在为什么会这样?如果设置了发送标志,则它只是 MSG_NOSIGNAL,因为 SIGPIPE 一直以这个错误中断程序。我希望 EPIPE 导致 CBSocketSend 返回 CB_SOCKET_FAILURE 并且不中断程序,但是 EPIPE 没有原因导致发送失败,那么为什么要这样做呢?

上次我收到错误时,我注意到连接的线程仍在 connect() 调用中。使连接事件由单独的线程而不是连接的线程处理是否有危险?

在这些地方查看网络代码:

https://github.com/MatthewLM/cbitcoin/blob/master/test/testCBNetworkCommunicator.c https://github.com/MatthewLM/cbitcoin/tree/master/src/structures/CBObject/CBNetworkCommunicator https://github。 com/MatthewLM/cbitcoin/tree/master/dependencies/sockets

谢谢你。

编辑:我再次运行它,在 connect() 完成后出现错误。

编辑 2:似乎连接事件是在没有对方接受的情况下给出的。

4

3 回答 3

2

我不是 TCP/IP 专家,但我确实注意到,EPIPE即使 MSG_NOSIGNAL 设置为“面向流的套接字”,该文档仍然可以返回。看起来您正在使用SOCK_STREAM. 另一端可能正在断开连接。

CBSocketConnect()看起来如果你得到你EINPROGRESS只是返回true - 如果它连接成功,你也会返回。您将无法知道是否需要等待连接完成。根据这个你可以select()poll()为连接完成。


以上是我在@MatthewMitchell 和@user315052 的要求下对OP 的评论的转贴。


编辑: 我正在添加对此答案的更详细描述,以及随后的一些讨论。

所以,首先尝试做connect(). 然后,如果EINPROGRESS是错误结果,则从 注册一个写事件唤醒libevent。进入回调函数后,使用级别的套接字选项EV_WRITE检查连接的状态。如果返回的选项值为,则连接成功。否则,将其视为一个数字。getsockopt()SO_ERRORSOL_SOCKET0errno

按照此答案中的说明遵循此建议后,您发现客户端遇到了错误ECONNREFUSED。这解释了为什么您的写入失败了EPIPE. 调查你的服务器后,你发现服务器因为报错而无法监听绑定的地址EADDRINUSESO_REUSEADDR这可以通过在监听套接字上设置选项来处理。

于 2012-08-22T18:46:15.477 回答
1

下面是一个简单的libevent玩具程序,它合成EINPROGRESS,然后通过等待等待连接完成EV_WRITE。基本上,这个程序表明,在您的应用程序中,您应该首先尝试执行connect调用,如果它失败并显示EINPROGRESS,您应该在执行 I/O 之前等待完成。

这是libevent回调函数:

extern "C" void on_connect (int sock, short ev, void *arg) {
    assert(ev == EV_WRITE);
    std::cout << "got wrieable on: " << sock << '\n';
    int optval = -1;
    socklen_t optlen = sizeof(optval);
    getsockopt(sock, SOL_SOCKET, SO_ERROR, &optval, &optlen);
    assert(optval == 0);
    std::cout << "succesful asynchronous connect on: " << sock << '\n';
    event_loopbreak();
}

这些是玩具应用程序使用的一些辅助函数:

static void init_addr (struct sockaddr_in *addr, short port) {
    memset(addr, '\0', sizeof(*addr));
    addr->sin_family = AF_INET;
    addr->sin_port = htons(port);
    addr->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
}

static void setup_accept (int sock) {
    const int one = 1;
    struct sockaddr_in addr;
    init_addr(&addr, 9876);
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    bind(sock, (struct sockaddr *)&addr, sizeof(addr));
    listen(sock, 1);
}

static int complete_accept (int sock) {
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    return accept(sock, (struct sockaddr *)&addr, &addrlen);
}

static int try_connect (int sock) {
    struct sockaddr_in addr;
    init_addr(&addr, 9876);
    return connect(sock, (struct sockaddr *)&addr, sizeof(addr));
}

main程序如下:

int main () {
    int accept_sock = socket(PF_INET, SOCK_STREAM, 0);
    setup_accept(accept_sock);
    int sock = socket(PF_INET, SOCK_STREAM, 0);
    fcntl(sock, F_SETFL, fcntl(sock, F_GETFL) | O_NONBLOCK);
    std::cout << "trying first connect on: " << sock << '\n';
    int r = try_connect(sock);
    assert(r < 0 && errno == EINPROGRESS);
    event_init();
    struct event ev_connect;
    event_set(&ev_connect, sock, EV_WRITE, on_connect, 0);
    event_add(&ev_connect, 0);
    int new_sock = complete_accept(accept_sock);
    event_dispatch();
    return 0;
}
于 2012-08-22T08:14:45.517 回答
0

从你的进程被唤醒处理连接成功的那一刻起,直到它试图写入套接字的那一刻,连接的状态仍然可以在操作系统的内核角度发生变化,而 libevent 无法预见它。

假设您要连接的服务器以我将要描述的方式运行,您所描述的场景可以由以下阶段组成。给定进程 A(您的客户端)和进程 B(连接的另一端):

  1. B 运行,绑定服务器套接字,等待。
  2. A 运行, connect(), 等待
  3. B醒来,做accept()
  4. A 唤醒以处理连接成功。
  5. B 关闭套接字(由于进程终止或显式close())。
  6. A 尝试发送,得到errno == EPIPE

这可以在环回上重现。

顺便说一句,SO_NOSIGPIPE不是便携式插座选项。如果您正在编写一个可移植的 C 库,最好忽略使用signal()with的信号SIG_IGN

于 2012-08-20T16:01:34.597 回答