10

我最近一直在阅读有关 IObservable 的内容。到目前为止,我已经查看了各种 SO 问题,并观看了有关他们可以做什么的视频。我认为的整个“推动”机制非常出色,但我仍在试图弄清楚一切究竟是做什么的。从我的阅读中,我猜想在某种程度上IObservable是可以“观察”的东西,并且IObservers是“观察者”。

所以现在我要尝试在我的应用程序中实现它。在开始之前,我想先弄清楚一些事情。我已经看到 IObservable 与 IEnumerable 相反,但是,在我的特定实例中,我真的看不到任何可以合并到我的应用程序中的地方。

目前,我大量使用事件,以至于我可以看到“管道”开始变得难以管理。我想,IObservable 可以帮助我。

考虑以下设计,这是我在应用程序中围绕 I/O 的包装器(仅供参考,我通常必须处理字符串):

我有一个名为IDataIO

public interface IDataIO
{
  event OnDataReceived;
  event OnTimeout:
  event OnTransmit;
}

现在,我目前有三个实现这个接口的类,这些类中的每一个都在某种程度上利用了异步方法调用,引入了某种类型的多线程处理:

public class SerialIO : IDataIO;
public class UdpIO : IDataIO;
public class TcpIO : IDataIO;

这些类中的每一个都有一个实例包含在我的最终类中,称为 IO(它还实现了 IDataIO - 遵循我的策略模式):

public class IO : IDataIO
{
  public SerialIO Serial;
  public UdpIO Udp;
  public TcpIO Tcp;
}

我已经利用策略模式来封装这三个类,以便IDataIO在运行时在不同实例之间进行更改时使其对最终用户“不可见”。正如您可以想象的那样,这在后台导致了相当多的“事件管道”。

那么,在我的情况下,如何在此处使用“推送”通知?我不想订阅事件(DataReceived 等),而是简单地将数据推送给任何感兴趣的人。我有点不确定从哪里开始。我仍在尝试玩弄 的想法/泛型类Subject,以及它的各种化身(ReplaySubject/AsynSubject/BehaviourSubject)。有人可以请教我这个(也许参考我的设计)?或者这根本不适合IObservable

PS。随时纠正我的任何“误解”:)

4

3 回答 3

9

Observables 非常适合表示数据流,因此您的DataReceived事件可以很好地建模为 observable 模式,例如IObservable<byte>or IObservable<byte[]>。您还可以获得额外的好处,OnError并且OnComplete非常方便。

在实现方面,很难说你的确切场景,但我们经常使用Subject<T>它作为底层源并调用OnNext来推送数据。也许像

// Using a subject is probably the easiest way to push data to an Observable
// It wraps up both IObservable and IObserver so you almost never use IObserver directly
private readonly Subject<byte> subject = new Subject<byte>();

private void OnPort_DataReceived(object sender, EventArgs e)
{
    // This pushes the data to the IObserver, which is probably just a wrapper
    // around your subscribe delegate is you're using the Rx extensions
    this.subject.OnNext(port.Data); // pseudo code 
}

然后,您可以通过属性公开主题:

public IObservable<byte> DataObservable
{
    get { return this.subject; } // Or this.subject.AsObservable();
}

您可以用 替换您的DataReceived事件,IDataIOIObservable<T>让每个策略类以他们需要的任何方式处理他们的数据并推送到Subject<T>.

另一方面,订阅 Observable 的任何人都可以像处理事件一样处理它(只需使用),或者您可以使用、、等Action<byte[]>在流上执行一些非常有用的工作。SelectWhereBuffer

private IDataIO dataIo = new ...

private void SubscribeToData()
{ 
    dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes);
}

private void On16Bytes(IList<byte> bytes)
{
    // do stuff
}

ReplaySubjectConnectableObservable当您知道您的订阅者将迟到但仍需要赶上所有事件时,/ s 非常棒。源缓存它推送的所有内容,并为每个订阅者重播所有内容。只有你可以说这是否是你真正需要的行为(但要小心,因为它会缓存所有显然会增加你的内存使用的东西)。

