对不起,这个问题变得如此之大。我从一个简单的问题开始。我有历史报价数据。我想模拟一个交易代理和一个订单簿代理对数据和彼此做出反应。我是否可以使用另一个代理来控制事件流,以便前两个代理没有机会比在 R/T 中更快地对事件做出反应。- 但这似乎太模糊了,所以我经历了
- 文字墙 (TL:DR)
- 两个简短的段落(没有人知道我在问什么,因为没有问题)
- 发布了我不正确的代码和更多详细信息->又太长太模糊了
但是当我看到它时,我真的只是在问“你是怎么做到的?” 这确实不是 SO 的正确格式。正如丹尼尔指出的那样,这个问题太宽泛了,最后,菲尔的提示和良好的睡眠让我想出了一个解决方案,我在下面给出了解决方案。希望其他人能从中受益。我仍然对我的方法不满意,但我认为我的 Code Review 是一个更好的发布位置。
另外,感谢 SO F# 社区没有将我微薄的代表投票给遗忘!
open System
open System.IO
let src = __SOURCE_DIRECTORY__
let myPath = Path.Combine(src, "Test_data.txt")
// Create some test data
let makeTestDataFile path lineCount =
let now = System.DateTime.Now
File.WriteAllLines(path,
seq { for i in 1 .. lineCount do
let dt = now.AddSeconds(float i)
yield sprintf "%s,%d" (dt.ToString("MM/dd/yyyy hh:mm:ss.fff tt")) i })
makeTestDataFile myPath 10
感谢 Phil,我得到了一个工作原型:
type MsgType =
| HistEvent of DateTime * float
| AgentEvent of DateTime * float
type DataPoint = {time:DateTime; value:float}
type Agent<'T> = MailboxProcessor<'T>
type EventTrafficAgent () =
let event = new Event<DataPoint>()
let agent = Agent.Start(fun inbox ->
let rec loop eventQue now () = async {
let! msg = inbox.Receive()
// use some form of que managment to decide
let updatedQue =
match msg with
| HistEvent (dt, v) ->
let now = max now dt // log most recent date
// check if there are any pending events that can now be sent
// if so, send and remove
let updatedQue =
eventQue
|> List.filter(fun e ->
if e.time <= now then
event.Trigger(e)
let now = e.time
printfn "\tDequeing & Sending delayed event: {time = %s, value %f}" (e.time.ToString("mm:ss.fff")) e.value
false
else
true)
// just send the historical event as these are never delayed
event.Trigger({time = dt; value = v})
updatedQue
| AgentEvent (dt, v) ->
let tm = dt.AddSeconds(1.5) // time with lag added i.e. "intended time of arrival"
let cacheIt = tm >= now
// return an updated list of cached orders
(if cacheIt then
printfn "\tAdding to delayed que: {time = %s, value %f}" (tm.ToString("mm:ss.fff")) v
{time = tm; value=v} :: eventQue
else
printfn "\tJust sending without delay: {time = %s, value %f}" (tm.ToString("mm:ss.fff")) v
event.Trigger({time = tm; value = v})
eventQue)
return! loop updatedQue now ()
}
loop List.empty<DataPoint> DateTime.MinValue () )
member x.Post msg = agent.Post msg
member x.EventProduced = event.Publish
type OrderBookAgent () =
let event = new Event<DataPoint>()
let agent = Agent.Start(fun inbox ->
let rec loop () = async {
let! (msg:DataPoint) = inbox.Receive()
if msg.value = 42. then event.Trigger({time = msg.time; value = 99.})
return! loop ()
}
loop () )
member x.Post msg = agent.Post msg
member x.Publish = event.Publish
type TradingAgent () =
let event = new Event<DataPoint>()
let agent = Agent.Start(fun inbox ->
let rec loop () = async {
let! (msg:DataPoint) = inbox.Receive()
if msg.value = 7. then event.Trigger({time = msg.time; value = 42.})
return! loop ()
}
loop () )
member x.Post msg = agent.Post msg
member x.Publish = event.Publish
type StreamData(filePath, eventMgr:EventTrafficAgent) =
let sr = new StreamReader ((filePath:string))
member x.Reply() =
async { while not sr.EndOfStream do
let line = sr.ReadLine ()
let dtVal = line.Split(char(","))
let time =DateTime.Parse (dtVal.[0])
let value = Double.Parse(dtVal.[1])
do! Async.Sleep(250) // here to allow you to see it ticking by. set to 1 for full speed
do eventMgr.Post (HistEvent(time, value))}
|> Async.StartImmediate
let eventCop = new EventTrafficAgent()
let orderBook = new OrderBookAgent()
let tradeBot = new TradingAgent()
eventCop.EventProduced.Add(fun e -> printfn "event Cop publishing {time = %s, value %3f}" (e.time.ToString("mm:ss.fff")) e.value)
eventCop.EventProduced.Add(fun e -> orderBook.Post e )
eventCop.EventProduced.Add(fun e -> tradeBot.Post e )
orderBook.Publish.Add(fun e -> eventCop.Post (AgentEvent( e.time, e.value)) )
tradeBot.Publish.Add(fun e -> eventCop.Post (AgentEvent( e.time, e.value)) )
let stream = StreamData(myPath, eventCop )
do stream.Reply()
输出是
event Cop publishing {time = 26:23.265, value 3.000000}
event Cop publishing {time = 26:24.265, value 4.000000}
event Cop publishing {time = 26:25.265, value 5.000000}
event Cop publishing {time = 26:26.265, value 6.000000}
event Cop publishing {time = 26:27.265, value 7.000000}
Adding to delayed que: {time = 26:28.765, value 42.000000}
event Cop publishing {time = 26:28.265, value 8.000000}
event Cop publishing {time = 26:28.765, value 42.000000}
Dequeing & Sending delayed event: {time = 26:28.765, value 42.000000}
event Cop publishing {time = 26:29.265, value 9.000000}
Adding to delayed que: {time = 26:30.265, value 99.000000}
event Cop publishing {time = 26:30.265, value 99.000000}
Dequeing & Sending delayed event: {time = 26:30.265, value 99.000000}
event Cop publishing {time = 26:30.265, value 10.000000}
我想我剩下的唯一问题是使用 AsyncSeq<'T> 之类的东西将数据吸入事件管理器而不是像我现在所做的那样将其推入会更好。