第一次使用 ZMQ,我正在尝试设置一个进程来处理许多 getimage 请求。当我调试时会发生一些异常,我试图修复并实现一种停止 QueueDevice 终止所有线程并正常退出的方法。
- 接收者.连接(后端绑定地址);在 NetMQ.dll 中发生类型为“NetMQ.InvalidException”的未处理异常,错误代码为 NetMQ.zmq.ErrorCode.EINVAL。为什么这个异常不会停止线程的进一步执行?
- 我尝试将 QueueDevice 设为静态字段并在关闭消息函数中使用 queueDevice.stop() 但随后线程开始抛出 TerminatingExceptions 并且永不退出。那么我可以关闭所有线程和主线程吗?
测试驱动代码
[TestMethod]
public void ProgramStartupShutdownTest()
{
var mockClientWrapper = new Mock<IClient>(MockBehavior.Strict);
var target = new SocketListener(2, mockClientWrapper.Object);
var task = Task.Factory.StartNew(() => target.StartListening("tcp://localhost:81"));
using (var client = NetMQContext.Create())
{
var socket = client.CreateRequestSocket();
socket.Connect("tcp://localhost:81");
var m = new NetMQMessage(new ShutdownMessage().CreateMessageFrames());
socket.SendMessage(m);
}
var timedout = task.Wait(200);
Assert.IsTrue(timedout);
}
我正在使用的代码
private const string BackendBindAddress = "inproc://workers";
public SocketListener(int numberOfWorkers, IClient client )
{
numberOfThreads = numberOfWorkers;
_client = client;
}
public void StartListening(string address)
{
StartZeroMQ(address, context =>
{
for (var i = 0; i <= numberOfThreads; i++)
{
var t = new Thread(WorkerRoutine);
t.Start(
new WorkerParamters
{
Context = context,
Client = _client
}
);
}
});
}
private void StartZeroMQ(string address, Action<NetMQContext> setupWorkers)
{
using (var context = NetMQContext.Create())
{
var queueDevice = new QueueDevice(context, address, BackendBindAddress, DeviceMode.Blocking);
setupWorkers(context);
queueDevice.Start();
}
}
struct WorkerParamters
{
public NetMQContext Context;
public IClient Client;
}
private static void WorkerRoutine(object startparameter)
{
var wp = (WorkerParamters) startparameter;
var client = wp.Client;
using (var receiver = wp.Context.CreateResponseSocket())
{
receiver.Connect(BackendBindAddress);
var running = true;
while (running)
{
var message = receiver.ReceiveMessage();
var letter = Message.ParseMessageFrame(message,
imageMessage => GetImage(imageMessage, client),
videoMessage => GetVideo(videoMessage, client),
shutdownMessage =>
{
running = false;
return true;
});
receiver.Send(letter.ToJson(), Encoding.Unicode);
}
}
}