问题标签 [reactor-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
151 浏览

java - Reactor Kafka 中基于分区排序的并发处理

我正在开发一个示例应用程序,该应用程序将从 Kafka 主题的不同分区读取,同时处理基于分区排序的记录并将记录写入另一个主题的不同分区。这是我写的示例代码

当我运行应用程序时,我没有在控制台中看到打印语句。我确实有要在该主题中使用的数据。所以我想知道代码是否与主题有关。我正在寻求帮助,以了解如何检查它是否连接到主题并阅读消息或这里可能出现的问题。

我是 Java、反应式编程和 Kafka 的新手。这是一个自学项目,我很可能遗漏了一些简单明了的东西。

更多信息:这是我的日志的快照。我有一个名为 metrics 的主题,有 3 个分区 在此处输入图像描述

更新:我没有看到我的打印语句,因为我的主题中有数据,但 auto.offset.reset 设置为最新。将其更改为最早消耗现有数据。

0 投票
1 回答
365 浏览

kafka-consumer-api - Kafka Consumer(使用 reactor kafka)重新加入组并在调用取消订阅后立即分配主题分区

我在退订 Kafka 消费者时遇到问题,请注意我们使用的是 reactor kafka API。它在开始时确实成功取消订阅,但随后它立即加入组并分配了该主题的分区,因此基本上它一直保持订阅状态,并且即使不应该这样做,它也会继续消耗来自主题的消息!

以下是我做这项业务的代码,

当我调用 unsubscribe() 方法时,如下日志所示;它撤销分区并向协调器发送离开组请求。

然而,在随后的一组事件发生并记录之后,该消费者被分配了主题分区并开始从那里消费消息。请注意,我没有在这里调用 subscribe 方法!

使用的版本是

org.apache.kafka:kafka-clients:2.8.0

org.apache.kafka:kafka-streams:2.8.0

io.projectreactor.kafka:reactor-kafka:1.1.0.RELEASE

值得注意的是,还有另一个服务正在为该服务正在使用的主题生成消息。

这可能与 kafka 参数有关,也可能与 kafka 参数无关,但如果您遇到这样的问题并且之前已经解决,请告诉我解决方案。

谢谢,

0 投票
1 回答
311 浏览

java - 使用 Reactor Kafka 实现多个接收器以从主题中的多个分区读取

我在一个主题中有 1000 个分区。我想让一个线程从一个主题的一个分区中读取,转换消息并写入另一个主题。我正在添加多线程以获得更好的吞吐量。我正在尝试使用 reactor-Kafka 来实现这一点 - https://projectreactor.io/docs/kafka/1.3.5-SNAPSHOT/reference/index.html#_introduction 我的理解是在反应堆中,每个接收器都有自己的单线程调度程序,所以我必须创建 1000 个接收器来实现上述场景。我一直在寻找这方面的例子,但我找不到任何例子,我也无法弄清楚如何做到这一点。

这是我从一个主题中的所有分区读取、转换消息并写入另一个主题的代码。

如果有人能给我建议或指出示例以让多个接收者使用来自多个分区的消息,我将不胜感激。

更新代码:

根据@nipuna 的建议,我更新了SampleConsumer.java中的示例代码。但是,这些是我在运行应用程序时收到的打印语句:

因此,相同的线程(“eactive-kafka-reactive-group-1”)被用于使用来自分区的消息。我想让不同的线程消耗来自不同分区的消息。

0 投票
0 回答
368 浏览

spring-boot - Azure Kafka Bootstrap 代理已断开连接

我尝试在 Spring Boot 应用程序中使用 reactor kafka 创建与 Azure 事件中心的 Apache Kafka 连接。起初,我按照 Azure 官方教程设置 Azure 事件中心和 Spring 后端:https ://docs.microsoft.com/en-us/azure/developer/java/spring-framework/configure-spring-cloud-stream -binder-java-app-kafka-azure-event-hub 一切正常,我创建了一些更高级的服务。

但是,当尝试让 reactor kafka 与 Azure 事件中心一起工作时,它不起作用。当消费者被触发时,它不能消费任何消息并记录以下内容:

以下是代码:

反应器代码使用相同的主题和组 id 常量。当客户端记录断开的连接时:

我假设缺少将使用者正确连接到 Azure 事件中心的配置。

0 投票
1 回答
217 浏览

apache-kafka - 在 ReactiveKafkaProducerTemplate 发送方法中模拟 SenderResult

我正在尝试模拟reactiveKafkaConsumerTemplate 的发送方法。

我试图模拟reactiveProducerTemplate 的send 方法以返回SenderResult。有可能这样做吗?如果是,有人可以指向我的文档/示例来执行此操作。我花了很多时间寻找解决方案,但找不到任何解决方案。

更新:根据 Gary 的建议,我尝试了以下操作

我在 .thenReturn(Mono.just(new SendResult<>(record, meta)))) 行得到以下异常。它没有提到异常中的空值,我看不到任何空值。

