11

我正在尝试提出一个 Rx Builder 以在 F# 计算表达式语法中使用反应式扩展。如何修复它以使其不会炸毁堆栈?就像下面的 Seq 示例。是否有任何计划将 RxBuilder 的实现作为响应式扩展的一部分或作为 .NET Framework 未来版本的一部分?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore
4

5 回答 5

9

一个简短的回答是,Rx Framework 不支持使用这样的递归模式生成 observables,所以它不容易完成。用于 F# 序列的Combine操作需要一些可观察对象不提供的特殊处理。Rx 框架可能期望您Observable.Generate使用 LINQ 查询/F# 计算生成器生成可观察对象,然后使用它们来处理它们。

无论如何,这里有一些想法 -

首先,您需要替换Observable.mergeObservable.Concat. 第一个并行运行两个 observable,而第二个首先从第一个 observable 产生所有值,然后从第二个 observable 产生值。在此更改之后,代码段将在堆栈溢出之前至少打印约 800 个数字。

堆栈溢出的原因是Concat创建了一个调用Concat以创建另一个调用Concat等的可观察对象的可观察对象。解决此问题的一种方法是添加一些同步。如果您使用的是 Windows 窗体,则可以进行修改Delay,以便它在 GUI 线程上调度 observable(丢弃当前堆栈)。这是一个草图:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

要正确实现这一点,您必须编写自己的Concat方法,这非常复杂。这个想法是:

  • Concat 返回一些特殊类型,例如IConcatenatedObservable
  • 当递归调用该方法时,您将创建一个IConcatenatedObservable相互引用的链
  • Concat方法将查找此链,并且当有例如三个对象时,它将丢弃中间的一个(以始终保持链的长度最多为 2)。

对于 StackOverflow 的答案来说,这有点太复杂了,但对于 Rx 团队来说,这可能是一个有用的反馈。

于 2011-05-28T19:35:29.940 回答
9

请注意,这已在 Rx v2.0 中得到修复(正如这里已经提到的),更普遍地适用于所有排序运算符(Concat、Catch、OnErrorResumeNext),以及命令式运算符(If、While 等)。

基本上,您可以将此类运算符视为订阅终端观察者消息中的另一个序列(例如,Concat 在接收到当前的 OnCompleted 消息时订阅下一个序列),这就是尾递归类比的来源。

在 Rx v2.0 中,所有的尾递归订阅都被扁平化为一个类似队列的数据结构,一次处理一个,与下游观察者对话。这避免了观察者为连续的序列订阅而相互交谈的无限增长。

于 2012-09-05T17:00:55.657 回答
4

这已在Rx 2.0 Beta中得到修复。这是一个测试

于 2011-06-07T19:49:58.780 回答
3

这样的事情呢?

type rxBuilder() =    
   member this.Delay (f : unit -> 'a IObservable) = 
               { new IObservable<_> with
                    member this.Subscribe obv = (f()).Subscribe obv }
   member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
               { new IObservable<_> with
                    member this.Subscribe obv = xs.Subscribe obv ; 
                                                ys.Subscribe obv }
   member this.Yield x = Observable.Return x
   member this.YieldFrom xs = xs

let rx = rxBuilder()

let rec f x = rx { yield x 
                   yield! f (x + 1) }

do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore

do System.Console.ReadLine() |> ignore

http://rxbuilder.codeplex.com/(为试验 RxBuilder 而创建)

xs 一次性没有接线。一旦我尝试连接一次性用品,它就会重新炸毁堆栈。

于 2011-06-01T00:01:53.460 回答
2

如果我们从这个计算表达式(又名 Monad)中删除语法糖,我们将有:

let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )

或者在 C# 中:

public static IObservable<int> g(int x)
{
    return Observable.Defer<int>(() =>
    {
      return Observable.Merge(Observable.Return(x), g(x + 1));                    
    });
}

这绝对不是尾递归。我认为如果您可以使其尾递归,那么它可能会解决您的问题

于 2011-05-29T08:35:39.707 回答