I'm attempting to wrap a device class with an IObservable
. Without Rx its used like this:
device.IncomingData += data => { /* do something with float[] data */ };
device.Start(500);
// Something...
device.Stop();
So far I've got a wrapper class like below, which tracks how many observers are using the stream and stops and starts it accordingly.
Is there not a built-in way to track the observers with Rx?
private class ObservableWrapper
{
private int _observers;
public ObservableStreamer(IDevice device)
{
Stream = Observable.FromEvent<float[]>(
e =>
{
device.IncomingData += e;
int obs = Interlocked.Increment(ref _observers);
if (obs < 2)
device.Start();
},
e =>
{
device.IncomingData -= e;
int obs = Interlocked.Decrement(ref _observers);
if (obs < 1)
device.Stop();
});
}
public IObservable<float[]> Stream { get; private set; }
}
var wrap = new ObservableWrapper(device);
wrap.Stream.Subscribe(data => { /* do something with float[] data */ });