当我在控制台上打印接收到的消息时,显示的消息都被弄乱了,每条消息都包含 5 个字符串子消息,这些子消息在控制恢复到传入消息回调之前打印在控制台上。我强烈认为这是因为传入的消息事件是在 Booksleeve 中异步引发的?
我参考了以下帖子,PubSub 如何在 BookSleeve/Redis 中工作?,其中作者 Marc Gravell 指出通过将 Completion Mode 设置为“PreserveOrder”来强制同步接收的能力。我已经这样做了,在连接客户端之前和之后都尝试过。两者似乎都不起作用。
有什么想法可以接收消息并按照发送的确切顺序在控制台上打印它们吗?在这种情况下,我只有一个出版商。
谢谢
编辑:
下面的一些代码片段展示了我如何发送消息以及我快速编写的 Booksleeve 包装器。
这里是客户端(我有一个类似Client2
的接收消息并检查顺序,但我省略了它,因为它看起来微不足道)。
class Client1
{
const string ClientId = "Client1";
private static Messaging Client { get; set; }
private static void Main(string[] args)
{
var settings = new MessagingSettings("127.0.0.1", 6379, -1, 60, 5000, 1000);
Client = new Messaging(ClientId, settings, ReceiveMessage);
Client.Connect();
Console.WriteLine("Press key to start sending messages...");
Console.ReadLine();
for (int index = 1; index <= 100; index++)
{
//I turned this off because I want to preserve
//the order even if messages are sent in rapit succession
//Thread.Sleep(5);
var msg = new MessageEnvelope("Client1", "Client2", index.ToString());
Client.SendOneWayMessage(msg);
}
Console.WriteLine("Press key to exit....");
Console.ReadLine();
Client.Disconnect();
}
private static void ReceiveMessage(MessageEnvelope msg)
{
Console.WriteLine("Message Received");
}
}
这里是库的相关代码片段:
public void Connect()
{
RequestForReplyMessageIds = new ConcurrentBag<string>();
Connection = new RedisConnection(Settings.HostName, Settings.Port, Settings.IoTimeOut);
Connection.Closed += OnConnectionClosed;
Connection.CompletionMode = ResultCompletionMode.PreserveOrder;
Connection.SetKeepAlive(Settings.PingAliveSeconds);
try
{
if (Connection.Open().Wait(Settings.RequestTimeOutMilliseconds))
{
//Subscribe to own ClientId Channel ID
SubscribeToChannel(ClientId);
}
else
{
throw new Exception("Could not connect Redis client to server");
}
}
catch
{
throw new Exception("Could not connect Redis Client to Server");
}
}
public void SendOneWayMessage(MessageEnvelope message)
{
SendMessage(message);
}
private void SendMessage(MessageEnvelope msg)
{
//Connection.Publish(msg.To, msg.GetByteArray());
Connection.Publish(msg.To, msg.GetByteArray()).Wait();
}
private void IncomingChannelSubscriptionMessage(string channel, byte[] body)
{
var msg = MessageEnvelope.GetMessageEnvelope(body);
//forward received message
ReceivedMessageCallback(msg);
//release requestMessage if returned msgId matches
string msgId = msg.MessageId;
if (RequestForReplyMessageIds.Contains(msgId))
{
RequestForReplyMessageIds.TryTake(out msgId);
}
}
public void SubscribeToChannel(string channelName)
{
if (!ChannelSubscriptions.Contains(channelName))
{
var subscriberChannel = Connection.GetOpenSubscriberChannel();
subscriberChannel.Subscribe(channelName, IncomingChannelSubscriptionMessage).Wait();
ChannelSubscriptions.Add(channelName);
}
}