我想在 F# 中实现 CCR 框架端口的概念(因为 .Net 4.0 不正式支持 CCR)。我知道可以使用 F# 中的MailboxProcessor类来执行此操作。这对于简单的接收仲裁器非常有效,但我需要交错仲裁器的概念,即我想控制哪些消息是专门处理的,哪些是同时处理的。到目前为止,我不知道在 F# 中实现这一点,我将感谢您的帮助。
2 回答
我对 CCR 不是很熟悉,但我会尝试回答 - 我的理解是交错仲裁器的行为有点像ReaderWriterLock
. 也就是说,您可以指定一些可以并行运行的操作(读取)和一些独占的操作(写入)。
以下代理是实现它的一种方法(未经测试,但类型检查:-))。代理公开了两个供公众使用的操作。最后一个是内部的:
type Message<'T> =
| PerformReadOperation of ('T -> Async<unit>)
| PerformWriteOperation of ('T -> Async<'T>)
| ReadOperationCompleted
通过发送代理
PerformReadOperation
,你给它一个操作,应该使用状态运行(一次)并且可能与其他读取操作并行。通过发送代理
PerformWriteOperation
,你给它一个计算新状态的操作,并且必须在所有读取操作完成后执行。(如果您使用的是不可变状态,那会让事情变得更简单——您不必等到读者完成!但下面的实现实现了等待)。
代理从一些初始状态开始:
let initial = // initial state
代理的其余部分使用两个循环实现:
let interleaver = MailboxProcessor.Start(fun mbox ->
// Asynchronously wait until all read operations complete
let rec waitUntilReadsComplete reads =
if reads = 0 then async { return () }
else mbox.Scan(fun msg ->
match msg with
| ReadOperationCompleted -> Some(waitUntilReadsComplete (reads - 1))
| _ -> None)
let rec readingLoop state reads = async {
let! msg = mbox.Receive()
match msg with
| ReadOperationCompleted ->
// Some read operation completed - decrement counter
return! readingLoop state (reads - 1)
| PerformWriteOperation(op) ->
do! waitUntilReadsComplete reads
let! newState = op state
return! readingLoop newState 0
| PerformReadOperation(op) ->
// Start the operation in background & increment counter
async { do! op state
mbox.Post(ReadOperationCompleted) }
|> Async.Start
return! readingLoop state (reads + 1) }
readingLoop initial 0)
只是添加到 Tomas 建议的解决方案中,以防您不想将“ReadOperationCompleted”消息暴露给邮箱的消费者(因为此消息是内部的,并且在当前实现中可以由邮箱的任何消费者发送)可以在主邮箱处理器函数内创建一个单独的邮箱,它将接受两条消息:ReadOperationCompleted 和 WaitForReadCompleted(主邮箱将与 PostAndAsyncReply 一起使用),因为只有在所有已阅读的情况下才会响应此消息操作完成。此外,由“reads”表示的“read”计数将被移动到这个新的内部邮箱,因为该状态被这个内部邮箱封装。