1

我在 .NET 4.5 上使用 NetMQ (Nuget 3.3.2.2),我有一个带有 PUSH 套接字的快速生成器进程,以及一个使用 PULL 套接字的慢速消费者进程。如果我发送足够多的消息来命中发送 HWM,则发送进程会无限期地阻塞线程。

一些人为的(生成器)代码说明了这个问题:

        using (var ctx = NetMQContext.Create())
        using (var pushSocket = ctx.CreatePushSocket())
        {
            pushSocket.Connect("tcp://127.0.0.1:42404");

            var template = GenerateMessageBody(i);
            for (int i = 1; i <= 100000; i++)
            {
                pushSocket.SendMoreFrame("SampleMessage").SendFrame(Messages.SerializeToByteArray(template));

                if (i % 1000 == 0)
                    Console.WriteLine("Sent " + i + " messages");
            }
            Console.WriteLine("All finished");
            Console.ReadKey();
        }

在我的配置中,这通常会报告它已经发送了大约 5000 或 6000 条消息,然后会简单地阻止。如果我将 send HWM 设置为一个较大的值(或 0),那么它会按预期发送所有消息。

看起来它在再次尝试之前等待接收另一个命令,这里:(SocketBase.TrySend)

        // Oops, we couldn't send the message. Wait for the next
        // command, process it and try to send the message again.
        // If timeout is reached in the meantime, return EAGAIN.
        while (true)
        {
            ProcessCommands(timeoutMillis, false);

从我在 0MQ 指南中读到的内容来看,阻塞一个完整的 PUSH 套接字是正确的行为(也是我希望它做的),但是我希望它在消费者清除其队列后恢复。

没有使用某种 TrySend 模式并自己处理块,是否有一些我可以设置的选项或我可以使用的其他工具来让 PUSH 套接字尝试定期重新发送被阻止的消息?

4

0 回答 0