问题标签 [reactive-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
227 浏览

apache-kafka - 如何在 Spring Reactor Kafka 中创建多个 KafkaReceiver 实例

我有一个反应式 kafka 应用程序,它从一个主题读取数据并写入另一个主题。该主题有多个分区,我想创建与主题中的分区相同数量的消费者(在同一个消费者组中)。根据我从这个线程的理解,.receive() 将只创建一个 KafkaReceiver 实例,该实例将从主题中的所有分区中读取。所以我需要多个接收器来并行读取不同的分区。

为此,我想出了以下代码:

当我对此进行测试时,它似乎工作正常,创建了多个并行处理分区的 Kafka Receiver 实例。我的问题是,这是创建多个实例的最有效方法吗?反应式卡夫卡还有另一种方法可以做到这一点吗?

0 投票
0 回答
105 浏览

apache-kafka - 由于 Reactor Kafka 中的错误而导致消费者停止时创建新消费者

我正在开发一个应用程序,其中每个主题分区都有多个使用者,因此从主题中读取是并发的。我按照此链接确保在现有消费者停止时再次创建消费者。.repeat 将创建新的消费者。我一直在尝试测试这种情况:

下面是我的代码和测试:

测试:

当我运行测试时,我得到记录重试异常 - onError(reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3 in a row (3 total)))

我的理解是 onErrorContinue 将捕获异常然后继续下一条记录。但看起来它正在引发异常。由于它引发异常,因此 repeat() 如何工作?如果有人可以帮助我了解如何测试这种情况,我将不胜感激?

0 投票
1 回答
75 浏览

apache-kafka - Kafka 上的多线程在 Spring reactor Kafka 中发送

我有一个反应式 kafka 应用程序,它从一个主题读取数据,转换消息并写入另一个主题。我在主题中有多个分区,因此我创建了多个消费者以并行读取主题。每个消费者在不同的线程上运行。但看起来 kafka send 运行在同一个线程上,即使它是从不同的消费者调用的。我通过记录线程名称进行测试以了解线程工作流程,每个消费者的接收线程名称不同,但在 kafka 发送 [kafkaProducerTemplate.send] 上,所有消费者的线程名称 [线程名称:producer-1] 都是相同的. 我不明白它是如何工作的,我希望它对发送的所有消费者也有所不同。有人可以帮助我了解这是如何工作的。

0 投票
1 回答
72 浏览

apache-kafka - 安排每 1 分钟运行一次反应流

我有一个反应流,它获取一些数据,遍历数据,处理数据,最后将数据写入 Kafka

我希望这个工作流程每分钟运行一次。有人可以帮助我了解如何在流中安排这个吗?

0 投票
1 回答
47 浏览

java - 生产者作为非反应式 kafka 和消费者作为反应式 kafka

如果我在服务 A 中使用非响应式 Kafka 生产者,在服务 B 中使用响应式消费者会发生什么?