3

我有一个Action<Action> operation它做一些长期运行的工作并通过提供的Action参数发送心跳。在我调用它之前,我想设置一个IObservable<Unit>在每次操作发送心跳时产生一个 Next 元素。 我怎么做?

创建IObservable并不复杂,但我猜从概念上最简单的方法是从事件创建它,但遗憾的是你不能在方法体中创建事件(除非你知道一种方法)。我可以使用Observable.FromEvent(Action<Action> addHandler, Action<Action> removeHandler), 但这需要丑陋的封闭式 Actions 才能将方法调用转发到(参见下面的示例)。
有没有更优雅的方式?

Action forward = null;
Action sendHeartbeat = () => { if (forward != null) forward(); };
//ugly, since it does not scale to multiple observers:
IObservable<Unit> heartbeatObs =
  Observable.FromEvent(handler => {
    forward = handler;
  }, _ => { forward = null; });
operation(sendHeartbeat);

即使我没有使用零维前向动作,而是使用它的集合,它也会很丑陋,因为我将重新实现+=and-=运算符EventHandlers

4

1 回答 1

1

不确定我是否完全理解你的问题,但如果我有,你可以做这样的事情,它使用 aSubject来传输心跳。您可以在命名空间中找到主题System.Reactive.Subjects

 // set up a subject
 Subject<Unit> heartbeatSubject=new Subject<Unit>();

 // set up the heartbeataction to push units down the subject
 Action sendHeartbeat = () => { heartbeatSubject.OnNext(Unit.Default); };

 // use the heartbeat subject somehow.
 heartbeatSubject.Subscribe(_=>Console.WriteLine("Operation produced a heartbeat"));

 // start the operation
 operation(sendHeartbeat);

编辑

正如您在评论中提到的,并且根据官方 RX 行,最好避免使用主题,除非在演示/示例代码中。我刚刚尝试过使用 Observable.Create,这可能会更好吗?

Action<Action> longRunningAction=(
    hb => { for(int i=0;i<100;i++) { hb(); Thread.Sleep(1000);}});

var ob=Observable.Create<Unit>(
(IObserver<Unit> observer) =>
{
    longRunningAction(()=>observer.OnNext(Unit.Default));
    return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});

ob.Subscribe(_=>Console.WriteLine("Received a heartbeat"));
于 2013-07-24T10:38:39.700 回答