有时,业务逻辑似乎能够通过一些递归定义的 observables 进行自然建模。这是一个例子:
interface Demo {
IObservable<CommandId> userCommands;
IObservable<IObservable<IProcessingState>> processes;
IObservable<CommandId> skippedCommands;
IObservable<(CommandId, CommandResult)> RunCommand(CommandId id);
}
interface IProcessingState {
bool IsProcessing {get;}
CommandId? ProcessingId {get;}
}
对于用户输入的每个命令,它应该在 proccess 中触发正在运行的进程,或者在 skippedCommands 中发出一个值。这种逻辑的一些直接翻译可能
var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => RunCommand(c))
如上面的代码所示,assign ofvalidCommands
和processes
是相互递归的,我们可以等效地定义processes
直接使用自身递归
var processes = userCommands.WithLatestFrom(processes)
.Where(x => !x.Item2.IsProcessing)
.Select(c => RunCommand(c))
但是我们不能prcesses
像这样在 C# 中定义 Observable。
我发现了几个可能相关的事情:
Observable.Generate
构造函数。但是,它似乎以同步方式折叠自己的状态,我不知道如何使用userCommands
observable 和RunCommand
inObservable.Generate
;RxJS 中的一些操作符
exhaust
,exhaustMap
虽然 Rx.Net 没有提供这个操作符,但是有一些第三方库提供了这些操作符,比如FSharp.Control.Reactive。实现类似于
let exhaustMap f source =
Observable.Create (fun (o : IObserver<_>) ->
let mutable hasSubscription = false
let mutable innerSub = None
let onInnerCompleted () =
hasSubscription <- false
innerSub |> Option.iter Disposable.dispose
let onOuterNext x =
if not hasSubscription then
hasSubscription <- true
f x |> subscribeSafeWithCallbacks
o.OnNext o.OnError onInnerCompleted
|> fun y -> innerSub <- Some y
source
|> subscribeSafeWithCallbacks
onOuterNext o.OnError o.OnCompleted)
但是,有两个问题。一个。直接使用此运算符不符合上述要求,跳过的命令将被忽略。我们可以稍微修改源代码以适应要求,但还有另一个问题 b. 该实现引入了两个局部可变变量和两个嵌套订阅。我不知道这在所有情况下是否都可以(会有数据竞争的风险吗?),并且更喜欢基于操作符组合而不是可变引用的解决方案
SodiumFRP提供了前向引用类型
StreamLoop
和CellLoop
. 根据功能响应式编程一书,这些前向引用类型的 Rx 替代方案将是Subject
,通过使用Subject
上面的递归构造,将其分为两个阶段。问题是由 Intro to Rx 指出的,使用Subject
需要手动管理更多状态,至少需要 dispose 主题,并且可能被迫使用 hot observables。我想知道是否存在不使用的解决方案Subject
在结果的最后一个值上使用
window
带边界的运算符(就在完成之前)RunCommand
,processes
上面可以构造一些方法,但是这个解决方案需要使用两次结束信号,这需要仔细处理(在尝试和调整时安静一段时间Take(1)
,,,,,重载操作员获得预期结果)同时发生的事件。zip
withLatestFrom
combineLatest
Window
是否有更好的解决方案或对上述解决方案的修改,尤其是仅使用运算符?