2

我正在尝试评估使用 Rx 从发布/订阅模式创建序列(即经典观察者模式,其中下一个元素由生产者发布)。这与 .net 事件基本相同,只是我们需要对其进行概括,以便不需要事件,因此我无法利用 Observable.FromEvent。我玩过 Observable.Create 和 Observable.Generate 并发现自己最终不得不编写代码来处理 pub/sub(即我必须编写生产者/消费者代码来存储已发布的项目,然后通过用它调用 IObserver.OnNext() ),所以看起来我并没有真正利用 Rx ......

我是否在寻找正确的路径,或者这是否适合 Rx?

谢谢

4

2 回答 2

2

使用 RX 绝对适合发布/订阅。这是一个演示,说明了使用IObservableRX 的最简单的发布/订阅模式。

使用 NuGet 将 Reactive Extensions (RX) 添加到您的项目中,搜索rx-main并安装Reactive Extensions - Main Library.

using System;
using System.Reactive.Subjects;

namespace RX_2
{
    public static class Program
    {
        static void Main(string[] args)
        {
            Subject<int> stream = new Subject<int>();

            stream.Subscribe(
                o =>
                {
                    Console.Write(o);
                });

            stream.Subscribe(
                o =>
                {
                    Console.Write(o);
                });

            for (int i = 0; i < 5; i++)
            {
                stream.OnNext(i);
            }

            Console.ReadKey();
        }
    }
}

执行时,代码输出如下:

0011223344
于 2014-08-20T11:14:02.177 回答
2

IObservables您的发布者只是将一些作为属性公开。而你的消费者只是Subscribe给他们(或者在订阅之前做他们想要的任何 Rx-fu)。

有时这就像Subjects在您的发布者中使用一样简单。有时它更复杂,因为您的发布者实际上正在观察其他一些可观察的过程。

这是一个愚蠢的例子:

public class Publisher
{
    private readonly Subject<Foo> _topic1;

    /// <summary>Observe Foo values on this topic</summary>
    public IObservable<Foo> FooTopic
    {
       get { return _topic1.AsObservable(); }
    }

    private readonly IObservable<long> _topic2;

    /// <summary>Observe the current time whenever our clock ticks</summary>
    public IObservable<DateTime> ClockTickTopic
    {
        get { return _topic2.Select(t => DateTime.Now); }
    }

    public Publisher()
    {
         _topic1 = new Subject<Foo>();
         // tick once each second
         _topic2 = Observable.Interval(TimeSpan.FromSeconds(1));
    }

    /// <summary>Let everyone know about the new Foo</summary>
    public NewFoo(Foo foo) { _topic1.OnNext(foo); }
}


// interested code...
Publisher p = ...;
p.FooTopic.Subscribe(foo => ...);

p.ClickTickTopic.Subscribe(currentTime => ...);

// count how many foos occur during each clock tick
p.FooTopic.Buffer(p.ClockTickTopic)
    .Subscribe(foos => Console.WriteLine("{0} foos during this interval", foos.Count));
于 2013-07-31T18:06:05.530 回答