更新 2:我可以使用 Gary 的代码片段创建模拟。这是我要测试的代码

当我从测试中调用此方法时,我可以看到模板是模拟模板但是,我在 .doOnSuccess(senderResult -> log.info("sent {} offset : {} ", 指标, senderResult.recordMetadata().offset()))

该异常没有给出关于什么是空的任何细节。我确认了 consumerRecord 和 metrics 不为空。

发现问题出在设置上。实际代码需要 3 个参数,并且在设置中我只模拟了 send 方法的 2 个参数。将代码更新为:

0 投票
1 回答
50 浏览

apache-kafka - 使用 reactor Kafka 从主题读取并将消息批量写入 REST 端点

我正在使用响应式 Kafka 开发一个项目,该项目使用来自 Kafka 主题的消息并将消息批量发布到 REST 端点。我被困在批处理部分并将该批处理发送到端点。我需要从主题中读取 N 条消息(此处 N 是可配置的),然后将这 N 条消息发送到 REST 端点。如何使用 reactor Kafka 读取 N 条消息?我查看了https://projectreactor.io/docs/kafka/release/reference/#_overview中的示例,但找不到与我的问题类似的示例。解决这个问题的任何指示都会非常有帮助。

这是我到目前为止阅读来自主题的消费消息的代码

0 投票
2 回答
365 浏览

apache-kafka - Reactor Kafka 中的提交偏移量

我有一个反应器 Kafka 项目,它使用来自 Kafka 主题的消息,转换消息,然后写入另一个主题。

我的理解是只有在反应器中成功完成所有序列步骤后才会提交偏移量。那是对的吗?我想确保不处理下一条记录,除非当前记录成功发送到目标 Kafka 主题。

0 投票
1 回答
189 浏览

spring-kafka - 如何在reactor-kafka中重试失败的ConsumerRecord

我正在尝试使用 reactor-kafka 来消费消息。其他一切正常,但我想为失败的消息添加重试(2)。spring-kafka 默认已经重试了 3 次失败记录,我想使用 reactor-kafka 来实现同样的效果。

我正在使用 spring-kafka 作为响应式 kafka 的包装器。以下是我的消费者模板:

让我们考虑一下consume方法如下

我正在使用以下逻辑在失败时重试使用方法。

如果当前消费者记录因异常而失败,我想重试使用该消息。我试图用另一个 retry(3) 包装消费方法,但这并没有达到目的。最后一次 retryWhen 仅用于在 kafka rebalances 上重试订阅。

@simon-baslé @gary-russell

0 投票
1 回答
158 浏览

java - Reactor onErrorContinue 运算符是否让原始序列继续?

Reactor 错误处理文档 ( https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling ) 指出错误处理运算符不会让原始序列继续。

在学习错误处理运算符之前,您必须记住,反应序列中的任何错误都是终端事件。即使使用了错误处理运算符,它也不会让原始序列继续下去。相反,它将 onError 信号转换为新序列的开始(回退序列)。换句话说,它取代了它上游的终止序列。

但是 onErrorContinue 的 javadoc 声明如下(https://projectreactor.io/docs/core/3.4.10/api/index.html) -

通过从序列中删除犯罪元素并继续处理后续元素,让上游兼容的运算符从错误中恢复。

onErrorContinue 不被视为“错误处理运算符”吗?

它似乎确实允许原始序列继续 -

结果(删除了 3 个但继续使用后续元素)

文档确实指出 onErrorContinue 依赖于运营商的支持。有没有其他方法可以让原始序列(源 Flux)继续适用于所有操作员?如果出现错误(onErrorResume 行为),我不希望使用备用助焊剂来替换我的源助焊剂 - 我只想忽略问题元素并继续使用源助焊剂。

编辑 1(我的用例)

我有一个反应堆 kafka 源通量 & 我想无限地消耗它,不管错误如何。我使用的是 onErrorContinue,但根据在这篇文章中收到的反馈,我已将其替换为 onErrorResume。下面是我目前拥有的代码,但我不确定它是否在所有情况下都有效(通过“工作”,我不断地从 kafka 流式传输,不管任何错误)。请问有什么建议吗?

0 投票
1 回答
243 浏览

spring-webflux - Spring webflux 应用程序中的 Reactor Kafka 健康检查

我有一个 Reactor Kafka 应用程序,它无限期地使用来自某个主题的消息。我需要公开一个健康检查 REST 端点,它可以指示此进程的健康状况 - 主要是想知道 Kafka 接收器通量序列是否已终止,以便可以采取一些措施来启动它。有没有办法知道助焊剂的当前状态(完成/终止等)?该应用程序是 Spring Webflux + Reactor Kafka。

编辑 1 - doOnTerminate/doFinally 不执行