20

我的同事和我有争执。我们正在编写一个处理大量数据的 .NET 应用程序。它接收数据元素,根据某些标准将它们的子集分组为块并处理这些块。

假设我们有类型的数据项Foo一个接一个地到达某个源(例如来自网络)。我们希望收集相关类型对象的集,从每个这样的子集Foo构造一个类型对象并处理类型对象。BarBar

我们中的一个人建议了以下设计。它的主要主题是IObservable<T>直接从我们组件的接口中公开对象。

// ********* Interfaces **********
interface IFooSource
{
    // this is the event-stream of objects of type Foo
    IObservable<Foo> FooArrivals { get; }
}

interface IBarSource
{
    // this is the event-stream of objects of type Bar
    IObservable<Bar> BarArrivals { get; }
}

/ ********* Implementations *********
class FooSource : IFooSource
{
    // Here we put logic that receives Foo objects from the network and publishes them to the FooArrivals event stream.
}

class FooSubsetsToBarConverter : IBarSource
{
    IFooSource fooSource;

    IObservable<Bar> BarArrivals
    {
        get
        {
            // Do some fancy Rx operators on fooSource.FooArrivals, like Buffer, Window, Join and others and return IObservable<Bar>
        }
    }
}

// this class will subscribe to the bar source and do processing
class BarsProcessor
{
    BarsProcessor(IBarSource barSource);
    void Subscribe(); 
}

// ******************* Main ************************
class Program
{
    public static void Main(string[] args)
    {
        var fooSource = FooSourceFactory.Create();
        var barsProcessor = BarsProcessorFactory.Create(fooSource) // this will create FooSubsetToBarConverter and BarsProcessor

        barsProcessor.Subscribe();
        fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
    }
}

另一个人提出了另一种设计,它的主题是使用我们自己的发布者/订阅者接口,并且仅在需要时在实现中使用 Rx。

//********** interfaces *********

interface IPublisher<T>
{
    void Subscribe(ISubscriber<T> subscriber);
}

interface ISubscriber<T>
{
    Action<T> Callback { get; }
}


//********** implementations *********

class FooSource : IPublisher<Foo>
{
    public void Subscribe(ISubscriber<Foo> subscriber) { /* ...  */ }

    // here we put logic that receives Foo objects from some source (the network?) publishes them to the registered subscribers
}

class FooSubsetsToBarConverter  : ISubscriber<Foo>, IPublisher<Bar>
{
    void Callback(Foo foo)
    {
        // here we put logic that aggregates Foo objects and publishes Bars when we have received a subset of Foos that match our criteria
        // maybe we use Rx here internally.
    }

    public void Subscribe(ISubscriber<Bar> subscriber) { /* ...  */ }
}

class BarsProcessor : ISubscriber<Bar>
{
    void Callback(Bar bar)
    {
        // here we put code that processes Bar objects
    }
}

//********** program *********
class Program
{
    public static void Main(string[] args)
    {
        var fooSource = fooSourceFactory.Create();
        var barsProcessor = barsProcessorFactory.Create(fooSource) // this will create BarsProcessor and perform all the necessary subscriptions

        fooSource.Run();  // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
    }
}

你觉得哪一个更好?暴露IObservable<T>并让我们的组件从 Rx 操作符创建新的事件流,或者定义我们自己的发布者/订阅者接口并在需要时在内部使用 Rx?

以下是有关设计需要考虑的一些事项:

  • 在第一个设计中,我们接口的消费者触手可及 Rx 的全部功能,并且可以执行任何 Rx 操作符。我们中的一个人声称这是一个优势,另一个人声称这是一个缺点。

  • 第二种设计允许我们在后台使用任何发布者/订阅者架构。第一个设计将我们与 Rx 联系在一起。

  • 如果我们希望使用 Rx 的强大功能,则需要在第二种设计中进行更多工作,因为我们需要将自定义发布者/订阅者实现转换为 Rx 并返回。它需要为每个希望进行事件处理的类编写胶水代码。

4

4 回答 4

16

暴露IObservable<T>不会以任何方式污染带有 Rx 的设计。事实上,设计决策与公开旧式 .NET 事件或滚动您自己的 pub/sub 机制之间的挂起完全相同。唯一的区别是IObservable<T>更新的概念。

需要证明吗?看看 F#,它也是一种 .NET 语言,但比 C# 更年轻。在 F# 中,每个事件都派生自IObservable<T>. 老实说,我认为抽象一个完全合适的 .NET pub/sub 机制是没有意义的——也就是说IObservable<T>——远离你自己的 pub/sub 抽象。只是暴露IObservable<T>

对我来说,滚动您自己的 pub/sub 抽象就像将 Java 模式应用于 .NET 代码一样。不同之处在于,在 .NET 中,观察者模式一直都有很好的框架支持,根本不需要自己动手。

于 2012-07-09T21:48:28.177 回答
8

