2

I'm attempting to wrap a device class with an IObservable. Without Rx its used like this:

device.IncomingData += data => { /* do something with float[] data */ };
device.Start(500);
// Something...
device.Stop(); 

So far I've got a wrapper class like below, which tracks how many observers are using the stream and stops and starts it accordingly.

Is there not a built-in way to track the observers with Rx?

private class ObservableWrapper
{
  private int _observers;

  public ObservableStreamer(IDevice device)
  {
    Stream = Observable.FromEvent<float[]>(
      e =>
        {
          device.IncomingData += e;
          int obs = Interlocked.Increment(ref _observers);

          if (obs < 2)
            device.Start();
        },
      e =>
        {
          device.IncomingData -= e;
          int obs = Interlocked.Decrement(ref _observers);

          if (obs < 1)
            device.Stop();
        });
  }

  public IObservable<float[]> Stream { get; private set; }
} 

var wrap = new ObservableWrapper(device);
wrap.Stream.Subscribe(data => { /* do something with float[] data */ });
4

2 回答 2

4

在构建自定义 observable 时,不要费心添加引用计数或连接共享。如果您需要这些功能,您可以分别使用RefCount和添加它们Publish。您也应该几乎永远没有理由实施I(Connectable)Observable自己。

至于您的具体用例,它可能是一个相当简单的扩展方法:

public static DeviceExtensions
{
    public static IObservable<float[]> AsObservable(this Device device)
    {
        return Observable.CreateWithDisposable<float[]>(obs =>
        {
            IDisposable disposable = Observable.FromEvent<float[]>(
                e => device.IncomingData += e,
                e => device.IncomingData -= e
                )
                .Finally(device.Stop)
                .Subscribe(obs);

            device.Start();

            return disposable;
        });
    }
}

现在你可以像这样使用它:

IObservable<float[]> observableData = device.AsObservable()
    .RefCount(); // If you need ref counting

observableData.Subscribe(data => {});

observableData.Subscribe(data => {});
于 2013-05-02T07:33:19.153 回答
2

听起来您实际上已经更好地映射到使用较少的IConnectableObservable。Connect 方法将调用 Start 并返回一个调用 Stop 的可处置对象。Subscribe 方法将转发到 Observable.FromEvent (没有所有的引用计数)。然后,您可以在其上使用RefCount将其转回常规的 IObservable。与您当前的实现一样,您必须对所有订阅使用相同的实例,否则计数将无法正常工作。

例如(未编译的代码传入):

class ObservableDevice : IConnectableObservable
{
    public ObservableDevice(IDevice device)
    {
        _device = device;
        //not strictly necessary to cache this, but this way you only
        //create it once
        _stream = Observable.FromEvent<...>(...);
    }

    private IDevice _device;
    private IObservable _stream;

    public IDisposable Connect()
    {
        //it's up to you if you want/need to guard against multiple starts
        _device.Start();
        return Disposable.Create(() => { _device.Stop(); });
    }

    public IDisposable Subscribe(IObserver observer)
    {
        //error checking if you want, or just defer to 
        //_stream.Subscribe's error checking
        return _stream.Subscribe(observer);
    }
}
于 2013-05-01T03:01:45.820 回答