问题标签 [monix]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
236 浏览

scala - 如何限制通过 Monix 发送 HTTP 获取请求?

在我之前的问题的基础上,根据Artem的见解,我的目标是将 get 请求发送到给定的 url,并使用 Monix 的限制功能来分隔请求(以避免达到速率限制)。

预期的工作流程如下所示:

这是我到目前为止所尝试的(下面是我实际代码的简化片段):

这就是获取数据的函数的样子:

我已经尝试运行上述代码,但我仍然看到所有 get 请求都被触发,中间没有任何间距。

为了说明,我当前的 api 调用日志如下所示:

我正在尝试实现类似于:

更新:

  • 我已经使用Beeceptor将 api 设置为可模拟的。在我看来,打印语句是由调用函数发出的,但实际上并未发送请求。我还更新了我的函数调用以解析为字符串(只是为了简单起见)但是,当我尝试限制对模拟 api 的请求时,它仍然没有收到任何请求。
0 投票
1 回答
249 浏览

scala - 完成请求后如何关闭 STTP 后端?

我目前正在使用 Monix 后端学习和玩 STTP。在处理完所有请求(每个请求都是一个任务)后,我主要坚持关闭后端。

我创建了类似于我的问题的示例/模拟代码(据我了解,我的问题更普遍,而不是特定于我的代码):

我的 fetch (api call maker) 函数看起来像:

由于我的主要任务包含稍后需要处理的其他任务,因此我需要找到一种替代方法来从外部关闭 Monix 后端。在我消费请求后,有没有一种干净的方法来关闭后端List[Task[Response[Either[String, String]]]]

0 投票
2 回答
57 浏览

scala - How can I block terminating my program until the Observable consumption is complete?

I am currently trying to use Monix for throttling api get requests. I have tried using STTP's Monix backend, and it worked fine until couldn't shut down the Monix backend after I was done... As this seems more like an sttp issue than a Monix one, I tried to re-approach the problem by using sttp's default backend, while still using Monix to throttle.

I am mainly struggling with closing the monix backend once I am done with consuming the observable

I have tried to simplify the problem through:

However, I am still not sure how to turn off the program after the Observable is consumed, as it terminates prematurely here (unlike the monix backend case)...

In other words, how can I block terminating program until the Observable iterable is complete?

0 投票
1 回答
117 浏览

scala - 正确使用 Monix 3.2.2 Observable 和 Doobie 0.9.0

我想将 Monix Observable 与 Doobie (fs2) 流一起使用,但似乎无法使其正常工作。如果没有流式传输,我的测试应用程序可以正常退出,但是在使用流式传输后,我的 TaskApp 似乎挂在关机状态并且无法弄清楚原因。

这是重现问题的最小示例:

我已根据 Monix 文档将 fs2 流转换为 Monix observable:https ://monix.io/docs/current/reactive/observable.html#fs2

我是否需要以某种方式关闭 fs2 流或 Observable 才能干净地退出应用程序?感谢任何提示以使其正常工作或提示如何正确调试它。

0 投票
1 回答
209 浏览

scala - 不同的单子用于理解

我有以下代码

到目前为止,这一切都很好。但是现在,我想从 sendMessage 中获取布尔值,然后只有当 sendMessage 返回 true 时,我才想调用另一个方法。

我知道它们是不同的单子。请让我知道如何以更简洁的方式添加所有三个调用以进行理解。感谢帮助

0 投票
1 回答
39 浏览

scala - 相同的 scala 任务代码在沙箱中有效,但在 intelliJ 中无效

只需尝试一些简单的任务示例。以下代码工作正常

但使用 runToFuture 仅适用于沙箱,而不是当我在 intelliJ 中运行它时(当然在 intelliJ 中我在对象内运行它)

在 intelliJ 中没有打印 2,只是

“C:\Program Files\Java\jdk1.8.0_192\bin\java.exe”

进程以退出代码 0 结束

可能是什么原因,我没想到这么早卡住了。提前致谢

0 投票
0 回答
73 浏览

scala - 如何在 monix 中为每个任务添加固定延迟而不增加延迟

我正在使用 monix 接收来自 kafka 的消息。

我想在从主题中读取消息后 10 秒处理消息。

这 10 秒分钟的延迟不应阻止阅读更多消息。

我尝试使用以下代码测试此行为,使用 Task.delayExecution 延迟 10 秒

我也试过Observable.delayOnExecutionObservable.delayOnNext

但是上面的代码增加了每条消息的延迟。第一条消息延迟 10 秒,但第二条消息延迟 20 秒,第三条消息延迟 30 秒,依此类推。

使用 monix 可以做这样的事情吗?

我正在考虑基于 monix 的解决方案的其他替代方案,例如使用内存队列。消费者将继续推送到队列,直到达到限制。

更新 :

我找到了一个解决方案Task.eval(<current_time>).restartUntil(<condition>)

在下面添加代码。

我不完全确定它是否是理想的解决方案,因为它似乎正在进行主动 CPU 计算以使其工作。

想看看有没有更好的选择。

0 投票
1 回答
24 浏览

task - 在 Monix 中处理来自任务的错误

我有以下代码片段,它对一组 URL 进行简单的 HTTP 查找。

我有一个播放控制器,我在其中调用此函数并运行如下任务:

对于导致 SSLHandshakeException 的 URL 之一,我进入了 Failure(err) 案例块,但我想获得以下信息:

  1. 无论错误是什么,我都想进入 Success 块,在那里我已经捕获了任何失败 URL 的错误消息。

我如何调整我的任务实现来做我需要的?有任何想法吗?我尝试了 onErrorRecoverWith 处理程序,但似乎没有任何效果。有任何想法吗?

0 投票
0 回答
85 浏览

scala - 使用 Monix 时,使用 mariadb 的 Quill.io 批量插入会引发错误

在 mariadb 10.5.9 和 scala 2.13 上使用MysqlMonixJdbcContextgetquill.io 并尝试使用

我收到以下错误

我该如何解决这个问题?

0 投票
0 回答
52 浏览

scala - 在递归循环中处理和共享 Monix 任务的状态

我有以下代码递归迭代并通过网络执行某些操作。当它通过网络时,我想做一些优化,其中第一个优化是避免通过网络来处理我已经尝试过的某些元素。

例如,在下面的例子中,我调用一个 URL,提取在该 URL 中找到的 HREF,然后调用这些 URL 并报告状态。由于可能会再次获取某些 URL,对于那些失败的 URL,我想将它们添加到全局状态,以便下次遇到此 URL 时,我将避免那些网络调用。

这是代码:

您可以在案例 Left(....) 块中看到我将失败的案例类添加到缓存中,我在此函数的封闭类上全局定义为:

这是我执行递归操作的函数:

可以看出,我正在检查我作为当前元素的 url 是否已经在缓存中,这意味着我已经尝试过但失败了,所以我想避免再次为它进行 HTTP 调用。

但是发生的事情是,httpResultErrorCache 总是空的。我不确定 Task 块是否导致了这种行为。关于如何让缓存工作的任何想法?