1

我有以下推文流类。它有 TweetReceived 事件,可以与我系统的其他组件一起使用。

它似乎工作正常,但我觉得它比它应该的更复杂。

是否有任何工具可以为我提供此功能而无需自己实现 mbox/事件机制?

您还建议使用 asyncSeq 而不是 IObservable 吗?

谢谢!

type TweetStream ( cfg:oauth.Config) =
    let token = TwitterToken.Token (cfg.accessToken,
                                    cfg.accessTokenSecret,
                                    cfg.appKey, 
                                    cfg.appSecret)

    let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json")

    let event = new Event<_>()

    let agent = MailboxProcessor.Start(fun (mbox) ->
        let rec loop () =
            async {
                let! msg = mbox.Receive()
                do event.Trigger(msg)
                return! loop()
            }
        loop ()) 

    member x.TweetReceived = event.Publish

    member x.Start () =
        Task.Factory.StartNew(fun () -> stream.StartStream(token, agent.Post))
        |> ignore

    member x.Stop = stream.StopStream

更新:感谢 Thomas 对第二个问题的快速(一如既往)回答。

我的第一个问题可能有点不清楚,所以我重构了代码以使类 AgentEvent 可见,我重新表述第一个问题:有没有办法更容易地实现 AgentEvent 中的逻辑?这个逻辑是否已经在某个地方实现了?

我问这个是因为它感觉像是一种常见的使用模式。

type AgentEvent<'t>()=
    let event = new Event<'t>()

    let agent = MailboxProcessor.Start(fun (mbox) ->
        let rec loop () =
            async {
                let! msg = mbox.Receive()
                do event.Trigger(msg)
                return! loop()
            }
        loop ()) 
    member x.Event = event.Publish
    member x.Post = agent.Post

type TweetStream ( cfg:oauth.Config) =
    let token = TwitterToken.Token (cfg.accessToken,
                                    cfg.accessTokenSecret,
                                    cfg.appKey, 
                                    cfg.appSecret)

    let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json")

    let agentEvent = AgentEvent()

    member x.TweetReceived = agentEvent.Event

    member x.Start () =
        Task.Factory.StartNew(fun () -> stream.StartStream(token, agentEvent.Post))
        |> ignore

    member x.Stop = stream.StopStream
4

1 回答 1

4

我认为这IObservable是发布事件的正确抽象。至于处理它们,我会使用 Reactive Extensions 或 F# Agents ( MailboxProcessor),这取决于你想要做什么。

请注意,F# 自动将事件表示为IObservable值(实际上IEvent,但它继承自 observable),因此您可以直接在TweetReceived.

什么是正确的表示?

  • 的要点asyncSeq是它可以让你控制数据生成的速度——就像async你必须启动它才能真正完成工作并获得一个值——所以如果你可以开始一些操作(例如下载接下来的几个字节)以获得下一个值

  • IObservable当您无法控制数据源时很有用 - 当它只是不断产生值并且您无法暂停它时 - 这似乎更适合推文。

至于处理,我认为 Reactive Extensions 在它们已经实现了您需要的操作时很好。当您需要编写一些自定义逻辑(在 Rx 中不容易表达)时,使用 Agent 是编写自己的类似 Rx 函数的好方法。

于 2014-01-11T18:44:11.517 回答