首先,值得注意的是它IObservable<T>mscorlib.dllandSystem命名空间的一部分,因此暴露它在某种程度上等同于暴露IComparable<T>or IDisposable。这相当于选择 .NET 作为您的平台,您似乎已经这样做了。

现在,我不想提出答案,而是提出一个不同的问题,然后提出不同的心态,我希望(并相信)你会从那里开始管理。

您基本上是在问:我们是否想在整个系统中推广 Rx 运算符的分散使用?. 现在显然这不是很吸引人,因为您可能在概念上将 Rx 视为第 3 方库。

无论哪种方式,答案都不在于你们两个提出的基本设计,而在于这些设计的用户。我建议将您的设计分解为抽象级别,并确保 Rx 运算符的使用仅限于一个级别。当我谈到抽象级别时,我的意思是类似于OSI 模型,只是在同一个应用程序的代码中。

在我的书中,最重要的是不要站在设计的立场上“让我们创造一些可以在整个系统中使用和分散的东西,因此我们需要确保我们只做一次并且恰到好处,因为未来的所有岁月”。我更像是“让我们让这个抽象层产生其他层当前实现其目标所需的最小 API ” 。

关于您的两个设计的简单性,实际上很难判断,因为Foo并且Bar没有告诉我太多关于用例的信息,因此也没有告诉我可读性因素(顺便说一句,这些因素因用例而异)。

于 2012-07-09T16:27:05.870 回答
2

在第一个设计中,我们接口的消费者触手可及 Rx 的全部功能,并且可以执行任何 Rx 操作符。我们中的一个人声称这是一个优势,另一个人声称这是一个缺点。

我同意 Rx 的可用性是一个优势。列出为什么它是一个缺点的一些原因可能有助于确定如何解决这些问题。我看到的一些优点是:

  • 正如 Yam 和 Christoph 所反对的那样,IObservable/IObserver 从 .NET 4.0 开始就在 mscorlib 中,因此(希望)它将成为每个人都会立即理解的标准概念,例如事件或 IEnumerable。
  • Rx 的运营商。一旦您需要组合、过滤或以其他方式操作潜在的多个流,这些将变得非常有用。您可能会发现自己使用自己的界面以某种形式重做这项工作。
  • Rx 的合同。Rx 库执行一个定义明确的合约,并尽可能多地执行该合约。即使您需要自己制作运营商,Observable.Create也会执行合同执行工作(这就是IObservableRx 团队不建议直接实施的原因)。
  • Rx 库有很好的方法来确保您在需要时使用正确的线程。

我已经写了一些图书馆不包括我的案例的运营商。

第二种设计允许我们在后台使用任何发布者/订阅者架构。第一个设计将我们与 Rx 联系在一起。

我看不出暴露Rx 的选择对你如何在底层实现架构的影响比使用你自己的接口更重要。我会断言,除非绝对必要,否则您不应该发明新的 pub/sub 架构。

此外,Rx 库可能具有将简化“引擎盖下”部分的运算符。

如果我们希望使用 Rx 的强大功能,则需要在第二种设计中进行更多工作,因为我们需要将自定义发布者/订阅者实现转换为 Rx 并返回。它需要为每个希望进行事件处理的类编写胶水代码。

是和不是。如果我看到第二个设计,我首先想到的是:“这几乎就像 IObservable;让我们编写一些扩展方法来转换接口。” 胶水代码编写一次,随处使用。

胶水代码很简单,但如果你认为你会使用 Rx,只需公开 IObservable 并省去麻烦。

进一步的考虑

基本上,您的替代设计在 3 个关键方面与 IObservable/IObserver 不同。

  1. 没有办法取消订阅。这可能只是复制到问题时的疏忽。如果没有,如果你走那条路,强烈考虑添加它。
  2. 没有定义错误流向下游的路径(例如IObserver.OnError)。
  3. 无法指示流的完成(例如IObserver.OnCompleted)。这仅在您的基础数据旨在具有终止点时才相关。

您的替代设计还将回调作为操作返回,而不是将其作为接口上的方法,但我认为区别并不重要。

Rx 库鼓励使用函数式方法。您的FooSubsetsToBarConverter类将更适合作为IObservable<Foo>返回的扩展方法IObservable<Bar>。这稍微减少了混乱(为什么要在函数可以正常运行时创建具有一个属性的类)并且更适合 Rx 库其余部分的链式组合。您可以将相同的方法应用于备用接口,但如果没有操作员的帮助,这可能会更加困难。

于 2012-07-10T04:35:14.273 回答
0

另一种选择可能是:

interface IObservableFooSource : IFooSource
{
    IObservable<Foo> FooArrivals
    {
        get;
    }
}

class FooSource : IObservableFooSource 
{
    // Implement the interface explicitly
    IObservable<Foo> IObservableFooSource.FooArrivals
    {
        get
        {
        }
    }
}

这样,只有期望 IObservableFooSource 的客户端才能看到特定于 RX 的方法,而期望 IFooSource 或 FooSource 的客户端则不会。

于 2012-07-09T12:44:18.730 回答