参考 Don Syme 的博客(http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus-twitter-sample.aspx),我试图实现一个 Twitter 流监听器。我的目标是遵循 Twitter API 文档的指导,该文档说:“在构建高可靠性系统时,通常应该在处理之前先保存推文或将其放入队列”。
所以我的代码需要有两个组件:
- 堆积并处理每个状态/推文 json 的队列
- 读取 twitter 流的东西,该流将 json 字符串中的推文转储到队列中
我选择以下:
- 一个代理(agent):我把每条推文发布(Post)给它,由它解码 JSON 并将结果写入数据库
- 一个简单的http请求
我还想把插入数据库时发生的任何错误写入一个文本文件。(将来我可能会改用一个监督者(supervisor)代理来统一处理所有错误。)
两个问题:
- 我的策略在这里合适吗?如果我理解正确,代理的行为就像一个智能队列,并且会异步处理它的消息(如果队列中有 10 条消息,它会一次处理一批,而不是等第一条处理完再处理第二条……),对吗?
- 根据 Don Syme 的帖子,while 循环之前的所有内容都是隔离的,因此 StreamWriter 和数据库写入函数是隔离的。但是因为循环需要一直使用它们,所以我的数据库连接永远不会被关闭……这样可以吗?
代码看起来像:
// Pseudo-code sketch: given a database name, opens a connection once and
// returns a function that inserts a single tweet.
// NOTE(review): the connection is presumably captured by the returned closure
// and never closed -- confirm against the real implementation.
let dumpToDatabase databaseName =
// opens database connection
fun tweet -> inserts tweet in database
/// Shorthand so that agents can be created with `Agent.Start`.
type Agent<'T> = MailboxProcessor<'T>
/// Agent that receives raw tweet JSON strings one at a time, decodes each with
/// `decode` and stores it through `dumpToDatabase "stream"`.
/// Any failure while decoding or storing a tweet is appended, together with
/// the offending message, to `\Errors.txt`; the agent then carries on with the
/// next message.
let agentDump =
    Agent.Start(fun (inbox: MailboxProcessor<string>) ->
        async {
            // Everything before the loop runs once, when the agent starts.
            use w2 = new StreamWriter(@"\Errors.txt")
            // The loop below never terminates, so `use` never gets to dispose
            // (and thereby flush) the writer. Flush on every write instead so
            // logged errors actually reach the file.
            w2.AutoFlush <- true
            let dumpError = fun (error: string) -> w2.WriteLine(error)
            let dumpTweet = dumpToDatabase "stream"
            while true do
                let! msg = inbox.Receive()
                try
                    let tw = decode msg
                    dumpTweet tw
                with ex ->
                    // Log every failure (MySqlException from the insert as
                    // well as e.g. decoding errors) rather than silently
                    // dropping the non-database ones.
                    dumpError (msg + ex.ToString())
        })
// Twitter streaming "filter" endpoint and the request parameters sent to it.
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters

// Pump the stream into the agent, one line per message.
// NOTE(review): assumes `stream` is a TextReader-like object whose ReadLine()
// returns null at end of stream -- confirm against twitterStream.
let mutable line = stream.ReadLine()
while line <> null do
    // Blank lines carry no tweet (the streaming API uses them as keep-alives),
    // so they are not forwarded to the agent.
    if line <> "" then agentDump.Post(line)
    line <- stream.ReadLine()
// Reaching this point means the connection was closed by the remote side;
// without the null check the loop would spin forever posting null messages.
非常感谢 !
编辑:改用处理器代理(processor agent)之后的代码:
// Pseudo-code sketch: stores a whole list of decoded tweets in one bulk insert.
// NOTE(review): connection handling is not shown here -- confirm how and when
// the database connection is opened and closed.
let dumpToDatabase (tweets:tweet list)=
bulk insert of tweets in database
/// Agent that receives batches of raw tweet JSON strings, decodes every
/// element with `decode` and hands the decoded list to `dumpToDatabase`.
/// If decoding or the insert raises, the exception is printed to the console
/// and that whole batch is dropped; the agent then waits for the next batch.
let agentProcessor =
    Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
        async {
            while true do
                let! msg = inbox.Receive()
                try
                    msg
                    |> List.map decode
                    |> dumpToDatabase
                with ex ->
                    // The original line had one closing parenthesis too many,
                    // which made the snippet fail to parse.
                    Console.WriteLine("Processor " + ex.ToString())
        })
/// Agent that buffers incoming raw tweet JSON strings and forwards them to
/// `agentProcessor` in batches of `batchSize`, oldest message first.
let agentDump =
    Agent.Start(fun (inbox: MailboxProcessor<string>) ->
        // Number of messages per batch. The original test `count = 10` was
        // evaluated before counting the message just received, so it actually
        // forwarded 11 messages at a time.
        let batchSize = 10
        let rec loop messageList count =
            async {
                // The try/with only wraps one step. Keeping `return! loop`
                // outside of it makes the recursion a real tail call (inside a
                // try/with every iteration would stack another handler) and
                // lets the agent keep running after an error instead of
                // silently terminating.
                let! (nextList, nextCount) =
                    async {
                        try
                            let! newMsg = inbox.Receive()
                            let newMsgList = newMsg :: messageList
                            if count + 1 >= batchSize then
                                // Messages were consed newest-first; restore
                                // arrival order before handing the batch over.
                                agentProcessor.Post(List.rev newMsgList)
                                return ([], 0)
                            else
                                return (newMsgList, count + 1)
                        with ex ->
                            Console.WriteLine("Dump " + ex.ToString())
                            // Keep what has been buffered so far.
                            return (messageList, count)
                    }
                return! loop nextList nextCount
            }
        loop [] 0)
// Twitter streaming "filter" endpoint and the request parameters sent to it.
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters

// Pump the stream into the batching agent, one line per message.
// NOTE(review): assumes `stream` is a TextReader-like object whose ReadLine()
// returns null at end of stream -- confirm against twitterStream.
let mutable line = stream.ReadLine()
while line <> null do
    // Blank lines carry no tweet (the streaming API uses them as keep-alives);
    // forwarding one would make the whole batch it lands in fail to decode.
    if line <> "" then agentDump.Post(line)
    line <- stream.ReadLine()
// Reaching this point means the connection was closed by the remote side;
// without the null check the loop would spin forever posting null messages.