问题标签 [mailboxprocessor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
110 浏览

f# - 邮箱处理器,最新消息

设置与类似。一个代理 ( dataSource) 正在生成数据,而单个代理 ( dataProcessor) 正在处理数据。生成的数据多于dataProcessor处理能力,我对处理所有消息不感兴趣,只处理最新的数据。

Jon Harrop 在那里提出的一种可能的解决方案“是在收到邮件时贪婪地吃掉收件箱中的所有邮件,并丢弃除最新邮件之外的所有邮件”。另一种方法不是监听所有消息,而是dataProcessor获取PostAndReply dataSource 最新的数据。

这些方法的优缺点是什么?

0 投票
2 回答
106 浏览

.net - MailboxProcessor 在 Finalize 期间崩溃

此代码在 Mono (5.4.1.7) 上运行。

我正在使用 F# 的代理来处理我的 Web 应用程序中的大量数据处理,其中一条消息是 Shutdown。处理已发布的 Shutdown 消息时,代理会清理一些内容并停止其消息循环。这很好用,但如果我尝试从Finalize(). 我设法重现了这个:

当然,在实际应用程序中,它们与控制台打印无关。

这是我得到的堆栈跟踪:

let console = new ConsoleAgent()更重要的是,如果对象通过 dispose 模式正确处理(例如更改为use console = new ConsoleAgent()),则不会发生这种情况。我不能在我自己的代码中真正做到这一点而不向后弯腰,因为我没有直接引用这些代理(其中有很多同时运行),但我不应该让他们通过反正垃圾收集器?

这是我的错,F# 的错,还是 Mono 的错?现在,我已经将 Dispose() 方法的相关部分包装在一个只记录异常的 try/catch 中,但这感觉真的很脏。

0 投票
1 回答
139 浏览

f# - 使用限制队列将结果返回给调用者

基于片段和答案,是否可以将结果从限制队列返回给调用者?我已经尝试过PostAndAsyncReply在频道上接收回复,但如果我使用 Enqueue 管道它会引发错误。这是代码。

欣赏围绕队列或邮箱设计模式的基于 F# 核心原版的解决方案。

问题

问题是能够根据油门(一次最多 3 个)异步调用函数,从数组中传递每个项目,等待整个队列/数组直到它完成,同时收集所有结果,然后将结果返回给呼叫者,召集者。(将结果返回给调用者是这里待处理的内容)

被叫代码

来电代码

0 投票
1 回答
103 浏览

f# - API 速率限制器间歇性挂起

我编写了一个简单的(我认为...)速率限制器,以将事件驱动系统保持在我们许可的 API 命中限制之下。由于某种原因,它有时会在发送 400-500 个请求后卡住。

我最好的想法是我搞砸了等待功能,所以在某些情况下它永远不会返回,但我无法找到有缺陷的逻辑。另一个想法是我搞砸了导致问题的异步/任务互操作。它总是先工作,然后再工作。单个实例ApiRateLimiter在多个组件之间共享,以便在系统范围内遵守命中限制。

一些请求很大并且需要一点时间,但没有什么可以导致我在这件事上看到的请求发送完全死亡。

感谢您花点时间查看!

0 投票
2 回答
366 浏览

f# - 等待邮箱处理器

是否可以在邮箱处理器上等待,以下代码在 F# 交互中工作,但有没有办法在应用程序或单元测试中等待它?

0 投票
1 回答
231 浏览

asynchronous - F# 事件在异步工作流中不起作用

我想对代理进行事后回复。基本上,代理会触发一个事件,然后回复呼叫者。但是,我要么不断收到超时错误,要么事件无法正确触发。我尝试做 Post-Fire,它停止了超时错误,但事件没有触发。

这是一个简单的实验,它重复创建一个函数来查找 Collat​​z 系列中的下一个数字,然后调用自身返回该值,直到它达到 1。

似乎发生的是触发器只触发一次。我尝试尝试我能想到但没有进展的 Async.RunSynchronously / Async.Start / StartChild / SynchronizationContext 的每种组合。我发现了一个与我正在做的类似的博客,但这对我也没有帮助

编辑 感谢 Fyodor Soikin 指出我的疏忽。最初的问题仍然存在,我希望同时触发事件并回复结果,但会超时。

0 投票
2 回答
384 浏览

multithreading - F# MailboxProcessor 限制并行度

我是 F# 的新手,正在尝试使用 MailboxProcessor 来确保状态更改是单独完成的。

简而言之,我将操作(描述状态更改的不可变对象)发布到 MailboxProcessor,在递归函数中我读取消息并生成新状态(即在下面的示例中将项目添加到集合中)并将该状态发送到下一次递归。

预期输出为:

我得到的是:

阅读 MailboxProcessor 的文档并对其进行谷歌搜索,我的结论是它是一个消息队列,由“单线程”处理,而不是看起来它们都是并行处理的。

我在这里完全不在场吗?

0 投票
1 回答
42 浏览

f# - 使用 TableDependency 和 F# 等待数据库行加载

我有一个 F# 项目,它将一些文件加载​​到外部子系统,然后使用表依赖来等待将某些行添加到表中作为副作用。

下面的类型中使用表依赖来监视数据库更改。当添加/更改/任何行时,它会触发自定义事件:

我想要做的是获取上面的对象,然后等待所有带有我关心的 id 的行被加载。到目前为止,我所拥有的是:

但这只是继续做更多工作,而无需等待我上面需要的所有事件。

我需要使用任务还是异步?F# 代理?

0 投票
1 回答
79 浏览

multithreading - F# / MailBoxProcessor 在接近 100% 的负载下对 PostAndReply 没有响应

我有一个MailBoxProcessor,它执行以下操作:

  1. 主循环(type AsyncRunnerhttps ://github.com/kkkmail/ClmFSharp/blob/master/Clm/ContGen/AsyncRun.fs#L257 - 行号可能会随着我不断更新代码而改变)。它生成一些“模型”,将它们中的每一个编译到特定于模型的文件夹中,将它们作为外部进程生成,然后每个模型使用 WCFAsyncRunner通过调用updateProgress. 一个模型可能需要几天时间才能运行。一旦完成任何模型,跑步者就会生成/产生更多。它被设计为以 100% 的处理器负载运行(但优先ProcessPriorityClass.BelowNormalEnvironment.ProcessorCountMailBoxProcessor目前,我通过使用“异步”编辑了几乎所有内容… |> Async.Start以确保我“永远不会”阻塞主循环。

  2. 我可以通过调用member this.getState () = messageLoop.PostAndReply GetState.

  3. 或者我可以向它发送一些命令(再次使用 WCF),例如member this.start(), member this.stop(), ...</p>

这就是有趣的地方。一切正常!PostAndReply但是,如果我运行一个“监视器”,它会通过在无限循环中有效地调用(暴露为)来请求一个状态this.getState (),过一会儿它就会挂断。我的意思是它最终会返回,但会有一些无法预测的大延迟(比如几分钟)。同时,我可以发出命令,它们确实快速返回,但getState仍然没有返回。

是否有可能使其在接近 100% 的负载下响应?非常感谢!

0 投票
2 回答
112 浏览

asynchronous - 如果程序立即失败,MailboxProcessor 第一个循环将无法运行

我有一个命令定期运行 SFTP 检查并将结果记录到文件中。

它循环一个MailboxProcessor

调用它来将消息写入日志

文件下载是异步的,以及相应的消息,但一切似乎都运行良好。

问题是 - 如果由于某些原因 sftp 连接立即失败,MailboxProcessor则没有时间记录异常消息。

我试图做的——这确实有效——是printfn "%s" ex.Message在最后添加一个:我只是想知道是否有人设想了一个更好的解决方案。

仅供参考,完整的代码在这个 gist中。