13

我最近一直在使用响应式框架做一些工作,到目前为止我一直非常喜欢它。我正在考虑用一些过滤的 IObservable 替换传统的轮询消息队列,以清理我的服务器操作。以旧的方式,我处理进入服务器的消息是这样的:

// Start spinning the process message loop
   Task.Factory.StartNew(() =>
   {
       while (true)
       {
           Command command = m_CommandQueue.Take();
           ProcessMessage(command);
       }
   }, TaskCreationOptions.LongRunning);

这导致连续轮询线程将来自客户端的命令委托给 ProcessMessage 方法,在该方法中我有一系列 if/else-if 语句,这些语句确定命令的类型并根据其类型委托工作

我将使用 Reactive 的事件驱动系统替换它,为此我编写了以下代码:

 private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
 private IObservable<BesiegedMessage> m_MessagePublisher;

 m_MessagePublisher = m_MessageQueue
       .GetConsumingEnumerable()
       .ToObservable(TaskPoolScheduler.Default);

        // All generic Server messages (containing no properties) will be processed here
 IDisposable genericServerMessageSubscriber = m_MessagePublisher
       .Where(message => message is GenericServerMessage)
       .Subscribe(message =>
       {
           // do something with the generic server message here
       }

我的问题是,虽然这可行,但使用阻塞集合作为这样的 IObservable 的支持是一种好习惯吗?我没有看到 Take() 曾经以这种方式调用过,这让我认为消息会堆积在队列中而不会在处理后被删除?

将主题作为后备集合来驱动将接收这些消息的过滤后的 IObservable 会更有效吗?还有什么我在这里遗漏的可能有利于这个系统的架构的东西吗?

4

4 回答 4

8

这是一个完整的工作示例,在 Visual Studio 2012 下测试。

  1. 创建一个新的 C# 控制台应用程序。
  2. 右键单击您的项目,选择“管理 NuGet 包”,然后添加“反应式扩展 - 主库”。

添加此 C# 代码:

using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace DemoRX
{
    class Program
    {
        static void Main(string[] args)
        {
            BlockingCollection<string> myQueue = new BlockingCollection<string>();
            {                
                IObservable<string> ob = myQueue.
                  GetConsumingEnumerable().
                  ToObservable(TaskPoolScheduler.Default);

                ob.Subscribe(p =>
                {
                    // This handler will get called whenever 
                    // anything appears on myQueue in the future.
                    Console.Write("Consuming: {0}\n",p);                    
                });
            }
            // Now, adding items to myQueue will trigger the item to be consumed
            // in the predefined handler.
            myQueue.Add("a");
            myQueue.Add("b");
            myQueue.Add("c");           
            Console.Write("[any key to exit]\n");
            Console.ReadKey();
        }
    }
}

您将在控制台上看到:

[any key to exit]
Consuming: a
Consuming: b
Consuming: c

使用 RX 的真正好处是您可以使用 LINQ 的全部功能来过滤掉任何不需要的消息。例如,添加一个.Where按“a”过滤的子句,然后观察会发生什么:

ob.Where(o => (o == "a")).Subscribe(p =>
{
    // This will get called whenever something appears on myQueue.
    Console.Write("Consuming: {0}\n",p);                    
});

哲学笔记

与启动专用线程来轮询队列相比,此方法的优势在于,您不必担心程序退出后正确处理线程。这意味着您不必为 IDisposable 或 CancellationToken 操心(在处理 BlockingCollection 时总是需要这样做,否则您的程序可能会因拒绝死亡的线程而挂起退出)。

相信我,编写完全健壮的代码来处理来自 BlockingCollection 的事件并不像您想象的那么容易。我更喜欢使用 RX 方法,如上所示,因为它更简洁、更健壮、代码更少,并且您可以使用 LINQ 进行过滤。

潜伏

我对这种方法的速度感到惊讶。

在我的 Xeon X5650 @ 2.67Ghz 上,处理 1000 万个事件需要 5 秒,每个事件大约需要 0.5 微秒。将项目放入 BlockingCollection 需要 4.5 秒,因此 RX 将它们取出并处理它们的速度几乎与它们进入时一样快。

穿线

在我所有的测试中,RX 只启动了一个线程来处理队列中的任务。

这意味着我们有一个非常好的模式:我们可以使用 RX 从多个线程收集传入数据,将它们放入共享队列中,然后在单个线程上处理队列内容(根据定义,这是线程安全的)。

这种模式在处理多线程代码时消除了很多麻烦,通过队列将数据的生产者和消费者解耦,其中生产者可以是多线程的,消费者是单线程的,因此是线程安全的。这就是使 Erlang 如此健壮的概念。有关此模式的更多信息,请参阅多线程变得非常简单

于 2014-04-26T21:30:43.447 回答
7

这是直接从我的后部提取的东西 - 任何真正的解决方案都将在很大程度上取决于您的实际使用情况,但这里是“有史以来最便宜的伪消息队列系统”:

想法/动机:

  • 故意曝光IObservable<T>这样的订阅者可以进行他们想要的任何过滤/交叉订阅
  • 整个队列是无类型的,但是RegisterPublish类型安全的(ish)
  • YMMV 及其Publish()所在位置 - 尝试移动它
  • 通常Subject是禁止的,尽管在这种情况下它确实会产生一些简单的代码。
  • 也可以将注册“内部化”以实际进行订阅,但随后队列将需要管理IDisposables创建的 - 呸,让您的消费者处理它!

编码:

public class TheCheapestPubSubEver
{    
    private Subject<object> _inner = new Subject<object>();

    public IObservable<T> Register<T>()
    {
        return _inner.OfType<T>().Publish().RefCount();
    }
    public void Publish<T>(T message)
    {
        _inner.OnNext(message);
    }
}

用法:

void Main()
{
    var queue = new TheCheapestPubSubEver();

    var ofString = queue.Register<string>();
    var ofInt = queue.Register<int>();

    using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
    using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
    {
        queue.Publish("Foo");
        queue.Publish(1);
        Console.ReadLine();
    }
}

输出:

A string! Foo
An int! 1

然而,这并没有严格执行“消费消费者”——特定类型的多个寄存器会导致多个观察者调用——即:

var queue = new TheCheapestPubSubEver();

var ofString = queue.Register<string>();
var anotherOfString = queue.Register<string>();
var ofInt = queue.Register<int>();

using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
using(anotherOfString.Subscribe(s => Console.WriteLine("Another string! {0}", s)))

{
    queue.Publish("Foo");
    queue.Publish(1);
    Console.ReadLine();
}

结果是:

A string! Foo
Another string! Foo
An int! 1
于 2013-04-02T22:52:14.627 回答
1

我没有BlockingCollection在这种情况下使用过 - 所以我在“推测” - 你应该运行它来批准,反驳。

BlockingCollection可能只会使这里的事情更加复杂(或提供很少的帮助)。看看乔恩的这篇文章- 只是为了确认。GetConsumingEnumerable将为您提供可枚举的“每个订阅者”。最终使他们筋疲力尽 - Rx 需要考虑的事情。

此外,IEnumerable<>.ToObservable“源”进一步变平。当它起作用时(你可以查找源代码——我更推荐使用 Rx)——每个订阅者都会创建一个自己的“枚举器”——所以所有人都会得到他们自己版本的提要。我真的不确定,在这样的 Observable 场景中是如何实现的。

无论如何-如果您想提供应用程序范围的消息-IMO,您需要以Subject其他形式介绍或声明(例如发布等)。从这个意义上说,我认为 BlockingCollection 不会有任何帮助 - 但同样,最好你自己尝试一下。

注意(哲学的)

如果您想组合消息类型,或组合不同的来源 - 例如在更“真实世界”的场景中 - 它会变得更加复杂。我必须说它变得非常有趣。

注意将它们“扎根”到单一共享流中(并避免 Jer 正确建议的内容)。

我建议你不要试图逃避使用Subject. 对于你所需要的,那是你的朋友——不管所有与无状态相关的讨论以及主题有多糟糕——你实际上都有一个状态(并且你需要一个“状态”)——Rx 在“事后”开始,所以你无论如何享受它的好处。

我鼓励你这样做,因为我喜欢它的结果。

于 2013-04-02T22:08:23.130 回答
-1

我的问题是我们已经将一个队列(我通常将它与一个消费者的破坏性读取联系起来,特别是如果您使用 BlockingCollection 时)变成广播(发送给现在正在收听的任何人和每个人)。

这似乎是两个相互矛盾的想法。

我已经看到这样做了,但它随后被丢弃,因为它是“对错误问题的正确解决方案”。

于 2013-04-09T11:48:29.213 回答