1

我有以下模式:

多个线程正在向 ConcurrentQueue 发送消息,该 ConcurrentQueue 由单线程Dealer轮询,以便将消息发送到Router

发送多条消息时会引发以下异常:

“SocketException - 无法完成非阻塞套接字操作”

这是使消息出队并将其发送给经销商的线程代码:

Task.Factory.StartNew((state) =>
        {
            using (NetMQSocket dealerSocket = new DealerSocket(_connectionString))
            using (NetMQPoller poller = new NetMQPoller() { dealerSocket })
            {
                dealerSocket.ReceiveReady += DealerSocketOnReceiveReady;
                poller.RunAsync();

                while (true)
                {
                    Message<T> message;
                    if (!_concurrentQueue.TryDequeue(out message)) continue;

                    _pendingRequests.Add(message.Id, message);
                    var mpm = new NetMQMessage(4);
                    mpm.AppendEmptyFrame();
                    mpm.Append(message.Body);
                    mpm.AppendEmptyFrame();
                    mpm.Append(message.Id.ToString());
                    dealerSocket.SendMultipartMessage(mpm);
                }
            }
        }, TaskCreationOptions.LongRunning, _cancellationTokenSource.Token);

发送 MultipartMessage 时发生 SocketException

我试图增加 SendBuffer 大小和/或 SendHighWatermark 大小,但我仍然有同样的错误。

我是否需要处理此异常并重置套接字,或者我永远不应该在这种情况下?

4

1 回答 1

0

DealerSocket 用于 2 个线程:主线程和轮询器。为了只在一个线程中使用dealerSocket,我们使用了NetMQQueue。

更多细节在这里

于 2016-09-21T08:41:06.483 回答