问题标签 [monix]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 并行处理两个 Observable 并转换为单个 Observable
我有两个从单个 Observable 创建的 Observable,如下所示
现在如何并行运行 leftStream 和 rightStream 并将它们组合在一起以获得可以订阅的新 Observable?执行 Observable.merge 是按顺序执行它们。
scala - Monix:InputStreamObservable 不支持多个订阅者
我正在尝试将 (String, Date) 的 Observable 拆分为两个不同的 Observable 并将它们压缩在一起,如下所示
但我收到以下异常
scala - What is `alsoTo` analogue of akka-streams in monix ?
Monix looks like great framework but documentation is very sparse.
What is alsoTo
analogue of akka-streams in monix ?
Basically I want stream to be consumed by two consumers.
scala - Monix Debounce Observable 的使用
我正在尝试一些我可以在 Monix 的 Observable 上执行的操作。我遇到了这个 debounce 运算符,无法理解它的行为:
上面的这个只是每 5 秒发出一个 Long 。
然而,这个根本不发射任何东西。那么 debounce 运算符的真正目的是什么,在哪些情况下我可以使用它?
scala - Akka Actor 搜索和流媒体事件
我有一个场景,我有一堆 Akka Actor 运行,每个 Actor 代表一个 IoT 设备。我有一个基于 Play 的 Web 应用程序,这些 Actor 在其中运行并连接到这些 IoT 设备。
现在我想通过 WebSocket 端点将来自 Actor 的信号暴露给外界。每个 Actor 都有某种机制,我可以使用它来询问最新的信号状态。
我的想法是执行以下操作:
在我的控制器中添加一个 WebSocket 端点,该端点需要它需要信号的 IoT 设备的 ID。在这个控制器中,我将执行一个actor选择来获取与传入的 IoT 设备的 id 对应的 Actor 实例。
使用步骤 1 中获取的 ActorRef 并实例化 WebSocketActor
在这个 WebSocketActor 中,我将实例化一个 Monix Observable,它将定期使用 actorRef 并要求它提供信号。
收到这些信号后,我会将其传递给 WebSocket 端点
现在我的问题是:
如果客户端打开了 WebSocket 流并且在一段时间后代表物联网设备的 Actor 死了,会发生什么。我可能应该在我的 WebSocketActor 中处理这种情况。但是这会是什么样子呢?
如果代表物联网设备的 Actor 重新活跃起来(假设我设置了一些监督),我可以继续处理在 Actor 死亡之前打开套接字连接的客户端吗?我的意思是客户端是否需要以某种方式关闭并再次打开连接?
请建议?
scala - Monix Coeval.memoize 炸毁了堆栈
定义
现在
吹堆栈。如果我们.memoize
从递归调用中删除 ,它会起作用(如预期的那样)。为什么?
scala - Scala Monix,如何杀死正在运行或计划的任务
我将Monix用于异步任务工作流。
我们如何杀死跑步者Task
?
@> Failure(java.util.concurrent.TimeoutException: Task timed-out after 100 milliseconds of inactivity)
sleep
canceled
effect <--- what !? , task is running. Isn't it canceled !?
我认为我目前的解决方案很丑(标志检查阻碍了代码重用):
如果不可能,我们如何在 not-yet-ran 时杀死计划的Task
?
我失败的尝试是:
可悲的是,它仍然显示取消发生后的效果。我希望可以取消已计划且尚未运行的任务(这.map(...)
是另一个Task
,对吗?)
scala - 理解monix中的观察者
我正在阅读有关观察者广告的Monix 文档,我遇到了以下示例:
或者您可以快速构建一个仅记录它接收到的事件的观察者。我们将在其他示例中使用它:
下一个非法示例:
喂食两个元素,然后停止。这是不合法的:
所以我们可以看到相同的onNext -> onNext -> onComplete
链条。这不合法吗?为什么?
scala - 在 Scala 中处理多个并发流的惯用方法
我有一个流列表,在调用它们next()
时会随机休眠一段时间,然后从不同的来源读取一个字符。
我正在尝试编写一个消费者,它将继续调用这些流,直到EOF
在运行时构建这些流的公共字典。
到目前为止,我使用的ConcurrentHashMap
是字典,并且只是为每个流消费者创建了一个新线程。
monix
虽然我的解决方案有效,但它似乎很天真,我想知道流媒体库是否有更好的用途,例如fs2
scala - 我可以使用 Monix 的背压机制来确认消息的传递吗
当我收到来自 3rd 方系统的消息时,我遇到了一种情况,它可以保证消息的传递。为此,它需要客户端确认收到的每条消息。这种系统的一个例子可能是 RibbitMQ。
现在,我知道 Monix 的 Observables 中有背压机制,这立即让我想用它来确认消息。换句话说,我想创建一个 observable,这样当onNext
返回时Ack.Continue
,我向外部系统确认该消息。贝娄是我想法的简单概述。
当然,整个想法是避免丢失任何消息,并且仅在观察者实际返回 Ack.Continue 时才确认它们。但是,我注意到了几种情况,当 Monix 负责管理可观察对象的背压时,例如,当 Imulticast
等share
观察者时。在这种情况下,原始 observableAck.Continue
无需等待预期订阅者的响应即可接收。
这一切都让我想到了一个问题,如果实际上使用 Monix 背压机制确认第 3 方系统上的消息是一个好主意,如果是这样,是否有任何警告我需要注意(比如不要对multicast
这样的观察者)
我将感谢您的帮助,并非常感谢您的提前。