2

来自 Don Syme 博客(http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus -twitter-sample.aspx ) 我试图实现一个 twitter 流监听器。我的目标是遵循 twitter api 文档的指导,该文档说“在构建高可靠性系统时,通常应该在处理之前保存或排队推文”。

所以我的代码需要有两个组件:

  • 堆积并处理每个状态/推文 json 的队列
  • 读取 twitter 流的东西,该流将 json 字符串中的推文转储到队列中

我选择以下:

  • 我发布每条推文的代理,它解码 json,并将其转储到数据库
  • 一个简单的http请求

我还想将插入数据库的任何错误转储到文本文件中。(对于所有错误,我可能会切换到主管代理)。

两个问题:

  • 我的策略在这里有用吗?如果我理解正确,代理的行为就像一个智能队列并异步处理它的消息(如果它的队列中有 10 个人,它将一次处理一堆,而不是等待第一个完成然后第二个等...), 正确的 ?
  • 根据 Don Syme 的帖子,while 之前的所有内容都是隔离的,因此 StreamWriter 和数据库转储是隔离的。但是因为我需要这个,所以我从不关闭我的数据库连接......?

代码看起来像:

let dumpToDatabase databaseName = 
   //opens databse connection 
   fun tweet -> inserts tweet in database

type Agent<'T> = MailboxProcessor<'T>



 let agentDump =
            Agent.Start(fun (inbox: MailboxProcessor<string>) ->
               async{
                   use w2 = new StreamWriter(@"\Errors.txt")
                   let dumpError  =fun (error:string) -> w2.WriteLine( error )
                   let dumpTweet =  dumpToDatabase "stream"
                   while true do 
                       let! msg = inbox.Receive()
                       try 
                           let tw = decode msg
                           dumpTweet tw
                       with 
                       | :? MySql.Data.MySqlClient.MySqlException as ex -> 
    dumpError (msg+ex.ToString() ) 
                        | _ as ex -> () 



                             }
                             )

    let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
    let parameters = "track=RT&"
    let stream_url = filter_url

    let stream = twitterStream MyCredentials stream_url parameters


    while true do 
        agentDump.Post(stream.ReadLine())

非常感谢 !

使用处理器代理编辑代码:

let dumpToDatabase (tweets:tweet list)= 
    bulk insert of tweets in database    

let agentProcessor = 
        Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
           async{
               while true do 
                       let! msg = inbox.Receive()
                       try
                          msg
                          |> List.map(decode)
                          |> dumpToDatabase 
                        with
                        | _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
                 }
                 )



let agentDump =
        Agent.Start(fun (inbox: MailboxProcessor<string>) ->
                  let rec loop messageList count = async{
                      try
                          let! newMsg = inbox.Receive()
                          let newMsgList = newMsg::messageList
                          if count = 10 then 
                               agentProcessor.Post( newMsgList )
                               return! loop [] 0
                          else                    
                               return! loop newMsgList (count+1)
                      with
                      | _ as ex -> Console.WriteLine("Dump "+ex.ToString())

                  }
                  loop [] 0)

let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url

let stream = twitterStream MyCredentials stream_url parameters


while true do 
    agentDump.Post(stream.ReadLine())
4

1 回答 1

5

我认为描述代理的最好方式是它是一个运行的进程,它保持某种状态并且可以与其他代理(或网页或数据库)进行通信。在编写基于代理的应用程序时,您通常可以使用多个相互发送消息的代理。

我认为创建一个从网络读取推文并将它们存储在数据库中的代理的想法是一个不错的选择(尽管您也可以将推文作为代理的状态保存在内存中)。

  • 我不会一直保持数据库连接打开 - MSSQL(和 MySQL 也可能)实现连接池,所以当你释放它时它不会自动关闭连接。这意味着每次需要将数据写入数据库时​​重新打开连接会更安全且同样有效。

  • 除非您希望收到大量错误消息,否则我可能也会对文件流执行相同的操作(写入时,您可以打开它,以便将新内容添加到末尾)。

F# 代理队列的工作方式是它一一处理消息(在您的示例中,您正在使用 等待消息inbox.Receive()。当队列包含多条消息时,您将一一获取它们(在循环中)。

  • 如果您想一次处理多条消息,您可以编写一个代理来等待 10 条消息,然后将它们作为列表发送给另一个代理(然后执行批量处理)。

  • 您还可以timeout为该方法指定参数Receive,因此您最多可以等待 10 条消息,只要它们都在一秒钟内到达 - 这样您就可以非常优雅地实现不长时间持有消息的批量处理。

于 2010-08-25T16:27:50.140 回答