问题标签 [monix]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
294 浏览

scala - 重试 Monix 任务 - 为什么这里需要 Task.defer?

我最近发现了一个在使用 Monix 时无法完全理解的案例Task

有两个函数(在队列消息处理程序中):

的 for-comprehension的内部doRollback都是一组返回 monad 的doobie调用ConnectionIO[_],然后transact在其上运行将组合变成 Monix Task

现在,如handle函数所示,我希望整个过程在失败的情况下重试 5 次。神秘的部分是这个简单的调用:

并没有真正重新启动异常操作(在测试中验证)。为了获得这种重试行为,我必须明确地将其包装在 中Task.defer,或者Task以任何其他方式将其置于“上下文”中。

这就是我不完全明白的一点:为什么会这样?doRollback已经给了我Task实例,所以我应该可以调用onErrorRestart它,不是吗?如果不是这种情况,我如何确定Task我从“某处”获得的实例是否可以重新启动?

我在这里想念什么?

0 投票
1 回答
149 浏览

scala - 为什么 Monix Observable 产生的元素比需要的多一个

我正在玩 Monix 流,并获得了我ObservableIterator. 在我看来,运行时它产生的元素比我预期的多 1 个。以下代码表明:

这个案子很简单。Iterator每次产生next值时都会打印日志。下游阶段是简单的延迟任务,并行计数为 5 以查看发生了什么。现在输出如下:

