1

我想增强

public static IObservable<TSource> Create<TSource>(
    Func<IObserver<TSource>, Action> subscribe)
{...}

为了在 F# 中使用,我可以只使用标准 F# 类型,而不是使用 Function 或 Action 调用,即IObserver -> (unit -> unit).

我怎样才能做到这一点?

编辑:

添加完整示例。不知道为什么obsAction不起作用。

open System
open System.Reactive
open System.Reactive.Disposables
open System.Reactive.Linq


type Observable with
  static member Create(subscribe) = 
    Observable.Create(fun observer -> Action(subscribe observer))

let obsDispose (observer:IObserver<_>) =
    let timer = new System.Timers.Timer()
    timer.Interval <- 1000.00
    let handlerTick = new Timers.ElapsedEventHandler(fun sender args ->     observer.OnNext("tick"))
    let handlerElapse = new Timers.ElapsedEventHandler(fun sender args -> printfn "%A" args.SignalTime)
    timer.Elapsed.AddHandler(handlerTick)
    timer.Elapsed.AddHandler(handlerElapse)
    timer.Start()
    Disposable.Empty

let obsAction (observer:IObserver<_>) =
    let timer = new System.Timers.Timer()
    timer.Interval <- 1000.00
    let handlerTick = new Timers.ElapsedEventHandler(fun sender args -> observer.OnNext("tick"))
    let handlerElapse = new Timers.ElapsedEventHandler(fun sender args -> printfn "%A" args.SignalTime)
    timer.Elapsed.AddHandler(handlerTick)
    timer.Elapsed.AddHandler(handlerElapse)
    timer.Start()
    let action() =
        timer.Elapsed.RemoveHandler(handlerTick)
        timer.Elapsed.RemoveHandler(handlerElapse)
        timer.Dispose()
    action

let obsOtherAction (observer:IObserver<_>) =
    let timer = new System.Timers.Timer()
    timer.Interval <- 1000.00
    let handlerTick = new Timers.ElapsedEventHandler(fun sender args -> observer.OnNext("tick"))
    let handlerElapse = new Timers.ElapsedEventHandler(fun sender args -> printfn "%A" args.SignalTime)
    timer.Elapsed.AddHandler(handlerTick)
    timer.Elapsed.AddHandler(handlerElapse)
    timer.Start()
    new System.Action( fun () ->
        timer.Elapsed.RemoveHandler(handlerTick)
        timer.Elapsed.RemoveHandler(handlerElapse)
        timer.Dispose())

let worksNeverStops = obsDispose |> Observable.Create |> Observable.subscribe(fun time -> printfn "Time: %A" time)
let actionWorks = obsOtherAction |> Observable.Create |> Observable.subscribe(fun time -> printfn "Time: %A" time)
let doesNotWork = obsAction |> Observable.Create |> Observable.subscribe(fun time -> printfn "Time: %A" time)
4

1 回答 1

2

您面临的问题是 FP 陷阱。

在,

  static member Create(subscribe) = 
    Observable.Create(fun observer -> Action(subscribe observer))

的类型subscribeIObserver<_> -> unit -> unit

IObserver<_> -> unit -> unit 现在和IObserver<_> -> Actionwhere之间存在细微差别Action : unit -> unit。区别在于前者是咖喱,而后者不是。

当观察者订阅时,subscribe observer返回一个()可以应用于 getunit的方法 - 在应用最后一个之前,您的订阅方法永远不会真正被调用()- 直到它取消订阅时它才会被分离。

你可以通过强制它不被咖喱来克服它:

let action() = ... | let action = (subscribe observer)
Action(action)

更远:

Invoke如果您检查 IL,则FastFunc生成的等效 VB(函数引用在 VB 中更清晰)版本

 static member Create(subscribe) = 
    Observable.Create(fun observer -> Action(subscribe observer))

是:

Friend Function Invoke(ByVal arg As IObserver(Of a)) As Action
    Return New Action(AddressOf New Observable-Create-Static@27-1(Of a)(Me.subscribe, arg).Invoke)
End Function

并为:

  static member Create(subscribe) = 
    Observable.Create(fun observer ->
      let action = subscribe observer
      Action(action))

是:

Friend Function Invoke(ByVal arg As IObserver(Of a)) As Action
    Return New Action(AddressOf New Observable-Create-Static@28-1(Me.subscribe.Invoke(arg)).Invoke)
End Function
  1. AddressOf New Closure(Me.subscribe, arg).Invoke-> 在调用 dispose 操作之前不会调用 subscribe 函数。

  2. AddressOf New Closure(Me.subscribe.Invoke(arg)).Invoke-> subscribe 函数实际上被调用,结果操作按预期返回。

我希望现在清楚为什么第二种情况有效,而不是第一种。

于 2012-12-04T09:56:31.470 回答