问题标签 [monix]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 从 NuProcess ByteBuffer 创建 Monix Observable
所以我有一个使用 Java 的当前项目,Process
我试图用 NuProcess 替换它(即https://github.com/brettwooldridge/NuProcess)。为了处理 JavaProcess
的 STDOUT/STDERROR,您有一个InputStream
并且由于 Monix 提供了一个方便的 interopt 方法Observable.fromInputStream
,这使您可以轻松地创建一个Observable[String]
/ Observable[Array[Byte]]
。
但是问题是它NuProcess
不适用于InputStream
,而是使用 javas NIO ByteBuffer
。除此之外,它稍微复杂一些,因为它使用事件处理机制,即在 Monix 任务中收听STDOUT
/因为你必须执行类似的操作STDERROR
NuProcess
所以问题是你如何将它连接到一个可观察的(即一个Observable[String]
或Observable[Array[Byte]]
没有效率低下?
请注意,我使用的是 Monix 3.x
multithreading - 将 Monix `Task` 转换为 Cats `IO` 后丢失了什么?
这个简化的案例是我的问题发生在...
另一种选择是,但这听起来与的做法Await.result(task)
不太一致。IOApp
我知道的
根据这篇很棒的帖子Monix vs Cats Effect
我发现两者之间的区别Task
IO
是:
Task
API比丰富IO
Task
在线程调度公平方面更智能、更自动化Task
获得更多可取消选项- 和更多 ...
问题和我的看法
我转换Monix
Task
成后失去了什么Casts
IO
?
它似乎工作正常,Task.gather
并且timeout
- 所有那些特定于任务的东西。
那么我在转换中失去了什么?
如果否,则没有理由创建多个数据类型。
因此,考虑到差异是否意味着我在与交谈后失去了调度公平性?Task
IO
好吧,我应该自己验证它,但我不知道如何测试它的公平性。
multithreading - 故障超过阈值时停止所有异步任务?
我正在使用Monix Task
进行异步控制。
设想
- 任务并行执行
- 如果故障发生 X 次以上
- 停止所有尚未完成的任务(越快越好)
我的解决方案
我提出了在 1. 结果和 2. 错误计数器之间竞争的想法,并取消失败者。
通过Task.race
如果错误计数器首先达到阈值,那么任务将被取消Task.race
。
实验
关于菊石REPL
问题
结果取决于线程池大小!?
对于池大小 1
,结果几乎总是任务成功,即没有停止。
对于池大小 2
,成功和失败之间的不确定性非常不确定,并且取消也不准确。例如:
取消最晚至 98 个任务已完成。
错误计数很奇怪,小到阈值。
默认全局调度程序获得相同的结果行为。
对于池大小 200
,它更具确定性,并且停止更早,因此在完成更少任务的意义上更准确。
池大小越大越好。
如果我Task.gather
改为Task.sequence
执行,所有问题都消失了!
这种依赖池大小的原因是什么?一旦发生太多错误,如何改进它或者是否有更好的选择来停止任务?
scala - 如何从 monix observable 获取下一个元素?
我有一个随意记录的数据流,但是在我的程序的某个状态下,我需要来自流的数据,而不是到目前为止观察到的最新数据(我可以这样做),而是之后的最新数据:
我该如何实施def awaitOneElement(Observable[Data]): Data = ???
?
我知道这可能是惯用的不正确,但肮脏的同步等待正是我所需要的。我也很好Observable[Data] => Future[Data]
,将Await
在下一步结束。
scala - akka.streams.Source 可以发出值(类似于 monix.BehaviorSubject)
我正在寻找akka.stream.scaladsl.Source
可以让我简单地从不同的代码位置发出下一个值的构造方法(例如,监视系统事件)。
- 我需要类似的东西
Promise
。Promise 向Future
. 我需要向Source
. - 像
monix.reactive.subjects.BehaviorSubject.onNext(_)
- 我不太关心背压。
目前我已经使用 monix 和 akka-streams(下面的代码)实现了这个,但我希望应该只有 akka-streams 解决方案:
scala - 并行执行 Monix 任务
并行运行多个 Monix 任务然后得到一个结果的最佳方法是什么?
我有以下任务并想要获得一个 ResultClass。
官方文档只有一些方法可以对相同类型的任务进行并行执行。
有没有针对不同类型的任务并行执行的好方法?
谢谢。
scala - Monix 的扇入/扇出并发
我正在尝试学习 Scala 并从中获得一些乐趣,但我遇到了这个经典问题。它让我想起了 NodeJS 早期的很多嵌套回调地狱。
这是我的伪代码程序:
- 获取 S3 存储桶列表的任务。
- 任务一完成后,我想以十个为一组批量处理桶。
- 对于每批:
- 获取每个存储桶的区域。
- 过滤掉不在区域内的桶。
- 列出每个存储桶中的所有对象。
- 打印一切
在某一时刻,我最终选择了以下类型:Task[Iterator[Task[List[Bucket]]]]
本质上:
外部任务是列出所有 S3 存储桶的初始步骤,然后内部迭代器/任务/列表尝试批处理返回列表的任务。
我希望有某种方法可以删除/展平外部任务以到达Iterator[Task[List[Bucket]]]
.
当我尝试将我的处理分解为多个步骤时,深度嵌套会导致我做许多嵌套映射。这是正确的做法还是有更好的方法来处理这种嵌套?
mongodb - 带有 Mongo 集合的 Monix 任务:错误处理
我正在尝试将 MonixTask
与mongo-scala-driver
. 我有点难以理解Error Handling
当没有错误时,这非常有效。我想为此添加错误处理(例如处理不正确database
的collection
名称)。我基于文档的初步尝试是尝试:
但这给了我一个io.Serializable
. 我如何保留一段ChangeStreamObservable[DomainModel]
时间同时进行某种简洁的错误处理?感谢我可以研究的任何模式的指针。
BR
scala - 如何运行 Monix 的 parSequenceUnordered 并处理每个任务的结果?
我目前正在努力实现对 API 的客户端 http 请求,并决定探索 sttp 和 monix 来完成这项任务。由于我是 Monix 的新手,我仍然不确定如何运行任务并检索它们的结果。我的目标是获得一系列 http 请求结果,我可以并行调用 -> 解析 -> 加载。
以下是我迄今为止尝试过的一个片段:
我的困惑相当简单(我猜)。如何运行Task.parSequenceUnordered
我创建的任务,并处理(解析 http 结果)序列中的任务?
很高兴:出于好奇,在处理请求的任务序列时是否可以天真地引入速率限制/节流?我并不是真的在寻找构建复杂的东西。它可以像间隔批量请求一样简单。想知道 Monix 是否已经为此提供了帮助。