问题标签 [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
451 浏览

akka - akka 流自定义图形阶段

我有一个来自 web 套接字的 akka 流,比如akka 流使用 web 套接字,并且想构建一个可重用的图形阶段(inlet:流,FlowShape:向 JSON 中添加一个附加字段,指定来源,即

outlet卡夫卡。

我面临以下3个问题:

  • 无法Inlet从 Web 套接字流中创建自定义
  • 无法将 kafka 直接集成到流中(参见下面的代码)
  • 不确定是否需要转换器添加附加字段来反序列化 json 以添加来源

示例项目(仅限流程)如下所示:

0 投票
1 回答
381 浏览

rx-java - 使用反应式流处理器作为事件总线通常可以吗?

我开始学习响应式流是因为我对使用 RxJava 替代更传统的事件总线的新趋势感到好奇。 这篇博客文章是对如何完成此操作的典型描述。如果我理解正确的话,RxJava 1.x 并不是严格意义上的 Reactive Streams 的实现,但它非常相似。2.0 版包含一些兼容的类,或者至少通过了 TCK,因此此代码的更新版本可能看起来有些不同。

在 Reactive Streams 术语中,我认为subject是 a Processor,它既是 aPublisher又是 a Subscriber

问题是调用未订阅任何内容onNext的 aSubscriber似乎违反了 Reactive Streams 规范,尤其是规则 1.9

这仅仅是一个实现细节吗? 我认为您通常不能依赖于与兼容的 Reactive Streams 实现一起工作,我是否正确?

0 投票
1 回答
10647 浏览

kotlin - 反应堆通量到单声道>

如何Flux<MyObject>直接转换为Mono<List<MyObject>>

我正在寻找Single<List<MyObject>> single = observable.toList()来自 RxJava 的等价物。

使用阻塞运算符,我可以这样做:

val just: Mono<List<MyObject>> = Mono.just(flux.toIterable().toList())

但它是在声明时执行的,这似乎是不正确的。

0 投票
1 回答
504 浏览

microservices - 响应式流 Kafka 流扇出到 http 演员

我对 Akka Streaming 和响应式流媒体非常陌生。我有一个问题:是否有可能让一个休息 API 接收到一条消息,将其丢弃在 Kafka Bus 上,然后 Kafka 流式消费者将消息聚合到最大值中。时间窗口并重新返回答案?

如何实施这样的制度?或者从哪里开始?

谢谢

0 投票
1 回答
447 浏览

scala - 如何将来自多个来源的结果组合成流

我使用 slick-3.0.0 并尝试使用streaming

假设有AccountsTablePreferencesTable

我想从中获取一些信息PreferencesTable并将其用于流中AccountsTable。例如(参见 TODO):

0 投票
1 回答
115 浏览

scala - 如何使用 slick-streaming 和 akka-streaming 将一些记录从 TableA 复制到 TableB

有两个表TableATableB

我需要将一些记录从 复制TableATableB。我使用slick-3.0和使用以下方式:

但是我遇到了一个问题 - 如何将批处理写入TableB异步(请参阅 TODO)。现在上面的代码只适用于阻塞到内部未来(见#1评论)。是否有异步执行该任务的正确方法?

0 投票
0 回答
80 浏览

scala - ReactiveKafka 性能问题

我最近从经典的 kafka-clients javaakka-reactive kafka转向提高我的消费者性能。我正在使用最新版本的akka-reactive-kafka (0.16). 我的流程很简单:read from kafka then group by batch then process by batch then commit。我做了一个小基准测试,但我得到的结果很差 about 2 000 msg / sec。对于替补席,我正在运行kafka-producer-perf-test.sh.

在 scala 代码 + 日志下面:

[

有关信息,我在 EC2 t2 中型机器上运行此代码,并使用用 Python 编写的 kafka 客户端,我们的速率为 5000 msg / sec,所以我认为我在使用响应式 API 的方式上做错了,但我找不到什么。阅读这篇关于reactive-kafka-benchmark 的博客,我应该有更好的吞吐量!

0 投票
2 回答
2243 浏览

akka - 使用 Akka Stream 从数据库流式传输记录

我有一个使用 Akka 的系统,该系统目前通过消息队列处理传入的流数据。当一个记录到达然后它被处理时,mq 被确认并且记录被传递以在系统内进一步处理。

现在我想添加对使用 DB 作为输入的支持。
输入源能够处理 DB 的方法是什么(应该以接收器可以处理的速度输入 > 100M 记录 - 所以我假设反应/akka-streams?)?

0 投票
2 回答
1289 浏览

rx-java - 反应式流 - 超时批处理

我正在考虑替换一个看起来非常接近 ReactiveStreams 的本土日志处理库io.projectreactor. 目标是减少我们维护的代码,并利用社区添加的任何新功能(关注运营商融合)。

作为开始,我需要使用标准输入输出并将多行日志条目合并到将沿管道向下流动的文本块中。Filebeat 文档的多行日志条目章节详细解释了用例(除了我们希望它在进程中)。

到目前为止,我拥有的代码是:

当检测到新的日志头时,这会处理多行合并,但在现有库中,我们也会在超时后刷新累积的行(即,如果在 5 秒内没有收到文本,则刷新记录)。

在 Reactor 中对此进行建模的正确方法是什么?我需要编写自己的操作符,还是可以自定义任何现有的操作符?

任何指向在 Project Reactor 或 RxJava 中实现此用例的相关示例和文档的指针都将非常感激。

0 投票
1 回答
1871 浏览

java - Spring WebFlux:block() 方法在 Spring Data Reactive MongoDB 中返回 null

我正在尝试学习 Reactor 项目并遇到问题。

有时 block() 方法返回 null。谁能给我解释一下?谢谢