问题标签 [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
184 浏览

reactive-programming - 反应式流是消息还是事件驱动?

我试图弄清楚 Reactive Streams 是消息驱动的还是事件驱动的。有不同的版本,在宣言中他们说 Reactive Streams 是消息驱动的。但在另一篇文章中,他们只说事件。或者也许这两者都是可以包含事件的消息?

0 投票
1 回答
128 浏览

java - cyclops-react 和 async-retry:如何在超时时重试?

我开始将 cyclops-react 与异步重试一起使用。我还是有点迷茫。

我正在使用 SimpleReact 并模拟来自服务器的超时,但我从未收到类似这样的超时:

那里缺少什么?

0 投票
1 回答
2944 浏览

akka - Akka Streams:Mat 在 Source[out, Mat] 中代表什么

在 Akka 流中,Mat in Source[Out, Mat] 或 Sink[In, Mat] 代表什么。什么时候会真正使用?

0 投票
1 回答
267 浏览

scala - 向数据库提供反应式 API

如何为不支持流式传输的数据库提供反应流 api?就像让我们说例如dynamodb。进行 get 调用时,dynamodb 将返回所有结果。因此,即使我将 get 调用包装在 Source 中,如何处理来自下游阶段的背压?另外我如何在数据库中实现写调用?我的水槽会是什么样子?对此的任何指示都会有所帮助。

0 投票
1 回答
2866 浏览

scala - Akka Streams 对一组键进行过滤和分组

我有一个流

现在我想过滤一个键子集,例如 val filterKeys = Set[Char]('k','f','c')然后Filter(k.exists(filterKeys.contains))) 拆分这些键,以便某些键由不同的流处理,然后在最后合并在一起;

我该怎么做呢?

FlexiRoute以旧的方式似乎是一个不错的方法,但在新的 API 中,我猜我想GraphStage从 DSL 进行自定义或创建我自己的图表,因为我认为无法通过内置阶段做到这一点。 .?

0 投票
1 回答
1622 浏览

scala - Akka Streams 按类型拆分流

我有以下简单的案例类层次结构:

我有一个Flow[Message, Message, NotUsed](来自基于 Websocket 的协议,编解码器已经到位)。

我想将其解复用Flow[Message]为 Foo 和 Baz 类型的单独流,因为它们是由完全不同的路径处理的。

最简单的方法是什么?应该很明显,但我错过了一些东西......

0 投票
1 回答
394 浏览

transactions - Akka 流和事务边界

我仍在掌握 Akka 流的概念,并试图了解当我们有一组需要以原子方式处理的项目时如何将它们映射到场景。假设我们有一个包含多个项目的采购订单,我们需要对每个项目应用一些处理,然后将它们合并为一个值。这样的工作流是否应该成为它自己的独立流(或子流),一旦采购订单被完全处理就关闭?即每个采购订单开始一个新的流?或者我有一个永无止境的采购订单流?但如果是这样,我不会有混合来自不同订单的采购订单的问题吗?

换句话说,我想要实现的是处理不同工作流的隔离,并想知道 Akka 流是否提供了一个很好的匹配。

0 投票
1 回答
808 浏览

rx-java - 在 RxJava 2.0 中使用 Reactive-Streams 处理器

我有一个org.reactivestreams.Processor我想与 RxJava 2.0 一起使用的。然而,虽然有将 aorg.reactivestreams.Publisher与 RxJava 集成的转换,比如io.reactivex.Flowable#fromPublisher,但我不清楚如何最好地集成 a org.reactivestreams.Processor(或org.reactivestreams.Subscriber)。任何人都可以对此有所了解吗?

0 投票
0 回答
967 浏览

scala - Kafka tests failing intermittently if not starting/stopping kafka each time

I'm trying to run some integration tests for a data stream using an embedded kafka cluster. When executing all the tests in a different environment than my local, the tests are failing due to some internal state that's not removed properly.

I can get the all the tests running on the non-local environment when I start/stop the kafka cluster before/after each test but I only want to start and stop the cluster once, at the beginning and at the end of the execution of my suite of tests.

I tried to remove the local streams state but that didn't seem to work:

Is there a way to get my suit of tests running without having to start/stop cluster each time?

Right below there are the relevant classes.

Any help over this would be much appreciated? Thanks in advance.

0 投票
1 回答
13665 浏览

java - RxJava 2.0 - 如何将 Observable 转换为 Publisher

如何在 RxJava 版本 2 中将 Observable 转换为 Publisher?

在第一个版本中,我们有https://github.com/ReactiveX/RxJavaReactiveStreams项目,它完全符合我的需要。但是我如何在 RxJava 2 中做到这一点?