问题标签 [reactor-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
reactive-programming - 反应性地从消息队列中拉取并提交偏移量
我正在使用基于拉的消息队列。当前的消费方法是使用 Spring 的@Scheduled
注解定期从队列中拉取,处理消息并返回队列。您可以假设消费者 API 具有以下方法
如何使用反应器重写它?基本上,我想将消息转化为Flux<Message>
具有背压意识的内容。有没有办法简单地使用现有的 API 来完成结果?还是我需要像reactor-kafka
以前那样实现自己的消费者事件循环?
apache-kafka - Reactor kafka如何在flatMap之后保证至少一次
我有一个从 kafka 消费并将数据存储到数据库的服务。简化逻辑如下:
使flatMap
通量无序。根据反应堆 kafka 文档,acknowledge()
可以确认尚未存储到数据库的记录:
https ://projectreactor.io/docs/kafka/snapshot/api/reactor/kafka/receiver/ReceiverOffset.html
确认与此偏移关联的 ReceiverRecord。偏移量将根据提交配置参数 ReceiverOptions.commitInterval() 和 ReceiverOptions.commitBatchSize() 自动提交。当一个偏移量被确认时,假定该分区中直到并包括该偏移量的所有记录都已被处理。当接收者 Flux 终止时,如果可能,所有已确认的偏移量都将被提交。
如何保证至少一次但不阻塞流?
apache-kafka - KafkaReceiver 没有为每个分区并行运行?
我正在尝试使用基于分区的排序同时读取消息。我参考了下面的示例,并添加了延迟(10 秒)来模拟工作负载。但我注意到即使它运行多个线程,它也不会同时运行。它只运行一条消息。
我的主题由 20 个分区组成,因此我希望至少并行运行 20 个线程并等待 10 秒(我放的延迟),然后一次又一次地运行。
它一个一个地依次打印,并且只从一个分区中消耗。
multithreading - Reactor Kafka - 来自多个主题的并行消费?
TL;博士; KafkaReceiver
如果我想并行处理来自不同主题的消息并且来自不同主题的消息处理时间不同,是否需要每个主题的实例?
我正在尝试 1 个KafkaReceiver
订阅 5 个主题,每个主题有 1 个分区。
来自特定主题的消息的处理速度可能会变慢。为了避免让消费者暂停,因为处理来自其他主题的消息可以正常工作,我将每个主题的消息分组并在单独的线程上进行处理。它看起来像这样:
我想防止的是通过不断消耗消息来填充内存,我不确定何时KafkaConsumer#poll
触发。
由于我是新手,reactor-kafka
所以我猜测在记录poll
时会发生这种情况request(NNN)...
,并且调度程序池中的线程似乎正在调用它:
我在这里假设处理线程(P-4)说“我可以处理更多!!!”。
有时我会得到:
r-coordinator-3
之前用于处理的线程在哪里.publishOn
被添加到管道中。
poll
记录时会发生吗request(NNN)...
?- 为什么有时
P-4
会r-coordinator-3
记录onNext
事件? - 如果在处理线程(例如,准备好进一步处理消息)
poll
时调用,是否会从所有主题中获取消息并最终导致内存不足错误?在“1 KafkaReceiver - 5 个主题”示例中,线程可能正在快速处理并且它可能会频繁调用,而如果轮询从所有主题中获取并且它们很慢,则其他线程会填满它们的队列。P-4
poll
P-4
poll
- 在这个用例中,我唯一的选择是
KafkaReceiver
按主题使用吗?
kubernetes - 具有并行 API 调用问题的 Project Reactor Kafka
问题陈述:消耗来自 Kafka 的百万条记录并旋转并行 API 调用(120 TPS)
我正在使用项目反应器 kafka 来处理 Kafka 消息消耗(每小时 200 万条记录)。一旦我收到 kafka 消息,我需要将并行 API 调用(10 TPS)旋转到“abc.com/actuator”。我测试了 kafka 部分 .. 我能够在 20 分钟内消耗数百万条记录(使用 4 个 Kubernetes Pod)。但是当我旋转 API 调用时,一切都是按顺序进行的, 而不是并行的。此外,API 需要 1000 毫秒才能返回响应(这会增加等待时间)。有人可以帮助了解并行 API 调用有什么问题吗?提前致谢。
Kubernetes 配置:
CPU:300m 内存:10Gi
apache-kafka - Reactor Kafka 的乱序提交机制
我最近看到 Spring-Kafka 项目已经实现了 OOO 提交来处理 Kafka 消息。我想知道如何在 Reactor-Kafka 中使用或实现此功能。没有这个,它可能会导致我们提交一个之前的消息,它的处理还没有完成。
我们寻求的解决方案是跟踪轮询消息并仅在所有先前消息(在同一分区中)已被确认后才提交消息。
我还看到了一些关于其他框架的主题的讨论。如前所述,它已在 Spring-Kafka 中实现。除此之外,还有一篇很棒的文章解释了 Quarkus 对这个问题的实现。就在这里。这真的会帮助我了解任何解决方案。谢谢
apache-kafka - 使用 io.projectreactor.kafka:reactor-kafka jsonserializer 问题将“数据类”放入 Kafka
我在 reactor-kafka 中找不到任何 json 序列化程序,所以我使用org.springframework.kafka.support.serializer.JsonSerializer
我也尝试使用com.fasterxml.jackson.databind.annotation.JsonSerialize
,但它没有用。任何机构都可以帮助我吗?我想使用一些标准的 JSON 序列化器,所以可以用其他语言编写的其他服务可以读取数据而不会出现严重问题。
我的配置类
我的发件人方法
我的数据类
我的 build.gradle 文件的一部分
我收到这样的错误
spring - “发现模棱两可的参数类型无效”?Spring 与 Project Reactor 的集成
我正在使用带有 Spring Integration 的 Project Reactor 从 Kafka 读取并写入 MongoDB,我的 Kafka 使用效果很好,但是.handle(MongoDb.reactiveOutboundChannelAdapter(mongoFactory))
卡住了。我已经看到这个函数的内部代码是new ReactiveMongoDbStoringMessageHandler(mongoFactory))
,所以我尝试了以下方法(我有一个transform()
从 转换为 的方法ConsumerRecord
,Mono<String>
带有@Transformer
注释):
代码遵循文档https://docs.spring.io/spring-integration/reference/html/mongodb.html#mongodb-reactive-channel-adapters。我得到的错误是:
java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match:
然后是很长的函数列表。有什么理由会发生这种情况?
spring - Spring Integration 会导致响应式应用程序阻塞吗?
我正在将 Project Reactor 用于非阻塞 IO 数据管道,并且我正在考虑使用 Spring Integration 作为抽象层来管理管道中的所有流和组件。我目前正在使用 Project Reactor 实现 Kafka 消费者,我开始怀疑 Spring Integration 包装是否会在阻塞和背压方面损坏我的管道。
我想知道在完整的非阻塞应用程序中使用 Spring Integration 及其类是否被认为是安全的,以及是否有任何陷阱我应该注意。
spring - 使用 Spring Integration with Project Reactor 时的主要处理流程编程方法
我想定义一个使用 Reactor Kafka 消耗 kafka 并写入 MongoDB 的流,并且只有在成功时才会将 ID 写入 Kafka。我正在将 Project Reactor 与 Spring Integration JavaDSL 一起使用,并且我希望有一个FlowBuilder
类可以在较高级别上定义我的管道。我目前有以下方向:
我在文档中看到支持另一种方法,该方法也适用于 Project Reactor。这种方法不包括使用IntegrationFlows
. 这看起来像这样:
我想知道在使用这两个库时更推荐的处理方式是什么。我想知道如何在第二个示例中使用 Reactive MongoDB 适配器。我不确定如果没有IntegrationFlows
包装器,第二种方法是否可行。