问题标签 [monix]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
178 浏览

scala - 并行处理两个 Observable 并转换为单个 Observable

我有两个从单个 Observable 创建的 Observable,如下所示

现在如何并行运行 leftStream 和 rightStream 并将它们组合在一起以获得可以订阅的新 Observable?执行 Observable.merge 是按顺序执行它们。

0 投票
2 回答
317 浏览

scala - Monix:InputStreamObservable 不支持多个订阅者

我正在尝试将 (String, Date) 的 Observable 拆分为两个不同的 Observable 并将它们压缩在一起,如下所示

但我收到以下异常

0 投票
1 回答
192 浏览

scala - What is `alsoTo` analogue of akka-streams in monix ?

Monix looks like great framework but documentation is very sparse.

What is alsoTo analogue of akka-streams in monix ?

Basically I want stream to be consumed by two consumers.

0 投票
1 回答
133 浏览

scala - Monix Debounce Observable 的使用

我正在尝试一些我可以在 Monix 的 Observable 上执行的操作。我遇到了这个 debounce 运算符,无法理解它的行为:

上面的这个只是每 5 秒发出一个 Long 。

然而,这个根本不发射任何东西。那么 debounce 运算符的真正目的是什么,在哪些情况下我可以使用它?

0 投票
1 回答
117 浏览

scala - Akka Actor 搜索和流媒体事件

我有一个场景,我有一堆 Akka Actor 运行,每个 Actor 代表一个 IoT 设备。我有一个基于 Play 的 Web 应用程序,这些 Actor 在其中运行并连接到这些 IoT 设备。

现在我想通过 WebSocket 端点将来自 Actor 的信号暴露给外界。每个 Actor 都有某种机制,我可以使用它来询问最新的信号状态。

我的想法是执行以下操作:

  1. 在我的控制器中添加一个 WebSocket 端点,该端点需要它需要信号的 IoT 设备的 ID。在这个控制器中,我将执行一个actor选择来获取与传入的 IoT 设备的 id 对应的 Actor 实例。

  2. 使用步骤 1 中获取的 ActorRef 并实例化 WebSocketActor

  3. 在这个 WebSocketActor 中,我将实例化一个 Monix Observable,它将定期使用 actorRef 并要求它提供信号。

  4. 收到这些信号后,我会将其传递给 WebSocket 端点

现在我的问题是:

  1. 如果客户端打开了 WebSocket 流并且在一段时间后代表物联网设备的 Actor 死了,会发生什么。我可能应该在我的 WebSocketActor 中处理这种情况。但是这会是什么样子呢?

  2. 如果代表物联网设备的 Actor 重新活跃起来(假设我设置了一些监督),我可以继续处理在 Actor 死亡之前打开套接字连接的客户端吗?我的意思是客户端是否需要以某种方式关闭并再次打开连接?

请建议?

0 投票
1 回答
126 浏览

scala - Monix Coeval.memoize 炸毁了堆栈

定义

现在

吹堆栈。如果我们.memoize从递归调用中删除 ,它会起作用(如预期的那样)。为什么?

0 投票
2 回答
722 浏览

scala - Scala Monix,如何杀死正在运行或计划的任务

我将Monix用于异步任务工作流。

我们如何杀死跑步者Task

@> Failure(java.util.concurrent.TimeoutException: Task timed-out after 100 milliseconds of inactivity) sleep canceled effect <--- what !? , task is running. Isn't it canceled !?

我认为我目前的解决方案很丑(标志检查阻碍了代码重用):


如果不可能,我们如何在 not-yet-ran 时杀死计划的Task

我失败的尝试是:

可悲的是,它仍然显示取消发生后的效果。我希望可以取消已计划且尚未运行的任务(这.map(...)是另一个Task,对吗?)

0 投票
1 回答
426 浏览

scala - 理解monix中的观察者

我正在阅读有关观察者广告的Monix 文档,我遇到了以下示例:

或者您可以快速构建一个仅记录它接收到的事件的观察者。我们将在其他示例中使用它:

下一个非法示例:

喂食两个元素,然后停止。这是不合法的:

所以我们可以看到相同的onNext -> onNext -> onComplete链条。这不合法吗?为什么?

0 投票
1 回答
301 浏览

scala - 在 Scala 中处理多个并发流的惯用方法

我有一个流列表,在调用它们next()时会随机休眠一段时间,然后从不同的来源读取一个字符。

我正在尝试编写一个消费者,它将继续调用这些流,直到EOF在运行时构建这些流的公共字典。

到目前为止,我使用的ConcurrentHashMap是字典,并且只是为每个流消费者创建了一个新线程。

monix虽然我的解决方案有效,但它似乎很天真,我想知道流媒体库是否有更好的用途,例如fs2

0 投票
0 回答
124 浏览

scala - 我可以使用 Monix 的背压机制来确认消息的传递吗

当我收到来自 3rd 方系统的消息时,我遇到了一种情况,它可以保证消息的传递。为此,它需要客户端确认收到的每条消息。这种系统的一个例子可能是 RibbitMQ。

现在,我知道 Monix 的 Observables 中有背压机制,这立即让我想用它来确认消息。换句话说,我想创建一个 observable,这样当onNext返回时Ack.Continue,我向外部系统确认该消息。贝娄是我想法的简单概述。

当然,整个想法是避免丢失任何消息,并且仅在观察者实际返回 Ack.Continue 时才确认它们。但是,我注意到了几种情况,当 Monix 负责管理可观察对象的背压时,例如,当 Imulticastshare观察者时。在这种情况下,原始 observableAck.Continue无需等待预期订阅者的响应即可接收。

这一切都让我想到了一个问题,如果实际上使用 Monix 背压机制确认第 3 方系统上的消息是一个好主意,如果是这样,是否有任何警告我需要注意(比如不要对multicast这样的观察者)

我将感谢您的帮助,并非常感谢您的提前。