我在 .NET 4.5 上使用 NetMQ (Nuget 3.3.2.2),我有一个带有 PUSH 套接字的快速生成器进程,以及一个使用 PULL 套接字的慢速消费者进程。如果我发送足够多的消息来命中发送 HWM,则发送进程会无限期地阻塞线程。
一些人为的(生成器)代码说明了这个问题:
using (var ctx = NetMQContext.Create())
using (var pushSocket = ctx.CreatePushSocket())
{
pushSocket.Connect("tcp://127.0.0.1:42404");
var template = GenerateMessageBody(i);
for (int i = 1; i <= 100000; i++)
{
pushSocket.SendMoreFrame("SampleMessage").SendFrame(Messages.SerializeToByteArray(template));
if (i % 1000 == 0)
Console.WriteLine("Sent " + i + " messages");
}
Console.WriteLine("All finished");
Console.ReadKey();
}
在我的配置中,这通常会报告它已经发送了大约 5000 或 6000 条消息,然后会简单地阻止。如果我将 send HWM 设置为一个较大的值(或 0),那么它会按预期发送所有消息。
看起来它在再次尝试之前等待接收另一个命令,这里:(SocketBase.TrySend)
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
// If timeout is reached in the meantime, return EAGAIN.
while (true)
{
ProcessCommands(timeoutMillis, false);
从我在 0MQ 指南中读到的内容来看,阻塞一个完整的 PUSH 套接字是正确的行为(也是我希望它做的),但是我希望它在消费者清除其队列后恢复。
没有使用某种 TrySend 模式并自己处理块,是否有一些我可以设置的选项或我可以使用的其他工具来让 PUSH 套接字尝试定期重新发送被阻止的消息?