我有一个 Somdron 的“Reliable Pub-Sub”模式的 v4.0.0.1 实现,用于在新应用程序的两个部分之间进行通信。该应用程序将有一个“服务器”(执行所有繁重计算的引擎)和“客户端”,它们将发送请求并从服务器获取有关进度的信息。
我当前版本的“Reliable Pub-Sub”的问题是我似乎没有正确的方法将请求从客户端发送到服务器。让我首先向您展示代码:
服务器:
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Linq;
namespace Demo.Messaging.Server
{
public class ZeroMqMessageServer : IDisposable
{
private const string WELCOME_MESSAGE = "WM";
private const string HEARTBEAT_MESSAGE = "HB";
private const string PUBLISH_MESSAGE_TOPIC = "PUB";
private readonly TimeSpan HEARTBEAT_INTERVAL = TimeSpan.FromSeconds(2);
private NetMQActor actor;
private NetMQTimer heartbeatTimer;
private XPublisherSocket publisher;
private NetMQPoller poller;
public ZeroMqMessageServer(string address)
{
Address = address;
actor = NetMQActor.Create(Start);
}
private void Start(PairSocket shim)
{
using (publisher = new XPublisherSocket())
{
publisher.SetWelcomeMessage(WELCOME_MESSAGE);
publisher.Bind(Address);
//publisher.ReceiveReady -= DropPublisherSubscriptions;
publisher.ReceiveReady += DropPublisherSubscriptions;
heartbeatTimer = new NetMQTimer(HEARTBEAT_INTERVAL);
heartbeatTimer.Elapsed += OnHeartbeatTimeElapsed;
shim.ReceiveReady += OnShimReceiveReady;
shim.SignalOK(); // Let the actor know we are ready to work.
poller = new NetMQPoller() { publisher, shim, heartbeatTimer };
poller.Run();
}
}
private void DropPublisherSubscriptions(object sender, NetMQSocketEventArgs e)
{
publisher.SkipMultipartMessage();
}
private void OnHeartbeatTimeElapsed(object sender, NetMQTimerEventArgs e)
{
publisher.SendFrame(HEARTBEAT_MESSAGE);
}
private void OnShimReceiveReady(object sender, NetMQSocketEventArgs e)
{
var socket = e.Socket;
string command = socket.ReceiveFrameString();
if (command == PUBLISH_MESSAGE_TOPIC)
{
// Forward the message to the publisher.
NetMQMessage message = socket.ReceiveMultipartMessage();
publisher.SendMultipartMessage(message);
}
else if (command == NetMQActor.EndShimMessage)
{
// Dispose command received, stop the poller.
poller.Stop();
}
}
public void PublishMessage(NetMQMessage message)
{
// We can use actor like NetMQSocket and publish messages.
actor.SendMoreFrame(PUBLISH_MESSAGE_TOPIC)
.SendMultipartMessage(message);
}
public string Address { get; private set; }
private bool disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
actor?.Dispose();
publisher?.Dispose();
poller?.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
}
}
客户:
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using Messaging.Helpers;
namespace Demo.Messaging.Client
{
public class ZeroMqMessageClient : IDisposable
{
private string SUBSCRIBE_COMMAND = "S";
private const string WELCOME_MESSAGE = "WM";
private const string HEARTBEAT_MESSAGE = "HB";
private const string PUBLISH_MESSAGE_TOPIC = "PUB";
private readonly TimeSpan TIMEOUT = TimeSpan.FromSeconds(5);
private readonly TimeSpan RECONNECTION_PERIOD = TimeSpan.FromSeconds(5);
private readonly string[] addressCollection;
private List<string> subscriptions = new List<string>();
private NetMQTimer timeoutTimer;
private NetMQTimer reconnectionTimer;
private NetMQActor actor;
private SubscriberSocket subscriber;
private PairSocket shim;
private NetMQPoller poller;
public ZeroMqMessageClient(params string[] addresses)
{
addressCollection = addresses;
actor = NetMQActor.Create(Start);
}
private void Start(PairSocket shim)
{
this.shim = shim;
shim.ReceiveReady += OnShimReceiveReady;
timeoutTimer = new NetMQTimer(TIMEOUT);
timeoutTimer.Elapsed += OnTimeoutTimerElapsed;
reconnectionTimer = new NetMQTimer(RECONNECTION_PERIOD);
reconnectionTimer.Elapsed += OnReconnectionTimerElapsed;
poller = new NetMQPoller() { shim, timeoutTimer, reconnectionTimer };
shim.SignalOK();
Connect();
poller.Run();
if (subscriber != null)
subscriber.Dispose();
}
private void Connect()
{
using (NetMQPoller tmpPoller = new NetMQPoller())
{
List<SubscriberSocket> socketCollection = new List<SubscriberSocket>();
SubscriberSocket connectedSocket = null;
EventHandler<NetMQSocketEventArgs> messageHandler = (s, e) =>
{
connectedSocket = (SubscriberSocket)e.Socket;
tmpPoller.Stop();
};
// We cancel the poller without setting the connected socket.
NetMQTimer tmpTimeoutTimer = new NetMQTimer(TIMEOUT);
tmpTimeoutTimer.Elapsed += (s, e) => tmpPoller.Stop();
tmpPoller.Add(tmpTimeoutTimer);
// Attempt to subscribe to the supplied list of addresses.
foreach (var address in addressCollection)
{
SubscriberSocket socket = new SubscriberSocket();
socketCollection.Add(socket);
//socket.ReceiveReady -= messageHandler;
socket.ReceiveReady += messageHandler;
tmpPoller.Add(socket);
// Subscribe to welcome messages.
socket.Subscribe(WELCOME_MESSAGE);
socket.Connect(address);
}
tmpPoller.Run(); // Block and wait for connection.
// We should have an active socket/conection.
if (connectedSocket != null)
{
// Remove the connected socket from the collection.
socketCollection.Remove(connectedSocket);
ZeroMqHelpers.CloseConnectionsImmediately(socketCollection);
// Set the active socket.
subscriber = connectedSocket;
//subscriber.SkipMultipartMessage(); // This skips the welcome message.
// Subscribe to subscriptions.
subscriber.Subscribe(HEARTBEAT_MESSAGE);
foreach (var subscription in subscriptions)
subscriber.Subscribe(subscription);
// Remove start-up handler, now handle messages properly.
subscriber.ReceiveReady -= messageHandler;
subscriber.ReceiveReady += OnSubscriberReceiveReady;
poller.Add(subscriber);
// Reset timers.
timeoutTimer.Enable = true;
reconnectionTimer.Enable = false;
}
else // We need to attempt re-connection.
{
// Close all existing connections.
ZeroMqHelpers.CloseConnectionsImmediately(socketCollection);
timeoutTimer.Enable = false;
reconnectionTimer.Enable = true;
}
}
}
private void OnShimReceiveReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveFrameString();
if (command == NetMQActor.EndShimMessage)
{
poller.Stop();
}
else if (command == SUBSCRIBE_COMMAND)
{
string topic = e.Socket.ReceiveFrameString();
subscriptions.Add(topic);
if (subscriber != null)
subscriber.Subscribe(topic);
}
}
private void OnTimeoutTimerElapsed(object sender, NetMQTimerEventArgs e)
{
if (subscriber != null)
{
poller.Remove(subscriber);
subscriber.Dispose();
subscriber = null;
Connect();
}
}
private void OnReconnectionTimerElapsed(object sender, NetMQTimerEventArgs e)
{
// We re-attempt connection.
Connect();
}
private void OnSubscriberReceiveReady(object sender, NetMQSocketEventArgs e)
{
// Here we just forwward the message on to the actor.
var message = subscriber.ReceiveMultipartMessage();
string topic = message[0].ConvertToString();
// Let us see what is in the message.
if (message.Count() > 1)
{
string content = message[1].ConvertToString();
Console.WriteLine($"ZMQ_ALT - {topic}:: {content}");
}
if (topic == WELCOME_MESSAGE)
{
// Disconnection has occurred we might want to restore state from a snapshot.
}
else if (topic == HEARTBEAT_MESSAGE)
{
// We got a heartbeat, lets postponed the timer.
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
else
{
shim.SendMultipartMessage(message);
}
}
public void Subscribe(string topic)
{
actor.SendMoreFrame(SUBSCRIBE_COMMAND).SendFrame(topic);
}
public NetMQMessage ReceiveMessage()
{
return actor.ReceiveMultipartMessage();
}
public void PublishMessage(NetMQMessage message)
{
actor.SendMoreFrame(PUBLISH_MESSAGE_TOPIC)
.SendMultipartMessage(message);
}
private bool disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
actor?.Dispose();
subscriber?.Dispose();
shim?.Dispose();
poller?.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
}
}
现在,我可以在两个单独的控制台应用程序中使用 main 方法中的以下代码将消息从服务器发送到客户端,这很棒,客户端
SERVER 的 Program.cs:
class Program
{
static void Main(string[] args)
{
using (ZeroMqMessageServer server = new ZeroMqMessageServer("tcp://127.0.0.1:6669"))
{
while (true)
{
NetMQMessage message = new NetMQMessage();
message.Append("A");
message.Append(new Random().Next().ToString());
server.PublishMessage(message);
Thread.Sleep(200);
}
}
}
}
客户的 Program.cs:
class Program
{
static void Main(string[] args)
{
Task.Run(() =>
{
using (ZeroMqMessageClient client = new ZeroMqMessageClient("tcp://127.0.0.1:6669"))
{
client.Subscribe(String.Empty);
while (true) { }
}
});
Console.ReadLine();
}
}
客户端正确地自动检测断开的连接并重新连接,奇妙的小模式。
但是,这种开箱即用的模式不允许客户端向服务器发送消息。所以在客户端我添加了以下代码
public void PublishMessage(NetMQMessage message)
{
actor.SendMoreFrame(PUBLISH_MESSAGE_TOPIC)
.SendMultipartMessage(message);
}
在客户端中,我已将publisher.ReceiveReady += DropPublisherSubscriptions;
事件处理程序更改为
private void DropPublisherSubscriptions(object sender, NetMQSocketEventArgs e)
{
var message = e.Socket.ReceiveMultipartMessage();
string topic = message[0].ConvertToString();
Console.WriteLine($"TOPIC = {topic}");
// Let us see what is in the message.
if (message.Count() > 1)
{
string content = message[1].ConvertToString();
Console.WriteLine($"TEST RECIEVE FROM CLIENT - {topic}:: {content}");
}
publisher.SkipMultipartMessage();
}
但这似乎无法处理我的消息。它接收心跳和欢迎消息,但我做的不对。
我怎样才能启用/促进客户端与服务器交谈而不破坏我已经拥有的东西?
谢谢你的时间。