问题标签 [monix]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
92 浏览

scala - 从 NuProcess ByteBuffer 创建 Monix Observable

所以我有一个使用 Java 的当前项目,Process我试图用 NuProcess 替换它(即https://github.com/brettwooldridge/NuProcess)。为了处理 JavaProcess的 STDOUT/STDERROR,您有一个InputStream并且由于 Monix 提供了一个方便的 interopt 方法Observable.fromInputStream,这使您可以轻松地创建一个Observable[String]/ Observable[Array[Byte]]

但是问题是它NuProcess不适用于InputStream,而是使用 javas NIO ByteBuffer。除此之外,它稍微复杂一些,因为它使用事件处理机制,即在 Monix 任务中收听STDOUT/因为你必须执行类似的操作STDERRORNuProcess

所以问题是你如何将它连接到一个可观察的(即一个Observable[String]Observable[Array[Byte]]没有效率低下?

请注意,我使用的是 Monix 3.x

0 投票
0 回答
472 浏览

multithreading - 将 Monix `Task` 转换为 Cats `IO` 后丢失了什么?

这个简化的案例是我的问题发生在...

另一种选择是,但这听起来与的做法Await.result(task)不太一致。IOApp

我知道的

根据这篇很棒的帖子Monix vs Cats Effect
我发现两者之间的区别Task IO是:

  • TaskAPI比丰富IO
  • Task在线程调度公平方面更智能、更自动化
  • Task获得更多可取消选项
  • 和更多 ...

问题和我的看法

我转换Monix Task成后失去了什么Casts IO

它似乎工作正常,Task.gather并且timeout- 所有那些特定于任务的东西。
那么我在转换中失去了什么?
如果否,则没有理由创建多个数据类型。

因此,考虑到差异是否意味着我在与交谈后失去了调度公平性TaskIO


好吧,我应该自己验证它,但我不知道如何测试它的公平性。

0 投票
1 回答
141 浏览

multithreading - 故障超过阈值时停止所有异步任务?

我正在使用Monix Task进行异步控制。

设想

  1. 任务并行执行
  2. 如果故障发生 X 次以上
  3. 停止所有尚未完成的任务(越快越好)

我的解决方案

我提出了在 1. 结果和 2. 错误计数器之间竞争的想法,并取消失败者。
通过Task.race如果错误计数器首先达到阈值,那么任务将被取消Task.race

实验

关于菊石REPL


问题

结果取决于线程池大小!?

对于池大小 1
,结果几乎总是任务成功,即没有停止。

对于池大小 2
,成功和失败之间的不确定性非常不确定,并且取消也不准确。例如:

取消最晚至 98 个任务已完成。
错误计数很奇怪,小到阈值。

默认全局调度程序获得相同的结果行为。

对于池大小 200
,它更具确定性,并且停止更早,因此在完成更少任务的意义上更准确。

池大小越大越好。


如果我Task.gather改为Task.sequence执行,所有问题都消失了!


这种依赖池大小的原因是什么?一旦发生太多错误,如何改进它或者是否有更好的选择来停止任务?

0 投票
2 回答
198 浏览

scala - 如何从 monix observable 获取下一个元素?

我有一个随意记录的数据流,但是在我的程序的某个状态下,我需要来自流的数据,而不是到目前为止观察到的最新数据(我可以这样做),而是之后的最新数据:

我该如何实施def awaitOneElement(Observable[Data]): Data = ???

我知道这可能是惯用的不正确,但肮脏的同步等待正是我所需要的。我也很好Observable[Data] => Future[Data],将Await在下一步结束。

0 投票
4 回答
479 浏览

scala - akka.streams.Source 可以发出值(类似于 monix.BehaviorSubject)

我正在寻找akka.stream.scaladsl.Source可以让我简单地从不同的代码位置发出下一个值的构造方法(例如,监视系统事件)。

  • 我需要类似的东西Promise。Promise 向Future. 我需要向Source.
  • monix.reactive.subjects.BehaviorSubject.onNext(_)
  • 我不太关心背压。

目前我已经使用 monix 和 akka-streams(下面的代码)实现了这个,但我希望应该只有 akka-streams 解决方案:

0 投票
1 回答
199 浏览

scala - 并行执行 Monix 任务

并行运行多个 Monix 任务然后得到一个结果的最佳方法是什么?
我有以下任务并想要获得一个 ResultClass。

官方文档只有一些方法可以对相同类型的任务进行并行执行。
有没有针对不同类型的任务并行执行的好方法?
谢谢。

0 投票
1 回答
146 浏览

scala - Monix 的扇入/扇出并发

我正在尝试学习 Scala 并从中获得一些乐趣,但我遇到了这个经典问题。它让我想起了 NodeJS 早期的很多嵌套回调地狱。

这是我的伪代码程序:

  1. 获取 S3 存储桶列表的任务。
  2. 任务一完成后,我想以十个为一组批量处理桶。
  3. 对于每批:
  4. 获取每个存储桶的区域。
  5. 过滤掉不在区域内的桶。
  6. 列出每个存储桶中的所有对象。
  7. 打印一切

在某一时刻,我最终选择了以下类型:Task[Iterator[Task[List[Bucket]]]]

本质上:

外部任务是列出所有 S3 存储桶的初始步骤,然后内部迭代器/任务/列表尝试批处理返回列表的任务。

我希望有某种方法可以删除/展平外部任务以到达Iterator[Task[List[Bucket]]].

当我尝试将我的处理分解为多个步骤时,深度嵌套会导致我做许多嵌套映射。这是正确的做法还是有更好的方法来处理这种嵌套?

0 投票
1 回答
75 浏览

mongodb - 带有 Mongo 集合的 Monix 任务:错误处理

我正在尝试将 MonixTaskmongo-scala-driver. 我有点难以理解Error Handling

当没有错误时,这非常有效。我想为此添加错误处理(例如处理不正确databasecollection名称)。我基于文档的初步尝试是尝试:

但这给了我一个io.Serializable. 我如何保留一段ChangeStreamObservable[DomainModel]时间同时进行某种简洁的错误处理?感谢我可以研究的任何模式的指针。

BR

0 投票
1 回答
95 浏览

scala - 如何在处理速率限制时异步发送 HTTP 请求?

免责声明:我是sttpMonix的新手,这是我尝试更多地了解这些库的尝试。我的目标是通过 HTTP GET 请求从给定 API 获取数据(客户端)-> 解析 JSON 响应-> 将此信息写入数据库。我的问题仅与第一部分有关。我的目标是以异步(希望是快速)的方式运行获取请求,同时有办法避免或处理速率限制。

以下是我已经尝试过的片段,似乎适用于单个请求:

我的问题:

  1. 如何通过使用 Monix 对多个 GET 请求(而不是单个请求)进行操作,同时保持代码异步和可组合
  2. 如何避免或处理 api 服务器施加的速率限制

附带说明一下,如果这将支持速率限制目标,我在使用另一个后端方面也很灵活。

0 投票
1 回答
164 浏览

scala - 如何运行 Monix 的 parSequenceUnordered 并处理每个任务的结果?

我目前正在努力实现对 API 的客户端 http 请求,并决定探索 sttp 和 monix 来完成这项任务。由于我是 Monix 的新手,我仍然不确定如何运行任务并检索它们的结果。我的目标是获得一系列 http 请求结果,我可以并行调用 -> 解析 -> 加载。

以下是我迄今为止尝试过的一个片段:

我的困惑相当简单(我猜)。如何运行Task.parSequenceUnordered我创建的任务,并处理(解析 http 结果)序列中的任务?

很高兴:出于好奇,在处理请求的任务序列时是否可以天真地引入速率限制/节流?我并不是真的在寻找构建复杂的东西。它可以像间隔批量请求一样简单。想知道 Monix 是否已经为此提供了帮助。