124

我目前正在掌握 .NET 的反应式扩展框架,并且正在通过我找到的各种介绍资源(主要是http://www.introtorx.com

我们的应用程序涉及许多检测网络帧的硬件接口,这些将是我的 IObservables,然后我有各种组件将消耗这些帧或对数据执行某种方式的转换并生成一种新类型的帧。例如,还有其他组件需要显示每第 n 帧。我确信 Rx 将对我们的应用程序有用,但是我正在为 IObserver 接口的实现细节而苦苦挣扎。

我一直在阅读的大多数(如果不是全部)资源都说我不应该自己实现 IObservable 接口,而是使用提供的函数或类之一。根据我的研究,创建一个似乎可以满足Subject<IBaseFrame>我的需求,我将拥有从硬件接口读取数据然后调用Subject<IBaseFrame>实例的 OnNext 函数的单线程。然后,不同的 IObserver 组件将接收来自该主题的通知。

我的困惑来自本教程附录中给出的建议,其中说:

避免使用主题类型。Rx 实际上是一种函数式编程范式。使用主题意味着我们现在正在管理状态,这可能会发生变化。同时处理变异状态和异步编程是非常困难的。此外,许多运算符(扩展方法)都经过精心编写,以确保订阅和序列的正确和一致的生命周期得以维持;当你介绍主题时,你可以打破这个。如果您明确使用主题,未来的版本也可能会出现显着的性能下降。

我的应用程序对性能非常关键,我显然会在使用 Rx 模式之前测试它的性能,然后再将其用于生产代码;但是我担心我正在使用 Subject 类做一些违背 Rx 框架精神的事情,并且该框架的未来版本会损害性能。

有没有更好的方法来做我想做的事?无论是否有任何观察者,硬件轮询线程都将持续运行(否则硬件缓冲区将备份),因此这是一个非常热的序列。然后我需要将接收到的帧传递给多个观察者。

任何建议将不胜感激。

4

5 回答 5

86

好的,如果我们忽略我的教条方式并忽略“主题是好/坏”。让我们看看问题空间。

我敢打赌,你要么拥有 2 种系统风格中的 1 种,要么需要迎合。

  1. 消息到达时系统引发事件或回调
  2. 您需要轮询系统以查看是否有任何消息要处理

对于选项 1,很简单,我们只需用适当的 FromEvent 方法包装它就可以了。去酒吧!

对于选项 2,我们现在需要考虑如何轮询以及如何有效地执行此操作。此外,当我们获得价值时,我们如何发布它?

我想你会想要一个专用的轮询线程。您不希望其他编码人员敲打 ThreadPool/TaskPool 并让您处于 ThreadPool 饥饿的境地。或者,您不想要上下文切换的麻烦(我猜)。所以假设我们有自己的线程,我们可能会有某种 While/Sleep 循环,我们坐在那里进行轮询。当检查发现一些消息时,我们会发布它们。好吧,所有这些听起来都非常适合 Observable.Create。现在我们可能不能使用 While 循环,因为它不允许我们返回 Disposable 以允许取消。幸运的是,您已经阅读了整本书,因此精通递归调度!

我想这样的事情可能会奏效。#未测试

public class MessageListener
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;

    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();

        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();

        _messages = messages;
        messages.Connect();
    }

    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }

    private IObservable<IMessage> ListenToMessages()
    {
        return Observable.Create<IMessage>(o=>
        {
                return _scheduler.Schedule(recurse=>
                {
                    try
                    {           
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }   
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }                   
                });
        });
    }

    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that cant push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
    }
}

我真的不喜欢主题的原因是,这通常是开发人员对问题没有真正清晰设计的情况。攻入一个主题,到处戳它,然后让可怜的支持开发者猜测 WTF 正在继续。当您使用 Create/Generate 等方法时,您正在本地化对序列的影响。您可以通过一种方法查看所有内容,并且您知道没有其他人会产生令人讨厌的副作用。如果我看到一个主题字段,我现在必须去寻找一个班级中所有正在使用它的地方。如果某个 MFer 公开暴露了一个,那么所有的赌注都没有了,谁知道这个序列是如何使用的!异步/并发/接收很难。你不需要让副作用和因果关系编程更让你头晕目眩,让事情变得更难。

于 2013-01-22T14:16:40.513 回答
43

一般来说,您应该避免使用Subject,但是对于您在这里所做的事情,我认为它们工作得很好。当我在 Rx 教程中遇到“避免主题”消息时,我问了一个类似的问题。

引用Dave Sexton(来自 Rxx)

“主题是 Rx 的有状态组件。当您需要创建类似事件的可观察对象作为字段或局部变量时,它们非常有用。”

我倾向于将它们用作 Rx 的入口点。因此,如果我有一些代码需要说“发生了一些事情”(就像你有的那样),我会使用 a Subjectand call OnNext。然后将其公开IObservable为其他人订阅(您可以AsObservable()在您的主题上使用以确保没有人可以投射到主题并搞砸事情)。

您也可以通过 .NET 事件和 use 来实现这一点FromEventPattern,但如果我只想将事件变成一个IObservable无论如何,我看不到有一个事件而不是 a 的好处Subject(这可能意味着我错过了这里有东西)

