问题标签 [monix]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
107 浏览

monix - 将 Observable 写入文件

我目前有以下代码:

我的目标是消除上述代码段中的所有副作用。

从功能的角度来看,writer应该充当future.

因为我使用的是 Monix 版本3.0.0-RC2,所以有新的bracket方法可用于Observableand Task(可能甚至更多的类),这可能是我正在寻找的 - 但我不太明白如何。

0 投票
2 回答
284 浏览

scala - 使用 Monix Observable 处理错误的更好方法

我正在尝试使用 monix 3.0.0-RC1 构建反应式应用程序。

例如 a 有一个 Int 的 Seq,而第二个元素是错误的。我可以Oservable.raiseError(...)用来处理这个:

我不喜欢上面代码中抛出的异常。

另一方面,我可以使用 Scala 的Either

either => either.map(...)在每一步中都不爽。

处理错误的更好方法是什么?

0 投票
1 回答
96 浏览

scala - 如何在 .mapParallelUnordered 中处理错误并使 Observable 保持活动状态

我正在使用Monix 3并且有一种这样的代码:

这段代码运行良好,但我有很多长时间的 IO 操作,所以决定重构.mapParallelUnordered

我试图获得与第一个示例相同的结果,但在并行处理中。问题是Task.raiseError杀死了整个 observable,所以它停止在i = 2.

如何处理错误并保持 Observable 活着?

0 投票
0 回答
149 浏览

reactive-programming - 如何为反应式网络库实现背压?

关于我的套接字反应库工作过程中的问题,有一个很长的背景故事。

socket 库主要基于一个名为Monix(类似 ReactiveX)的响应式库。Monix 具有通过(扩展 Future)类型处理背压的最佳实践,Ack该类型在处理当前消息时发出下一条消息。这是一个很好的机制来保护系统的大消息匆忙。

目前设计,每一个socket连接都是一个Observable(或Stream),Observable在解析TCP/IP网络字节流时会创建一个协议消息,然后将协议消息推送给Subscriber。

问题是 Monix 库只能为每个 Observable 做背压。如果您认为有数千个客户端连接,那么云会有这么多的 Observable 和背压是没有意义的。

那么,如何针对单个 Observable 之外的全局系统,为这样的响应式系统设计背压机制呢?

谢谢

0 投票
1 回答
228 浏览

scala - Monix 如何通过 flatMap 运算符使用背压?

Monix 使用 Ack 同步发出的消息,但如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随source.

请参阅此测试代码:

输出:

其中出现一些背压不匹配:
after: sleep 2 second for every message ...backpressure give 3 of itemafter backpressure map - ...

就背压而言, 怎么可能sleep 2 second for every message ...有一对一的关系?after backpressure map - ...

还有一个疑惑:为什么要记录sleep 2 second for every message 输出(0, 72), (0, 75), (0,78)但这样的事情(0, 72), (1, 73), (2,74)

谢谢。

莫尼克斯版本: "io.monix" %% "monix" % "3.0.0-RC1"

0 投票
0 回答
477 浏览

scala - 哪些操作在流库中的拉取模型和推送模型中更容易实现(反之亦然)?

Monix 的作者说Monix 与 FS2 的比较

FS2更好的地方:

  • 生产者和消费者之间的通信模型是基于拉的,有时更容易实现新的运营商

Monix 更好的地方:

  • 生产者和消费者之间的通信模型是基于推送的(带有背压),这使得它本质上更高效

几个问题出现:

  • 在基于拉的模型中哪些操作更容易实现?
  • 是否有更难以这种方式实施的操作?
  • 为什么基于拉的方法本质上较慢?
0 投票
1 回答
268 浏览

scala - Monix 从并行任务写入单个输出流

我们如何使用 Monix Tasks 从一系列任务中写入单个输出流(可以并行运行):例如,我有 N 个任务可以并行运行并从中获得一些响应HTTP call/server,,我正在写回响应到文件流(因为有限制内存),但我stream closed error,想知道是否有办法使用 Monix 任务或任何其他 API 来实现这一点

示例代码片段:

0 投票
2 回答
525 浏览

scala - 从 Task[Either[A, Task[B]]] 转换为 Task[Either[A, B]]

我正在使用 monix 来解决副作用并以这种类型结束

Task[Either[A, Task[B]]],有没有办法得到Task[Either[A, B]]

到目前为止,我所能做的就是转换Task[Either[A, Task[B]]]Task[Any],基本上Either使用模式匹配和展平进行删除,但在过程中缺少类型信息

0 投票
0 回答
83 浏览

scala - 我从 Monix firstOptionL 得到不一致的结果 - 竞争条件?

我从对已转换为 Observable 的同一(MongoDB)数据库调用的重复调用中得到间歇性缺失值。我已经删除了所有数据库代码,以获得一个只有 Monix 位的最小测试用例,而且我偶尔仍然会丢失值 - 通常每 2,000 次测试一到两个。

根据文档 ConcurrentSubject 的意思是“不需要遵循背压合同”,但无论我是否这样做,我都会遇到类似的失败。

在某些运行中,所有 20x100 循环都正确完成 - firstOptionL isDefined 用于所有 2,000 个结果。但是,超过 50% 的时间 assert(emptyCount === 0) 在值为 1 或有时为 2 时触发,这表明我偶尔会得到一个 None 值,好像 onComplete 发生在 onNext 之前?

这可能发生在 20 个循环中的任何一个中,因此它看起来像是一种竞争条件,或者我误解了所需的输入。我已经尝试了几乎所有主题 - PublishSubject,有和没有 BufferedSubscriber,并且都给出了相似的结果。

我也尝试过将 onComplete 延迟到 Ack via

这似乎会稍早失败。

我也试过 MulticastStrategy.replay 没有区别。

我在 Scala 2.12.8 上使用 Monix 3.0.0-RC3。

0 投票
1 回答
177 浏览

scala - 即使未使用原子 compareAndSet 参数,也会对其进行评估

我有以下代码设置 Atomic 变量(两者java.util.concurrent.atomicmonix.execution.atomic行为相同:

第二次 compareAndSet 时,它没有更新值,但仍然调用了 foo。这导致了问题,因为foo它有副作用(在我的真实代码中,它创建了一个 Akka 演员并给我错误,因为它试图创建重复的演员)。

除非实际使用,否则如何确保不评估第二个参数?(最好不要使用同步)

我需要将隐式参数传递给 foo 所以惰性 val 不起作用。例如