当我学习 Rx 时,我发现关于 Rx的http://leecampbell.blogspot.co.uk/博客系列对于理解理论非常有用(这些帖子现在有点过时了,API 也发生了变化,所以请注意这一点)

于 2012-06-25T13:48:03.467 回答
5

这绝对是 observables 的理想案例。班级可能会看到最大的IO进步。首先,让我们更改接口以使用可观察对象,看看组合类变得多么简单。

public interface IDataIO
{
    //you will have to fill in the types here.  Either the event args
    //the events provide now or byte[] or something relevant would be good.
    IObservable<???> DataReceived;
    IObservable<???> Timeout;
    IObservable<???> Transmit;
}

public class IO : IDataIO
{
    public SerialIO Serial;
    public UdpIO Udp;
    public TcpIO Tcp;

    public IObservable<???> DataReceived
    {
        get 
        {
            return Observable.Merge(Serial.DataReceived,
                                    Udp.DataReceived,
                                    Tcp.DataReceived);
        }
    }

    //similarly for other two observables
}

旁注:您可能会注意到我更改了接口成员名称。在 .NET 中,事件通常被命名<event name>并且引发它们的函数被调用On<event name>

对于生产类,您有一些取决于实际来源的选项。假设您正在使用 .NET SerialPort 类SerialIODataReceived返回一个IObservable<byte[]>. 由于 SerialPort 已经有一个接收数据的事件,您可以直接使用它来制作您需要的 observable。

public class SerialIO : IDataIO
{
    private SerialPort _port;

    public IObservable<byte[]> DataRecived
    {
        get
        {
            return Observable.FromEventPattern<SerialDataReceivedEventHandler,
                                               SerialDataReceivedEventArgs>(
                        h => _port.DataReceived += h,
                        h => _port.DataReceived -= h)
                   .Where(ep => ep.EventArgs.EventType == SerialData.Chars)
                   .Select(ep =>
                           {
                              byte[] buffer = new byte[_port.BytesToRead];
                              _port.Read(buffer, 0, buffer.Length);
                              return buffer;
                           });
        }
    }
}

对于没有现有事件源的情况,您可能需要使用 RichK 建议的主题。他的回答很好地涵盖了这种使用模式,所以我不会在这里重复。

您没有展示如何使用此接口,但根据用例,在这些类上设置其他函数 returnIObservable本身并完全取消这些“事件”可能更有意义。使用基于事件的异步模式,您必须将事件与调用的函数分开以触发工作,但使用可观察对象,您可以从函数中返回它们,以使您订阅的内容更加明显。这种方法还允许从每个调用返回的 observables 发送OnErrorOnCompleted消息来表示操作的结束。根据您对组合类的使用,我不希望这在这种特殊情况下有用,但要记住这一点。

于 2012-06-26T04:03:44.613 回答
0

使用 IObservable 代替事件

如果只对 nuget 包 rxx 具有的属性更改感兴趣

IObservable<string> obs=Observable2.FromPropertyChangedPattern(() => obj.Name)

(以及许多其他方法)


或者如果事件排除了属性更改/希望避免实施 INotifyPropertyChanged

class ObserveEvent_Simple
{
    public static event EventHandler SimpleEvent;
    static void Main()
    {          
       IObservable<string> eventAsObservable = Observable.FromEventPattern(
            ev => SimpleEvent += ev,
            ev => SimpleEvent -= ev);
    }
}

类似于来自http://rxwiki.wikidot.com/101samples#toc6的 u/Gideon Engelberth

https://rehansaeed.com/reactive-extensions-part2-wrapping-events/涵盖


这篇codeproject文章也致力于将事件转换为反应事件

https://www.codeproject.com/Tips/1078183/Weak-events-in-NET-using-Reactive-Extensions-Rx

并且还处理弱订阅

于 2017-10-14T19:34:55.167 回答