1

当我在控制台上打印接收到的消息时,显示的消息都被弄乱了,每条消息都包含 5 个字符串子消息,这些子消息在控制恢复到传入消息回调之前打印在控制台上。我强烈认为这是因为传入的消息事件是在 Booksleeve 中异步引发的?

我参考了以下帖子,PubSub 如何在 BookSleeve/Redis 中工作?,其中作者 Marc Gravell 指出通过将 Completion Mode 设置为“PreserveOrder”来强制同步接收的能力。我已经这样做了,在连接客户端之前和之后都尝试过。两者似乎都不起作用。

有什么想法可以接收消息并按照发送的确切顺序在控制台上打印它们吗?在这种情况下,我只有一个出版商。

谢谢

编辑:

下面的一些代码片段展示了我如何发送消息以及我快速编写的 Booksleeve 包装器。

这里是客户端(我有一个类似Client2的接收消息并检查顺序,但我省略了它,因为它看起来微不足道)。

class Client1
{
    const string ClientId = "Client1";
    private static Messaging Client { get; set; }

    private static void Main(string[] args)
    {
        var settings = new MessagingSettings("127.0.0.1", 6379, -1, 60, 5000, 1000);
        Client = new Messaging(ClientId, settings, ReceiveMessage);
        Client.Connect();

        Console.WriteLine("Press key to start sending messages...");
        Console.ReadLine();

        for (int index = 1; index <= 100; index++)
        {
            //I turned this off because I want to preserve 
            //the order even if messages are sent in rapit succession

            //Thread.Sleep(5); 

            var msg = new MessageEnvelope("Client1", "Client2", index.ToString());
            Client.SendOneWayMessage(msg);
        }

        Console.WriteLine("Press key to exit....");
        Console.ReadLine();

        Client.Disconnect();
    }

    private static void ReceiveMessage(MessageEnvelope msg)
    {
        Console.WriteLine("Message Received");
    }
}

这里是库的相关代码片段:

public void Connect()
    {
        RequestForReplyMessageIds = new ConcurrentBag<string>();

        Connection = new RedisConnection(Settings.HostName, Settings.Port, Settings.IoTimeOut);
        Connection.Closed += OnConnectionClosed;
        Connection.CompletionMode = ResultCompletionMode.PreserveOrder;
        Connection.SetKeepAlive(Settings.PingAliveSeconds);

        try
        {
            if (Connection.Open().Wait(Settings.RequestTimeOutMilliseconds))
            {
                //Subscribe to own ClientId Channel ID
                SubscribeToChannel(ClientId);
            }
            else
            {
                throw new Exception("Could not connect Redis client to server");
            }
        }
        catch
        {
            throw new Exception("Could not connect Redis Client to Server");
        }
    }

public void SendOneWayMessage(MessageEnvelope message)
    {
        SendMessage(message);
    }

private void SendMessage(MessageEnvelope msg)
    {
        //Connection.Publish(msg.To, msg.GetByteArray());
        Connection.Publish(msg.To, msg.GetByteArray()).Wait();
    }

private void IncomingChannelSubscriptionMessage(string channel, byte[] body)
    {
        var msg = MessageEnvelope.GetMessageEnvelope(body);

        //forward received message
        ReceivedMessageCallback(msg);

        //release requestMessage if returned msgId matches
        string msgId = msg.MessageId;
        if (RequestForReplyMessageIds.Contains(msgId))
        {
            RequestForReplyMessageIds.TryTake(out msgId);
        }
    }

public void SubscribeToChannel(string channelName)
    {
        if (!ChannelSubscriptions.Contains(channelName))
        {
            var subscriberChannel = Connection.GetOpenSubscriberChannel();
            subscriberChannel.Subscribe(channelName, IncomingChannelSubscriptionMessage).Wait();
            ChannelSubscriptions.Add(channelName);
        }
    }
4

1 回答 1

1

如果没有确切地看到你是如何检查这个的,很难评论,但我可以说的是,任何线程异常都很难追踪和修复,因此不太可能在 BookSleeve 中解决,因为它已经成功了。然而!绝对会在 StackExchange.Redis 中进行检查。这是我在 SE.Redis 中组装的一个装备(而且,令人尴尬的是,它确实突出了一个小错误,在下一个版本中修复,所以 .222 或更高版本);先输出:

Subscribing...

Sending (preserved order)...
Allowing time for delivery etc...
Checking...
Received: 500 in 2993ms
Out of order: 0

Sending (any order)...
Allowing time for delivery etc...
Checking...
Received: 500 in 341ms
Out of order: 306

(请记住,500 x 5ms 是 2500,所以我们不应该对 2993ms 或 341ms 感到惊讶——这主要是Thread.Sleep我们为推动线程池重叠它们而添加的成本;如果我们删除它,两个循环都需要 0 毫秒,这太棒了 - 但我们无法如此令人信服地看到重叠问题)

如您所见,第一次运行有正确的订单输出;第二次运行的顺序不一,但速度快了十倍。那就是做琐碎工作的时候;对于实际工作,它会更加引人注目。与往常一样,这是一种权衡。

这是测试设备:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using StackExchange.Redis;

static class Program
{
    static void Main()
    {
        using (var conn = ConnectionMultiplexer.Connect("localhost"))
        {
            var sub = conn.GetSubscriber();
            var received = new List<int>();
            Console.WriteLine("Subscribing...");
            const int COUNT = 500;
            sub.Subscribe("foo", (channel, message) =>
            {
                lock (received)
                {
                    received.Add((int)message);
                    if (received.Count == COUNT)
                        Monitor.PulseAll(received); // wake the test rig
                }
                Thread.Sleep(5); // you kinda need to be slow, otherwise
                // the pool will end up doing everything on one thread
            });
            SendAndCheck(conn, received, COUNT, true);
            SendAndCheck(conn, received, COUNT, false);
        }

        Console.WriteLine("Press any key");
        Console.ReadLine();
    }
    static void SendAndCheck(ConnectionMultiplexer conn, List<int> received, int quantity, bool preserveAsyncOrder)
    {
        conn.PreserveAsyncOrder = preserveAsyncOrder;
        var sub = conn.GetSubscriber();
        Console.WriteLine();
        Console.WriteLine("Sending ({0})...", (preserveAsyncOrder ? "preserved order" : "any order"));
        lock (received)
        {
            received.Clear();
            // we'll also use received as a wait-detection mechanism; sneaky

            // note: this does not do any cheating;
            // it all goes to the server and back
            for (int i = 0; i < quantity; i++)
            {
                sub.Publish("foo", i);
            }

            Console.WriteLine("Allowing time for delivery etc...");
            var watch = Stopwatch.StartNew();
            if (!Monitor.Wait(received, 10000))
            {
                Console.WriteLine("Timed out; expect less data");
            }
            watch.Stop();
            Console.WriteLine("Checking...");
            lock (received)
            {
                Console.WriteLine("Received: {0} in {1}ms", received.Count, watch.ElapsedMilliseconds);
                int wrongOrder = 0;
                for (int i = 0; i < Math.Min(quantity, received.Count); i++)
                {
                    if (received[i] != i) wrongOrder++;
                }
                Console.WriteLine("Out of order: " + wrongOrder);
            }
        }
    }
}
于 2014-03-25T11:47:26.420 回答