但是,您应该非常强烈地避免IObservable使用 a 订阅 a Subject,即不要将 a 传递给SubjectIObservable.Subscribe方法。

于 2013-01-18T19:56:31.403 回答
34

通常,当您管理一个主题时,您实际上只是在重新实现 Rx 中已经存在的功能,而且可能不是以一种健壮、简单和可扩展的方式。

当您尝试将一些异步数据流调整到 Rx 中(或从当前非异步的数据流创建异步数据流)时,最常见的情况通常是:

  • 数据源是一个事件:正如 Lee 所说,这是最简单的情况:使用 FromEvent 并前往酒吧。

  • 数据源来自同步操作,并且您想要轮询更新,(例如 Web 服务或数据库调用):在这种情况下,您可以使用 Lee 建议的方法,或者对于简单的情况,您可以使用类似Observable.Interval.Select(_ => <db fetch>). 您可能希望使用 DistinctUntilChanged() 来防止在源数据中没有任何更改时发布更新。

  • 数据源是某种调用您的回调的异步 api:在这种情况下,使用 Observable.Create 连接您的回调以在观察者上调用 OnNext/OnError/OnComplete。

  • 数据源是一个阻塞直到有新数据可用的调用(例如一些同步套接字读取操作):在这种情况下,您可以使用 Observable.Create 包装从套接字读取并发布到 Observer.OnNext 的命令式代码读取数据时。这可能类似于您对主题所做的事情。

使用 Observable.Create 与创建一个管理 Subject 的类相当等同于使用 yield 关键字与创建一个实现 IEnumerator 的整个类。当然,你可以把 IEnumerator 写得像 yield 代码一样干净和好公民,但是哪个封装更好,感觉设计更整洁呢?Observable.Create 与管理 Subjects 也是如此。

Observable.Create 为您提供了一个干净的模式,用于惰性设置和干净的拆卸。您如何通过包装主题的类来实现这一点?你需要某种 Start 方法......你怎么知道什么时候调用它?还是你总是启动它,即使没有人在听?完成后,如何让它停止从套接字读取/轮询数据库等?你必须有某种 Stop 方法,而且你不仅要访问你订阅的 IObservable,还要访问最初创建 Subject 的类。

使用 Observable.Create,一切都集中在一个地方。Observable.Create 的主体在有人订阅之前不会运行,所以如果没有人订阅,你就永远不会使用你的资源。并且 Observable.Create 返回一个 Disposable ,它可以干净地关闭您的资源/回调等 - 当 Observer 取消订阅时调用它。你用来生成 Observable 的资源的生命周期与 Observable 本身的生命周期紧密相关。

于 2014-08-06T06:58:29.210 回答
8

引用的块文本几乎解释了为什么您不应该使用Subject<T>,但更简单地说,您正在结合观察者和可观察者的功能,同时在两者之间注入某种状态(无论您是封装还是扩展)。

这就是你遇到麻烦的地方;这些责任应该彼此分开和区别开来。

也就是说,在您的具体情况下,我建议您将您的担忧分解为更小的部分。

首先,您的线程很热,并且始终监视硬件以获取发出通知的信号。你通常会怎么做? 事件。所以让我们从那个开始。

让我们定义EventArgs您的事件将触发。

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");

        // Set values.
        BaseFrame = baseFrame;
    }

    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

现在,将触发事件的类。请注意,这可能是一个静态类(因为您总是有一个线程在运行监视硬件缓冲区),或者您按需调用的订阅. 您必须根据需要进行修改。

public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;

    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

所以现在你有一个公开事件的类。Observables 可以很好地处理事件。如此之多,以至于IObservable<T>如果您遵循标准事件模式,通过 class 上的静态FromEventPattern方法将事件流(将事件流视为事件的多次触发)转换为实现的Observable一流支持。

使用您的事件源和FromEventPattern方法,我们可以轻松地创建一个IObservable<EventPattern<BaseFrameEventArgs>>EventPattern<TEventArgs>该类体现了您在 .NET 事件中看到的内容,特别是派生自的实例EventArgs和表示发送者的对象),如下所示:

// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();

// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

当然,你想要一个IObservable<IBaseFrame>, 但这很简单,使用类上的Select扩展方法Observable来创建一个投影(就像你在 LINQ 中一样,我们可以将所有这些包装在一个易于使用的方法中):

public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}
于 2013-01-18T18:02:26.463 回答
-1

概括主题不适合用于公共接口是不好的。虽然这肯定是正确的,但这不是反应式编程方法的样子,但对于您的经典代码来说,它绝对是一个很好的改进/重构选项。

如果您有一个带有公共 set 访问器的普通属性并且您想通知更改,那么没有什么反对用 BehaviorSubject 替换它的。INPC 或其他其他事件并不是那么干净,它让我个人感到厌烦。为此,您可以并且应该将 BehaviorSubjects 用作公共属性而不是普通属性,并放弃 INPC 或其他事件。

此外,主题界面使您界面的用户更加了解您的属性的功能,并且更有可能订阅而不是仅仅获得价值。

如果您希望其他人收听/订阅属性的更改,最好使用它。

于 2018-06-14T06:07:08.107 回答