7

我在我的应用程序中使用 ZeroMQ 进行网络连接,手册指出,通过将ZMQ_DONTWAIT标志提供给flags参数 insendrecv导致函数不阻塞线程。然而它在我的情况下不起作用:

            std::cout << "a";
            if(ToSend.try_pop(send))
            {
                std::cout << "b";
                local.send(send.data(),send.size(),ZMQ_DONTWAIT);
            }
            std::cout << "c";
            if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT))
                std::cout << "Received: " << (char*)recv.data() << std::endl;
            std::cout << "d" << std::endl;

这打印:

abcdab

我做了一个小课来让事情变得更容易:

客户端类(为简化起见,从所有“未使用”的东西中删除)

class client
{
public:
    client()
    {

    }
    inline bool init(unsigned short threads = 1)
    {
        Running = true;
        context = zmq_init (threads);
        if(context == NULL)
            return false;
        socket = zmq_socket (context, ZMQ_REQ);
        if(socket == NULL)
            return false;
        return true;
    }
    inline int connect(const char * address, unsigned short port)
    {
        return zmq_connect(socket,string_format("tcp://%s:%d",address,port).c_str());
    }
    inline bool send (void *data, size_t len_, int flags_ = 0)
    {
        message_t request (len_);
        memcpy ((void *) request.data (), data, len_);
        int rc = zmq_send (socket, request.data(), request.size(), flags_);
        if (rc >= 0)
            return true;
        if (rc == -1 && zmq_errno () == EAGAIN)
            return false;
        throw error_t ();
    }
    inline bool recv (void * data, size_t len_, int flags_)
    {
        message_t reply(len_);
        int rc = zmq_recv (socket, reply.data(), len_, flags_);
        if (rc >= 0)
        {
            memcpy (data,(void *)reply.data(), reply.size());
            return true;
        }
        if (rc == -1 && zmq_errno () == EAGAIN)return false;
        throw error_t ();
    }
    inline bool IsRunning()
    {
        return Running;
    }
private:
    void * context;
    void * socket;
    std::atomic<bool> Running;
};

这是工作线程:

namespace Data
{
    Concurrency::concurrent_queue <message_t> ToSend;
    void Processor(char * address, unsigned short port, unsigned short threads)
    {
        client local;
        if(!local.init(threads))return;
        if(local.connect(address,port) != 0)return;
        message_t recv(Networking::max_packet);
        message_t send(Networking::max_packet);
        while(local.IsRunning())
        {
            std::cout << "a";
            if(ToSend.try_pop(send))
            {
                std::cout << "b";
                local.send(send.data(),send.size(),ZMQ_DONTWAIT);
            }
            std::cout << "c";
            if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT))
                std::cout << "Received: " << (char*)recv.data() << std::endl;
            std::cout << "d" << std::endl;
        }
    }
};

问题以某种方式存在于此。我只是不知道为什么它不起作用。

这就是我启动工作线程的方式:

int Thread(char * address, unsigned short port, unsigned short threads)
{
    std::thread data(Data::Processor,address,port,threads);
    data.detach();
    while(!Data::status){}
    return Data::status;
}
int main(int argc, char* argv[])
{

    std::thread s(Server::RUN);

    Client::message_t tosend(14);
    memcpy((void*)tosend.data(),"Hello World !\0",14);

    Client::Data::ToSend.push(tosend);

    std::cout << Client::Thread("127.0.0.1",5555,1) << std::endl;

    s.join();
    return 0;
}

这一切似乎都是正确的,那么为什么 recv/send 阻塞了我的线程?为什么国旗不起作用?

4

1 回答 1

1

它打印“abcd”,然后打印“ab”。这意味着代码

std::cout << "a";
if(ToSend.try_pop(send))
{
    std::cout << "b";

尽管您在问题末尾显示的代码暗示您只执行了一次 ToSend.push() ,但执行了两次。

您是否附加了调试器以查看挂起的线程以及它的调用堆栈是什么?

  • 将 std::cout 放在 mutex/critsect 后面,这样您就知道一次只有一个线程在写入,并报告您是哪个线程。
  • 将每个输出放在它自己的行上,这样你就不会因为缓冲而错过一个字母。
  • 如果您不阻止数据,您希望如何接收它?你在某处有一个 select/poll/WaitForSingleObject 调用吗?
于 2013-05-20T04:19:06.597 回答