5

我正在学习 F# 代理 ( MailboxProcessor)。

我正在处理一个非常规的问题。

  • 我有一个代理 ( dataSource),它是流数据的来源。数据必须由一组代理 ( dataProcessor) 处理。我们可以将dataProcessor其视为某种跟踪设备。
  • 数据流入的速度dataProcessor可能比处理其输入的速度要快。
  • 有一些延迟是可以的。但是,我必须确保代理始终专注于其工作,并且不会因过时的观察结果而堆积如山

我正在探索解决这个问题的方法。

一个想法是在. 当可用于接收和处理数据时,将发送可用的最新观察结果。此解决方案可能有效,但可能会变得复杂,因为可能需要阻止并重新激活;并将其状态传达给,从而导致双向通信问题。这个问题可能归结为消费者-生产者问题,但我不确定..dataSourcedataSourcedataProcessordataProcessordataSourceblocking queue

第二个想法dataProcessor处理消息排序。在这种架构中,将简单地在's 队列dataSource中发布更新。将用于获取其队列中可用的最新数据。这可能是要走的路。但是,我不确定在当前设计中是否可以清除消息队列,删除旧的过时消息。此外,这里写道:dataProcessordataProcessorScanMailboxProcessor

不幸的是,当前版本的 F# 中的 TryScan 函数在两个方面被破坏了。首先,重点是指定超时,但实现实际上并没有兑现它。具体来说,不相关的消息会重置计时器。其次,与其他 Scan 函数一样,消息队列在锁定下进行检查,该锁定防止任何其他线程在扫描期间发布消息,这可以是任意长的时间。因此,TryScan 函数本身往往会锁定并发系统,甚至会引入死锁,因为调用者的代码是在锁内评估的(例如,当锁下的代码阻塞等待时,从函数参数发布到 Scan 或 TryScan 可能会使代理死锁获取它已经处于的锁)。

让最新的观察结果反弹可能是个问题。这篇文章的作者@Jon Harrop 建议

我设法围绕它进行架构设计,最终的架构实际上更好。本质上,我急切地Receive使用我自己的本地队列来过滤所有消息和过滤器。

这个想法当然值得探索,但在开始使用代码之前,我会欢迎一些关于如何构建我的解决方案的意见。

谢谢你。

4

3 回答 3

2

听起来您可能需要邮箱处理器的破坏性扫描版本,我在您可能感兴趣的博客系列中使用 TPL Dataflow 实现了这一点。

我的博客目前正在维护中,但我可以向您指出降价格式的帖子。

第 1 部分第 2 部分第 3
部分

你也可以查看github上的代码

我还在潜伏的恐怖帖子中写了有关扫描的问题

希望有帮助...

于 2014-01-29T14:30:24.050 回答
1

tl;博士我会试试这个:从 FSharp.Actor 或 Zach Bray 的博客文章中获取邮箱实现,用 ConcurrentStack 替换 ConcurrentQueue(加上一些有界容量逻辑),并使用这个更改后的代理作为调度程序将消息从 dataSource 传递到军队作为普通 MBP 或 Actor 实现的数据处理器。

tl;dr2如果工作人员是稀缺且缓慢的资源,并且我们需要在工作人员准备好时处理最新消息,那么这一切都归结为具有堆栈而不是队列的代理(有一些有界容量逻辑)加上一个 BlockingQueue 的工人。Dispatcher 将一个就绪的 worker 出列,然后从堆栈中弹出一条消息并将这条消息发送给 worker。工作完成后,工作人员在准备好时(例如之前let! msg = inbox.Receive())将自己排入队列。调度程序消费者线程然后阻塞,直到任何工作人员准备好,而生产者线程保持有界堆栈更新。(有界堆栈可以用一个数组+偏移量+锁内的大小来完成,下面太复杂了)

细节

MailBoxProcessor 设计为只有一个消费者。这甚至在 MBP 的源代码中也有注释搜索单词 'DRAGONS' :))

