3

我想知道使用 BookSleeve 发布和订阅频道的最佳方式是什么。我目前实现了几个静态方法(见下文),这些方法让我可以将内容发布到特定频道,新创建的频道存储在private static Dictionary<string, RedisSubscriberConnection> subscribedChannels;.

这是正确的方法吗,因为我想发布到频道并订阅同一应用程序中的频道(注意:我的包装器是一个静态类)。即使我想发布和订阅,创建一个频道是否足够?显然,我不会发布到与在同一应用程序中订阅的频道相同的频道。但我测试了它并且它有效:

 RedisClient.SubscribeToChannel("Test").Wait();
 RedisClient.Publish("Test", "Test Message");

它奏效了。

这是我的问题:

1)设置一个专用的发布通道和一个专用的订阅通道而不是同时使用一个通道会更有效吗?

2)“channel”和“PatternSubscription”在语义上有什么区别?我的理解是我可以通过PatternSubscription()同一个频道订阅多个“主题”,对吗?但是,如果我想为每个“主题”调用不同的回调,我必须为每个主题设置一个频道,对吗?那是有效的还是你会建议不要这样做?

这里是代码片段。

谢谢!!!

    public static Task<long> Publish(string channel, byte[] message)
    {
        return connection.Publish(channel, message);
    }

    public static Task SubscribeToChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);

        RedisSubscriberConnection channel = connection.GetOpenSubscriberChannel();

        subscribedChannels[subscriptionString] = channel;

        return channel.PatternSubscribe(subscriptionString, OnSubscribedChannelMessage);
    }

    public static Task UnsubscribeFromChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);

        if (subscribedChannels.Keys.Contains(subscriptionString))
        {
            RedisSubscriberConnection channel = subscribedChannels[subscriptionString];

            Task  task = channel.PatternUnsubscribe(subscriptionString);

            //remove channel subscription
            channel.Close(true);
            subscribedChannels.Remove(subscriptionString);

            return task;
        }
        else
        {
            return null;
        }
    }

    private static string ChannelSubscriptionString(string channelName)
    {
        return channelName + "*";
    }
4

1 回答 1

6

1:您的示例中只有一个频道(Test);频道只是用于特定发布/订阅交换的名称。但是,由于 redis API 工作方式的特殊性,有必要使用 2 个连接。具有任何订阅的连接不能做任何其他事情,除了:

  • 听消息
  • 管理自己的订阅 ( subscribe, psubscribe, unsubscribe, punsubscribe)

但是,我不明白这一点:

private static Dictionary<string, RedisSubscriberConnection>

除非您要满足特定于您的需求,否则您不应该需要多个订阅者连接。单个订阅者连接可以处理任意数量的订阅。快速检查client list我的一台服务器,我与(在撰写本文时)23,002 个订阅有一个连接。这可能会减少,但是:它有效。

2:模式订阅支持通配符;因此/topic/1/topic/2/您可以订阅/topic/*. 所使用的实际通道的名称publish作为回调签名的一部分提供给接收者。

任何一个都可以工作。应该注意的是,性能publish受唯一订阅总数的影响 - 但坦率地说,即使您有数以万计的订阅频道使用subscribe而不是psubscribe.

但从publish

时间复杂度:O(N+M),其中 N 是订阅接收通道的客户端数量,M 是订阅模式的总数(任何客户端)。

我建议阅读pub/sub的 redis 文档。


编辑以下问题:

a)如果我想保证在接收项目时保留从同一发布者发送项目的顺序,我假设我必须同步“发布”(使用 Result 或 Wait()),对吗?

这根本没有任何区别;既然你提到Result/ Wait(),我假设你在谈论 BookSleeve - 在这种情况下,多路复用器已经保留了命令顺序。Redis 本身是单线程的,并且总是会按顺序处理单个连接上的命令。但是:订阅者上的回调可以异步执行,并且可以(单独)交给工作线程。我目前正在调查是否可以强制它从RedisSubscriberConnection.

更新:从 1.3.22 开始,您可以将其设置CompletionModePreserveOrder- 然后所有回调将按顺序完成,而不是同时完成。

b)根据您的建议进行调整后,无论有效负载的大小如何,我在发布少量项目时都会获得出色的性能。但是,当同一发布者发送 100,000 个或更多项目时,性能会迅速下降(仅从我的机器发送就下降到 7-8 秒)。

首先,那个时间听起来很高——我在本地测试(对于 100,000 个出版物,包括等待所有出版物的响应)1766 毫秒(本地)或 1219 毫秒(远程)(这听起来可能违反直觉,但我的“本地”不是t 运行相同版本的 redis;我的“远程”在 Centos 上是 2.6.12;我的“本地”在 Windows 上是 2.6.8-pre2)。

我不能让你的实际服务器更快或加快网络速度,但是:如果这是数据包碎片,我已经添加(只为你)一个SuspendFlush()/ResumeFlush()对。这禁用了急切刷新(即当发送队列为空时;其他类型的刷新仍然发生);您可能会发现这有帮助:

conn.SuspendFlush();
try {
    // start lots of operations...
} finally {
    conn.ResumeFlush();
}

请注意,在Wait您恢复之前,您不应该这样做,因为在您调用之前ResumeFlush(),发送缓冲区中可能仍有一些操作。一切就绪后,我得到(100,000 次操作):

local: 1766ms (eager-flush) vs 1554ms (suspend-flush)
remote: 1219ms (eager-flush) vs 796ms (suspend-flush)

如您所见,它对远程服务器的帮助更大,因为它将通过网络放置更少的数据包。

我不能使用事务,因为稍后要发布的项目并非一次全部可用。有没有办法在考虑到这些知识的情况下进行优化?

认为上面已经解决了这个问题 - 但请注意最近CreateBatch也添加了。批处理的操作很像事务 - 只是:没有事务。同样,它是另一种减少数据包碎片的机制。在您的特定情况下,我怀疑暂停/恢复(在同花顺时)是您最好的选择。

您是否建议使用一个通用 RedisConnection 和一个 RedisSubscriberConnection 或任何其他配置来让此类包装器执行所需的功能?

只要您不执行阻塞操作(blpopbrpopbrpoplpush),或将过大的 BLOB 放入网络中(可能会在清除时延迟其他操作),那么每种类型的单个连接通常都可以很好地工作。但 YMMV 取决于您的确切使用要求。

于 2013-04-08T13:24:36.997 回答