5

我对下面的代码示例和人们的想法有点好奇。这个想法是从 NetworkStream (~20 msg/s) 中读取数据,而不是在 main 中工作,而是将内容传递给 MainboxProcessor 以在完成后处理并取回内容以进行绑定。

通常的方法是使用 PostAndReply,但我想在 C# 中绑定到 ListView 或其他控件。无论如何,必须对 LastN 项和过滤做魔术。另外,Rx 有一些错误处理。

下面的示例观察2..10中的数字并返回“hello X”。在8时,它像 EOF 一样停止。将其设置为 ToEnumerable 是因为其他线程在其他线程之前完成,但它也适用于订阅。

困扰我的是:

  1. 以递归方式传递 Subject(obj)。我认为其中大约 3-4 个没有任何问题。好主意?
  2. 对象的生命周期。

open System
open System.Threading
open System.Reactive.Subjects
open System.Reactive.Linq  // NuGet, take System.Reactive.Core also.
open System.Reactive.Concurrency

type SerializedLogger() = 

    let _letters = new Subject<string>()
    // create the mailbox processor
    let agent = MailboxProcessor.Start(fun inbox -> 

        // the message processing function
        let rec messageLoop (letters:Subject<string>) = async{

            // read a message
            let! msg = inbox.Receive()

            printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
            do! Async.Sleep 100
            // write it to the log    
            match msg with
            | 8 -> letters.OnCompleted() // like EOF.
            | x -> letters.OnNext(sprintf "hello %d" x)

            // loop to top
            return! messageLoop letters
            }

        // start the loop
        messageLoop _letters
        )

    // public interface
    member this.Log msg = agent.Post msg
    member this.Getletters() = _letters.AsObservable()

/// Print line with prefix 1.
let myPrint1 x = printfn "onNext - %s,  Thread: %d" x  Thread.CurrentThread.ManagedThreadId

// Actions
let onNext = new Action<string>(myPrint1)
let onCompleted = new Action(fun _ -> printfn "Complete")

[<EntryPoint>]
let main argv = 
    async{
    printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId

    // test
    let logger = SerializedLogger()
    logger.Log 1 // ignored?

    let xObs = logger
                .Getletters() //.Where( fun x -> x <> "hello 5")
                .SubscribeOn(Scheduler.CurrentThread)
                .ObserveOn(Scheduler.CurrentThread)
                .ToEnumerable() // this
                //.Subscribe(onNext, onCompleted) // or with Dispose()

    [2..10] |> Seq.iter (logger.Log) 

    xObs |> Seq.iter myPrint1

    while true 
        do 
        printfn "waiting"
        System.Threading.Thread.Sleep(1000)

    return 0
    } |> Async.RunSynchronously // return an integer exit code
4

1 回答 1

5

我做过类似的事情,但使用普通的 F#Event类型而不是Subject. 它基本上可以让你创建IObservable和触发它的订阅——就像你使用更复杂的Subject. 基于事件的版本将是:

type SerializedLogger() = 
   let letterProduced = new Event<string>()
   let lettersEnded = new Event<unit>()
   let agent = MailboxProcessor.Start(fun inbox -> 
     let rec messageLoop (letters:Subject<string>) = async {
       // Some code omitted
       match msg with
       | 8 -> lettersEnded.Trigger()
       | x -> letterProduced.Trigger(sprintf "hello %d" x)
       // ...

member this.Log msg = agent.Post msg
member this.LetterProduced = letterProduced.Publish
member this.LettersEnded = lettersEnded.Publish

重要的区别是:

  • Event无法触发OnCompleted,因此我改为公开了两个单独的事件。这是相当不幸的!鉴于这Subject与所有其他方面的事件非常相似,这可能是使用主题而不是普通事件的一个很好的理由。

  • 使用的好处Event是它是标准的 F# 类型,因此您不需要代理中的任何外部依赖项。

  • 我注意到您的评论指出第一次调用Log被忽略。那是因为您仅在此调用发生后才订阅事件处理程序。我认为您可以在此处对主题思想使用 ReplaySubject 变体- 它会在您订阅它时重播所有事件,因此之前发生的事件不会丢失(但缓存有成本)。

总之,我认为 usingSubject可能是一个好主意 - 它与 using 本质上是相同的模式Event(我认为这是从代理公开通知的非常标准的方式),但它可以让你触发OnCompleted. 由于缓存成本,我可能不会使用ReplaySubject- 您只需确保在触发任何事件之前进行订阅。

于 2016-08-19T10:39:45.827 回答