如果您将数据发布到 MBP,那么只有一个线程可以从内部队列或堆栈中获取它。在您的特定用例中,我将直接使用 ConcurrentStack或更好地包装到BlockingCollection中:

  • 它将允许许多并发消费者
  • 它非常快且线程安全
  • BlockingCollection具有BoundedCapacity允许您限制集合大小的属性。它会抛出Add,但你可以抓住它或使用它TryAdd。如果 A 是主堆栈而 B 是备用堆栈,则TryAdd到 A,对 B 为 false并用Interlocked.ExchangeAdd交换两者,然后在 A 中处理所需的消息,清除它,建立一个新的备用 - 或者如果处理则使用三个堆栈A 可能比 B 再次变满的时间长;通过这种方式,您不会阻塞也不会丢失任何消息,但可以丢弃不需要的消息,这是一种受控方式。

BlockingCollection 具有 AddToAny/TakeFromAny 之类的方法,这些方法适用于 BlockingCollections 数组。这可能会有所帮助,例如:

  • dataSource 使用 ConcurrentStack 实现 (BCCS) 向 BlockingCollection 生成消息
  • 另一个线程使用来自 BCCS 的消息并将它们发送到处理 BCCS 的数组。你说有很多数据。您可以牺牲一个线程来无限期地阻塞和发送您的消息
  • 每个处理代理都有自己的 BCCS 或实现为调度程序向其发布消息的代理/参与者/MBP。在您的情况下,您只需要向一个处理器代理发送消息,因此您可以将处理代理存储在循环缓冲区中,以始终将消息发送到最近最少使用的处理器。

像这样的东西:

            (data stream produces 'T)
                |
            [dispatcher's BCSC]
                |
            (a dispatcher thread consumes 'T  and pushes to processors, manages capacity of BCCS and LRU queue)
                 |                               |
            [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
                 |                               |
               (process)                         (process)

您可能想阅读堆数据结构而不是 ConcurrentStack 。如果您需要通过消息的某些属性(例如时间戳)来获取最新消息,而不是通过它们到达堆栈的顺序(例如,如果在传输和到达顺序 <> 创建顺序中可能存在延迟),您可以获得最新的使用堆消息。

如果您仍然需要代理语义/API,除了 Dave 的链接之外,您还可以阅读几个来源,并以某种方式对多个并发消费者采用实现:

  • Zach Bray 撰写的一篇关于高效 Actor 实现的有趣文章。在那里,您确实需要用一行或类似的行替换(在注释下// Might want to schedule this call on another thread.)该行,因为否则生产线程将消耗线程 - 对单个快速生产者不利。但是,对于如上所述的调度程序来说,这正是所需要的。execute trueasync { execute true } |> Async.Start

  • FSharp.Actor(aka Fakka开发分支和 FSharp MPB 源代码(上面的第一个链接)对于实现细节可能非常有用。FSharp.Actors 库已经冻结了几个月,但在开发分支中有一些活动。

  • 在这种情况下,不应错过Google Groups 中有关 Fakka 的讨论。

我有一个有点相似的用例,在过去的两天里,我研究了我在 F# Agents/Actors 上能找到的所有东西。这个答案是我自己尝试这些想法的一种TODO,其中一半是在编写它时诞生的。

于 2014-01-30T00:07:53.963 回答
1

最简单的解决方案是在收到邮件时贪婪地吃掉收件箱中的所有邮件,并丢弃除最新邮件之外的所有邮件。使用以下方法轻松完成TryReceive

let rec readLatestLoop oldMsg =
  async { let! newMsg = inbox.TryReceive 0
          match newMsg with
          | None -> oldMsg
          | Some newMsg -> return! readLatestLoop newMsg }
let readLatest() =
  async { let! msg = inbox.Receive()
          return! readLatestLoop msg }

当面临同样的问题时,我构建了一个更复杂、更高效的解决方案,我称之为可取消流,并在此处的 F# Journal 文章中进行了描述。这个想法是开始处理消息,然后如果它们被取代则取消该处理。如果正在进行大量处理,这会显着提高并发性。

于 2014-02-04T16:10:28.757 回答