0

我正在开发一个 UDP 客户端/服务器应用程序,并且我的发送命令的应用程序运行良好 - 我可以监控通过 nc 和 hexdump 发送到端口的内容,它们完美解码。

在我应该接收命令的应用程序上,我正在使用带有 MSG_DONTWAIT 标志的 recvfrom。我这样做是因为我还需要检查队列以查找要发送的内容,因此仅将其保留为阻塞状态不是一种选择。如果我删除 MSG_DONTWAIT 标志,则会正确接收和处理消息,但它会阻止等待,这对我的应用程序不起作用。当使用 MSG_DONTWAIT 时,它总是返回 -1 并将 errno 设置为 EAGAIN。虽然在没有发送任何内容时会出现这种情况,但它根本不会收到任何内容。我认为它会返回 EAGAIN 直到有东西可用,但情况似乎并非如此。相关代码发布在下面 - 我错过了什么?

uint8_t Receiver::Setup(uint16_t rx_port, uint16_t tx_port)
{

    std::stringstream ss;
    ss << "UDP session manager, setup ports.";
    Logger::Info(ss.str());

    tx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
    rx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);

    if (rx_socket_fd < 0)
    {
        Logger::Error("Could not open an rx UDP socket!");
    }
    else
    {
        std::cout << "rx_socket_fd is " << rx_socket_fd << "\n";
    }
    if (tx_socket_fd < 0)
    {
        Logger::Error("Could not open an tx UDP socket!");
    }
    else
    {
        std::cout << "tx_socket_fd is " << tx_socket_fd << "\n";
    }


    int reuse = 1;
    if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
        Logger::Warn("Could not set socket reuse!");

    #ifdef SO_REUSEPORT
    reuse = 1;
        if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
            Logger::Warn("setsockopt(SO_REUSEPORT) failed");
    #endif

    reuse = 1;
    if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
        Logger::Warn("Could not set socket reuse!");

    #ifdef SO_REUSEPORT
    reuse = 1;
        if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
            Logger::Warn("setsockopt(SO_REUSEPORT) failed");
    #endif

    memset(&tx_sockaddr, 0, sizeof(tx_sockaddr));
    memset(&rx_sockaddr, 0, sizeof(rx_sockaddr));

    tx_sockaddr.sin_family = AF_INET;
    tx_sockaddr.sin_addr.s_addr = INADDR_ANY;
    tx_sockaddr.sin_port = htons(tx_port);

    rx_sockaddr.sin_family = AF_INET;
    rx_sockaddr.sin_addr.s_addr = INADDR_ANY;
    rx_sockaddr.sin_port = htons(rx_port);

    int rva = 0;

    rva = bind(tx_socket_fd, (const struct sockaddr *) &tx_sockaddr, sizeof(tx_sockaddr) );

    if (rva < 0)
    {
        std::stringstream ss;
        ss << "UDP SessionManager: Could not bind to tx socket (bind returned error code " << rva << ", errno is " << errno << ")";
        Logger::Error(ss.str());
    }

    rva = bind(rx_socket_fd, (const struct sockaddr *) &rx_sockaddr, sizeof(rx_sockaddr) );

    if (rva < 0)
    {
        std::stringstream ss;
        ss << "UDP SessionManager: Could not bind to rx socket (bind returned error code " << rva << ", errno is " << errno << ")";
        Logger::Error(ss.str());
    }

    return NO_ERROR;
}


uint8_t Receiver::SendTelemetry(const TelemetryBase * telemetry)
{
    const uint8_t * bytes = EncodeTelemetryToSend(telemetry);

    if (bytes == NULL)
    {
        Logger::Error("Receiver: Something went wrong trying to encode the telemetry.");
        return 1;
    }

    const UDPHeader * header = (const UDPHeader * ) bytes;
    uint16_t numBytes = header->length;

    std::stringstream ss;
    ss << "Receiver::SendTelemetry - bytesToWrite is " << numBytes << "\n";
    Logger::Info(ss.str());

    int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );

    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    if (rva == -1  && errno == EINVAL)
    {
        ss.clear();
        ss << "invalid argument!";
        Logger::Warn(ss.str());
    }
    else if (rva < 0)
    {
        ss.clear();

        ss << "Failed to write to the UDP port, errno is " << errno;

        Logger::Warn(ss.str());
        return 1;
    }

    delete bytes;

    return 0;
}