如您所见,最初流产生 6 个元素,而我只期望 5 个(因为下游阶段mapParallelUnordered只需要 5 个元素。实际上这没什么大不了的,但我只是想了解为什么会这样。

还有为什么初始值是在线程中产生的,而后续的值是在线程池中main调用的?execution-context不应该都使用用于运行整个流的调度程序吗?

0 投票
2 回答
152 浏览

scala - 用猫创建抽象集合的有效方法

我有一些使用Monix Observable对文件进行流处理的代码。为了测试这段代码,我希望我对 的操作Observable是类型独立的,所以我也可以在任何其他数据结构上执行它们,List比如 这就是为什么我编写了以下代码来抽象底层数据结构的原因:

让我烦恼的是,这段代码创建了很多中间数据结构。有没有我可以使用的类型类来提高这个过程的效率?无需太多开销即可将一个数据结构提升到另一个数据结构的东西,例如,LiftIO但对于项目集合?

0 投票
1 回答
359 浏览

scala - Monix Observable groupBy 大量没有内存泄漏的键

我尝试Observable在 Monix 中按键进行拆分,然后将每个事件分组到最后一个n事件GrouppedObservable并将它们发送以进行进一步处理。问题是要分组的键的数量可能是无限的,这会导致内存泄漏。

应用环境

我有来自许多对话的消息的 kafka 流。每个对话都有roomId,我想对这个 id 进行分组以获取 Observables 的集合,每个对话只包含来自单个对话的消息。会话室通常是短暂的,即创建一个独特的新会话roomId,在短时间内交换几十条消息,然后关闭会话。为避免内存泄漏,我希望仅保留 100-1000 个最近对话的缓冲区,并删除较旧的对话。因此,如果一个事件来自一个长期未见的对话,它将被视为新对话,因为其先前消息的缓冲区将被遗忘。

Monix中的 groupBy 方法具有keysBuffer指定如何处理键缓冲区的参数。

我认为指定keyBufferDropOld策略将使我能够实现我想要的行为。

以下是所描述用例的简化版本。

但是,在 VisualVM 上观察应用程序堆表明内存泄漏。跑了大约30分钟后,我得到了java.lang.OutOfMemoryError: GC overhead limit exceeded

下面是描述运行我的应用程序大约 30 分钟的堆使用图的屏幕截图。(最后压扁的部分在后面OutOfMemoryError

应用程序的 VisualVM 堆图

我的问题是:如何在不泄漏内存的情况下通过可能无限数量的键对 monix 中的事件进行分组?允许丢弃旧密钥

背景资料:

  • 莫尼克斯版本:3.0.0-RC2
  • 斯卡拉版本:2.12.8
0 投票
2 回答
304 浏览

scala - 如何将流程执行实现为反应式`Observable [String]

我想将外部流程执行表示为Observable[String], where String- 来自流程输出的行。这是我所做的示例,它有效:

您可以看到,该过程每秒都会发出新行。但这无关紧要。只需提供完整的示例,test.py

输出:

问题

我想要超时 - 如果进程冻结(例如sleep 100000)进程应该在超时后被杀死。此外,如果进程强制或失败,则应清理一些资源(guarantee例如)。非零退出代码应该代表失败。

如何像Observable[String]正确的错误处理一样实现流程执行?rx-java欢迎解决方案。

0 投票
1 回答
64 浏览

scala - monix:Task.executeWithFork 阻止执行?

我无法弄清楚为什么executeWithFork在以下示例中添加会阻止任务运行:

0 投票
1 回答
481 浏览

scala - 如何使用 sttp 将响应读取为 Observable[String]

我正在使用sttp客户端。我想将响应解释为除以行的字符串,例如Observable[String]

这里sttp流api:

那我怎么能得到Observable[String]

这里有一些想法:

1.有没有一种简单的split按行观察的方法?
2.或者也许我可以InputStream从响应中得到原始信息,所以我可以很容易地拆分它,但我找不到使用类似asStream[InputStream]
3的方法。或者也许只是使用 http 后端 witoutsttp层?

0 投票
1 回答
120 浏览

scala - 当 Task 从不同的词法范围引用时,Monix TaskLocal 似乎没有反映来自 bind() 调用的值

我有一个代码块,它是对 ScalaDoc 中为 TaskLocal 给出的代码片段的轻微修改[原文:https ://monix.io/api/3.0/monix/eval/TaskLocal.html ]。

我的代码只是将内联任务(在 for 理解范围内)替换为对在 for 理解范围之外定义的 Task 的引用。不幸的是,当我的任务运行时,它没有看到绑定,而是看到了原始值 (0)。我希望我稍微修改过的代码(如下)会产生与 ScalaDoc 中的片段相同的结果。但出乎意料的是,我得到“value3:4”,而不是 value3:200。我对 tlocal 的引用似乎得到了该 threadlocal-ish 变量的不同“版本”,该变量初始化为默认值,并且对绑定一无所知.

代码

理由:

我想这样做的原因是因为我想将一个值绑定到我的本地线程,然后创建一个任务映射链,其中一些任务拉动本地线程的当前值,我理解它是由TaskLocal“前置”的。而且,为了清楚起见,其中一些任务是在我的代码库的其他类中定义的,而不是在任何特定的理解范围内内联。

谢谢 !/克里斯

0 投票
1 回答
204 浏览

scala - 使用流建模多个函数调用(以安全的 FP 方式)

给定一个要被多次调用的函数A => IO[B](又名Kleisli[IO, A, B]),并且有副作用,比如更新数据库,如何将这样的多次调用委托给一个流(我猜Pipe[IO, A, B])(fs2,monix observable/iterant)?这样做的原因是能够在一个时间窗口内累积状态、批量调用等。

更具体地说,http4s 服务器需要一个Request => IO[Response],所以我正在研究如何对流进行操作(为了上述好处),但最终为 http4s 提供了这样的功能。

我怀疑它在幕后需要一些相关 ID,我对此很好,我对如何从 FP 的角度安全和正确地进行操作更感兴趣。

最终,我期望的签名可能是这样的:

Pipe[IO, A, B] => (A => IO[B]), 这样对 Kleisli 的调用就会通过管道传输。

作为事后的想法,是否有可能背压?

0 投票
1 回答
184 浏览

scala - 在 Monix 任务中为空

Project Reactor 有类似 Mono.empty[T]() 的东西,可以在特殊情况下处理,在这种情况下你在评估它时没有任何东西。Monix Task 中有类似的东西吗?

在这里,我试图从项目列表中提取一个项目,从数据库读取时该列表可能不存在。我不想发送像 Mono.just(List()) 这样的东西,因为这需要我在 db 调用端添加另一个空/空检查。