问题标签 [reactor-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
216 浏览

kafka-consumer-api - RetryBackoffSpec 不适用于引发异常的 KafkaReceiver

我有一个用例,我想无限地继续接收来自 Kafka 的记录,并对记录进行一些处理,使用processRecord(String record)which can throw a RuntimeException. 我想重试多次(比如 5 次),如果在 5 次重试之前的任何时间都成功,想要手动提交偏移量并继续下一条记录,如果不是,那么想要(记录它-> 提交偏移量)然后继续接下来的记录。我有一个代码,但似乎不能正常工作。将不胜感激一些帮助。

我收到的输出是:

它没有重试 5 次,而且 AtomicInteger 没有更新第二条记录。

我想要实现的是:

0 投票
1 回答
149 浏览

apache-kafka - 发送一批消息后是否应该关闭KafkaSender?

应用程序是一个长期消息接收器,应用程序接收上游消息并将消息转换为小消息,然后将这些小消息放入 Kafka。

我发现 Reactor Kafka 的文件说:

反应堆卡夫卡指南

当不再需要 KafkaSender 时,可以关闭 KafkaSender 实例。底层 KafkaProducer 关闭,关闭所有客户端连接并释放生产者使用的所有内存。

我的问题是:

发送每批消息后是否需要关闭 KafkaSender ?在应用程序中使用 KafkaSender 的单例实例并在应用程序关闭时关闭它是否有意义?

谢谢。

0 投票
1 回答
158 浏览

spring-webflux - spring-cloud-stream Kafka Reactive 流分区分配如何与并发处理一起工作

例如,我配置一个主题有 2 个分区,但在我的应用程序中有 1 个实例,我Flux.parallel(10)用来消费消息,并且该主题有 1000 条消息延迟,会发生什么情况?

  1. 它会每次轮询 10 条消息吗?从 2 个分区还是 1 个分区?
  2. 仅轮询 2 条消息和 1 个分区?

我想知道它是如何工作的,所以我可以正确配置它,使其具有大吞吐量和消耗序列的能力

顺便说一句,我发现了这个问题,但现在在那里回答

0 投票
1 回答
232 浏览

java - 如何通过事务从反应式生产者中无法访问的 Kafka 中恢复?

我有一个简单的反应式 Kafka 生产者,我需要它来手动提交事务。在 Reactor Kafka 中的javadoc 方法 begin()之后,我创建了一个生产者方法

它的配置是

它运行良好,我正在向 Kafka 发送消息。但现在我需要保护它免受我们公司潜在的 Kafka/网络中断的影响。为了模拟这一点,我停止了我的 Kafka docker,发送一条消息,一段时间后再次启动 Kafka。但是奇怪的事情正在发生。我遇到了 3 种可能的情况:

  1. 当我在一分钟左右启动 Kafka 时,生产者恢复了连接和事务,并且发送完成得很好
  2. 当我等待更长时间(约 3 - 4 分钟)时,发件人失败

此状态不可恢复,每次后续尝试均失败

我必须重新启动应用程序

  1. 当用户取消请求(即触发此发送的 REST 调用)然后 Kafka 再次出现时,每个后续请求都会失败

这种状态也是不可恢复的,我要重启应用。

问题

如何从这种情况中恢复过来?我想要实现的是生产者等待 fe 20 秒,如果没有重新建立连接,那么只需抛出异常并丢弃请求。最重要的是,系统不应该因为无法访问的 Kafka 而崩溃。

我试过的

起初我以为我需要手动中止事务kafkaTemplate.transactionManager().abort()in some.doOnError()但这不起作用(在场景 3 中是 fe。代码不会引发任何异常)。然后我尝试了ProducerConfig尝试TRANSACTION_TIMEOUT_CONFIGREQUEST_TIMEOUT_MS_CONFIG或者DELIVERY_TIMEOUT_MS_CONFIG请求没有超时,不确定我做错了什么。

0 投票
2 回答
326 浏览

java - Reactor Kafka 中没有创建订阅错误

消费者配置文件:这里我对键和值都使用 StringDeserializers。并且订阅已针对单个主题进行。

消费者代码:我的登录消费者代码将订阅的主题打印为空。AppUtility 正在将数据转换为字符串。

日志:

pom进口:

有没有人遇到过这样的问题?我无法解决这个问题。

0 投票
2 回答
227 浏览

java - 如何在发送从 reactor kafka 主题消耗的前 100 条消息后有条件地发布额外的 SSE Web Flux 事件

我正在使用带有 spring webflux SSE 流的 reactor-kafka 库来使用来自 kafka 主题的数据。当来自主题的所有消息都被消耗时,我需要返回一个特殊的 ServerSentEvent,即最大主题偏移量等于从 0 偏移量订阅时消耗的当前偏移量。以便客户知道 kafka 主题中没有更多消息。

是否可以使用 Web Flux 实现这样的目标?即,如果我说在从任何有限的元素列表中消耗每 100 个元素并通过 SSE 流作为 ServerSentEvent 发送之后,这个 SSE 流应该再获得一个事件作为 SeverSentEvent 并带有注释“已使用”。

0 投票
1 回答
146 浏览

apache-kafka - 反应式卡夫卡在生产消息时不断收到超时异常

log:reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.kafka.common.errors.TimeoutException: Topic topic not present in metadata after 60000 ms. Caused by: org.apache.kafka.common.errors.TimeoutException: Topic topic not present in metadata after 60000 ms.尝试在 kafka 上生成消息时不断获取。

已经确保我在生产者项目中有 Jackson 核心、Jackson 数据绑定和 Kafka 客户端依赖项。另外我如何在反应堆 kafka SenderOptions 中传递安全协议

0 投票
2 回答
260 浏览

apache-kafka - JHipster 与 kafka 真实应用使用示例

JHipster版本6.6.0中,Kafka 使用模型已从标准的生产者/消费者类更改为 WebResource 级别。没有真实的例子,这种变化的优势是什么,以及如何在实际应用程序中使用这种变化。

假设我们有Service AService B。这两个服务之间的通信必须通过 Kafka 事件来完成。

问题是 - 我必须这样做服务 B开始监听来自服务 A主题的事件。在当前配置中,看起来我必须手动触发/consumes端点,但这没有任何意义,因为我期望服务将在应用程序启动并运行后开始侦听指定的主题列表。

我将不胜感激对此主题的任何评论,以帮助我理解这一点。

示例:jhipster 7.1.0 生成此资源:

服务 A -网关

服务 B -代理

这是唯一可用的与 Kafka 相关的代码。

在 6.6.0 版本之前,JHipster生成标准的生产者/消费者类,我可以使用它们来定义要收听的主题。现在还不清楚如何使用生成的代码来发出/监听事件。

0 投票
0 回答
63 浏览

project-reactor - 在reactor-kafka中立即生效?

尝试实现一个重试记录处理器,该处理器在发生瞬时故障时回溯到最后一条记录的偏移量,而不是在不确定的时间内进入内部重试循环,并且如果错误持续足够长的时间,则冒着消费者会话超时和重新平衡的风险。

将最大轮询记录设置为 1,在此代码开始之前,一个主题中有两条记录,Mono。当一直失败时,我看到它反复寻找 0 和 1,而不是只读取第一条记录,直到它成功。

我错过了什么吗?如何在不切换到阻塞 API 的情况下完全序列化 poll 并寻求调用以处理记录,并根据需要进行尽可能多的重试?

期望具有 SeekToCurrentErrorHandler 的 @KafkaListener 的等效项也可以在 RK 中工作,但发现并非如此

0 投票
1 回答
305 浏览

multithreading - 使用 reactor-kafka 使用不同的线程从 Kafka 中的消费者组中读取数据

我需要从拥有数百万数据的 Kafka 主题中消费。从主题中阅读后,我需要对其进行转换并将其写入另一个主题。我能够使用来自主题的消息,通过多个线程处理数据并写入另一个主题。我按照这里的例子https://projectreactor.io/docs/kafka/1.3.5-SNAPSHOT/reference/index.html#concurrent-ordered

这是我的代码:

由于数据量大,我有多个消费者可供阅读,并且正在考虑添加不同的阅读器线程以从主题中读取。然而,reactor-kafka文档提到 KafkaReceiver 不是线程安全的,因为底层的 KafkaConsumer 不能被多个线程同时访问。

我正在寻找有关同时阅读某个主题的建议。