uint8_t Receiver::SendCommand(const CommandBase * command)
{
    const uint8_t * bytes = EncodeCommandToSend(command);

    if (bytes == NULL)
    {
        Logger::Error("Receiver: Something went wrong trying to encode the message.");
        return 1;
    }

    const UDPHeader * header = (const UDPHeader * ) bytes;
    uint16_t numBytes = header->length;

    std::stringstream ss;
    ss << "Receiver::SendCommand - bytesToWrite is " << numBytes << "\n";
    Logger::Info(ss.str());

    int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );

    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    if (rva < 0)
    {
        ss.clear();

        ss << "Failed to write to the UDP port, errno is " << errno;

        Logger::Warn(ss.str());
        return 1;
    }

    delete bytes;

    return 0;
}

uint8_t Receiver::Receive()
{
    uint8_t inputBuffer[UDP_BUFFER_BYTES];
    memset(inputBuffer, '\0', UDP_BUFFER_BYTES);

    int totalBytesRead = 0;

    //socklen_t addressLength = sizeof(rx_sockaddr);
    struct sockaddr_in sender;
    socklen_t len;

    totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                          MSG_DONTWAIT, (struct sockaddr *)  &sender, &len );

    if ( totalBytesRead >= 0 )
    {
        std::stringstream ss;
        ss << "UDP port read " << totalBytesRead << " bytes";
        Logger::Info(ss.str() );

        const CommandBase * command = DecodeReceivedCommand(inputBuffer);

        if (command == NULL)
        {
            Logger::Warn("Failed to decode received command from commanding app.");
            return UDP_ERROR_DECODE_FAILED;
        }

        EnqueCommand(command);

    }
    else
    {
        std::stringstream ss;
        ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;
        Logger::Debug(ss.str());
    }

    return UDP_ERROR_NO_ERROR;
}



void Receiver::ProcessingLoopThread()
{
    while ( GetState() == STATE_RUN )
    {
        const TelemetryBase * telemetry = DequeTelemetry();

        while (telemetry != NULL)
        {
            std::stringstream ss;
            ss << "Receiver sending telemetry with ID: " << telemetry->GetTelemetryID();
            Logger::Debug(ss.str());

            SendTelemetry(telemetry);
            delete telemetry;
            telemetry = DequeTelemetry();
        }

        Receive();

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
4

3 回答 3

1

一些东西:

这可能不是问题,但我鼓励您在任何其他代码有机会运行并清除 errno 之前捕获 errno。而不是这个:

    std::stringstream ss;
    ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;

更好的:

totalBytesRead = recvfrom(rx_socket_fd,...
int lasterror = errno; // catch errno before anything else can change it


. . .
ss << "UDP port rva = " << totalBytesRead << ", errno is " << lasterror;

回到你原来的问题。

我猜您在使用非阻塞 MSG_DONTWAIT 标志时需要多次轮询套接字。

看起来您的主循环在每次轮询套接字之间休眠了 10 毫秒。如果那是您的设计,那么只需执行以下操作:

创建套接字时,在其上设置 10 毫秒超时:

timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10 * 1000; // 10 milliseconds
setsockopt(rx_socket_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

然后只需MSG_DONTWAIT从 recvfrom 调用中删除标志。

此外,删除主循环中的 sleep 语句:

 std::this_thread::sleep_for(std::chrono::milliseconds(10));

然后优雅地处理超时错误作为可能发生的良性事情

totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                      0, (struct sockaddr *)  &sender, &len );


if (totalBytesRead >= 0 )
{
    // data available - handle it
}
else
{
    if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
    {
        // socket timed out after waiting 10 milliseconds
    }
    else
    {
        // actual socket error
    }
}
于 2018-12-29T01:23:16.393 回答
1
struct sockaddr_in sender;
socklen_t len;

totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                      MSG_DONTWAIT, (struct sockaddr *)  &sender, &len );

您没有分配len合理的值。如果您不初始化len为套接字地址的大小,则调用可能会失败。

还有,你抓得errno太晚了。您必须在收到错误的调用后立即捕获它。否则,其他中间操作可以改变它的值。所以你不能依赖获得一个合理的价值。

您使用单独的套接字进行发送和接收非常奇怪。如果您向同一个端点发送和接收,您应该只使用一个套接字。

于 2018-12-29T03:36:08.530 回答
-1

MSG_DONTWAIT(自 Linux 2.2 起)启用非阻塞操作;如果操作将阻塞,则调用失败并显示错误 EAGAIN 或 EWOULDBLOCK(这也可以使用带有 F_SETFL fcntl(2) 的 O_NONBLOCK 标志来启用)。

于 2021-07-21T09:33:03.760 回答