问题标签 [reactor-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1145 浏览

java - 在几次尝试处理失败后,在 reactor-kafka 中向 Kafka 提交偏移量

有一个消息到达的 Kafka 主题。我需要阅读一条消息,对其进行处理并继续处理下一条消息。消息处理可能会失败,如果发生这种情况,则必须重试处理几次(比如说 10 次),然后才能继续处理下一条消息。如果处理失败 10 次,则需要丢弃该消息,我们应该继续处理下一条消息。

我们使用reactor-kafka,所有的处理都需要是反应式的。

这是我试图解决这个问题的方法:

(这里receiver是一个KafkaReceiver<String, String>)。

这适用于没有任何异常的情况,如果有异常,processRecord()则重试 10 次。这里的问题是,如果在 10 次允许的尝试后仍然失败,则偏移量不会被提交(当然),所以下次从 Kafka 读取相同的偏移量时,有效地,处理会永远卡在“错误”偏移量上.

我试图实现以下明显的想法:如果异常“传递得比retryBackoff()操作员更远”,则提交当前偏移量。要提交偏移量,我们需要一个ReceiverRecord,因此我将异常包装ExceptionWithRecord与当前记录一起添加:

extractRecordAndMaybeCommit()从给定的异常中提取ReceiverRecord并提交它:

如果重试次数用尽,这种传递记录并稍后提交记录的方法有效,并且该.commit()方法被调用。但它没有效果。

事实证明,当任何异常进入上面的反应管道时,DefaultKafkaReceiver.dispose()都会被调用,因此任何后续的提交尝试都会被忽略。因此事实证明,一旦发布者“看到”任何异常,就根本不可能提交偏移量。

如何在仍在使用的同时实现“在 N 个错误后提交”行为reactor-kafka

0 投票
0 回答
595 浏览

apache-kafka - 使用 Reactor Kafka 发送事件似乎很慢

我的项目使用 Reactor Kafka 1.2.2.RELEASE 将使用 Avro 序列化的事件发送到我的 Kafka 代理。这很好用,但是发送事件似乎很慢。

我们通过 KafkaSender.send() 将自定义指标插入到 Mono 发送事件的生命周期中,并注意到发送消息大约需要 100 毫秒。

我们是这样做的:

send 方法只是构建记录并发送它:

withMetric 转换器将指标链接到发送单声道生命周期:

也就是这个自定义指标,平均返回 100 毫秒。

我们将它与我们的 Kafka 生产者指标进行了比较,并注意到这些指标返回并且平均 40 毫秒来传递一条消息(0 毫秒的排队和 40 毫秒的请求延迟)。

我们很难理解增量,想知道它是否来自 Reactor Kafka 方法来发送事件。

有人可以帮忙吗?

更新

这是我的生产者配置示例:

还有,maxInFlight 是 256,调度器是单的,我这里没有配置什么特别的。

0 投票
1 回答
561 浏览

java - Reactor Kafka 生产者 - 无法重试

我使用 Reactor Kafka(Kafka 的功能性 Java API)创建了一个 KafkaProducer(reactor.kafka.sender.KafkaSender)。使用以下生产者配置,

当我尝试将记录发布到无效主题时,出现超时异常

正如预期的那样。但是我已经配置了没有发生的重试。我假设在max.block.ms/request.timeout.ms已经昏倒之后,每次retry.backoff.ms直到metadata.max.age.msretries用尽后都会重试。仅供参考,代码:

  • 启用重试的配置是否正确?
  • request.timeout.ms/之后何时重试max.block.ms
  • 需要在上面的代码中进行哪些更改以允许重试?
0 投票
0 回答
802 浏览

reactive-programming - Spring Cloud Sleuth 与 Reactor Kafka

我在 Spring Boot Reactive 应用程序中使用 Reactor Kafka,并使用 Spring Cloud Sleuth 进行分布式跟踪。我已将 Sleuth 设置为使用名为“traceId”的标头中的自定义传播密钥。我还自定义了日志格式以在我的日志中打印标题,所以像这样的请求

123456将从控制器开始在下游任何地方打印每个日志。

我现在也希望通过 Kafka 传播此标头。我知道 Sleuth 也为 Kafka 内置了仪器,因此标题应该自动传播,但是我无法让它工作。

从我的控制器中,我在 Kafka 主题上生成一条消息,然后让另一个 Kafka 消费者拿起它进行处理。

这是我的控制器:

这是生成有关 Kafka 主题的消息的代码

我的消费者看起来像这样:

我希望 Sleuth 自动将内置的 Brave 跟踪和自定义标头传递给消费者,以便跟踪覆盖整个事务。

但是我有两个问题。

  1. 生成器 bean 没有得到与控制器中的相同的跟踪。它对发送的每条消息使用不同的(和新的)跟踪。
  2. 跟踪不会从 Kafka 生产者传播到 Kafka 消费者。

我可以通过用一个简单的 Java 类替换生成器 bean 并在控制器中实例化它来解决上面的 #1。然而,这意味着我不能自动装配其他依赖项,而且无论如何它都不能解决#2。

我能够加载 bean 的一个实例,brave.kafka.clients.KafkaTracing所以我知道它是由 Spring 加载的。但是,它看起来仪器不工作。我使用 Kafka Tool 检查了 Kafka 上的内容,并且没有在任何消息上填充任何标题。事实上,消费者根本没有踪迹。

在上面的日志中,[123-21922,578c510e23567aec,578c510e23567aec][custom-trace-header, brave traceId, brave spanId]

我错过了什么?

0 投票
2 回答
817 浏览

spring-boot - 使用 onErrorResume 处理使用 Reactor Kafka 发布到 Kafka 的有问题的有效负载

我正在使用 reactor kafka 发送 kafka 消息并接收和处理它们。在接收 kakfa 有效负载时,我会进行一些反序列化,如果出现异常,我只想记录该有效负载(通过保存到 mongo ),然后继续接收其他有效负载。

为此,我使用以下方法 -

在这里,我希望每当我在接收有效负载时遇到异常,onErrorResume 应该捕获它并记录到 mongo,然后当我发送到 kafka 队列时我应该继续接收更多消息。但是,我看到异常发生后,即使调用了 onErrorResume 方法,但我无法再处理发送到 Kakfa 主题的消息。有什么我可能在这里遗漏的吗?

0 投票
2 回答
940 浏览

java - 反序列化异常后继续消费reactor kafka中的后续记录

我正在使用 reactor kafka 并有一个自定义 AvroDeserializer 类用于消息的反序列化。

现在我有一个案例,对于某些有效负载,反序列化类会引发异常。我的 Kafka 侦听器在尝试读取此类记录时立即死亡。我尝试使用onErrorReturn(doOnErroronErrorContinue) 的组合来处理此异常,但是,它有助于记录异常,但未能使用后续记录。

在侦听器端,我正在尝试这样处理->

一种选择是不从 Deserializer 类中抛出异常,但我想在单独的系统中记录此类异常以进行分析,因此希望在 kafka 侦听器端处理此类记录。关于如何实现这一点的任何想法?

0 投票
0 回答
571 浏览

kotlin - 如何让应用程序从 Reactor Kafka 请求超时导致的 RetriableCommitFailedException 中自我恢复?

我有一个这样定义的 Kafka 处理器。

有时,我得到这个错误。

第一次重试看起来像这样。

第二个和第三个看起来像这样。

一旦所有 3 次重试都用完,消息将如下所示。

当我确实收到该错误时,我需要重新启动应用程序才能重新连接到 Kafka 代理并提交记录。

我知道通过设置maxCommitAttempts1意味着一旦它击中 a RetriableCommitFailedException,它就不会再重试了。我认为retryWhen我放在processKafkaMessages()函数末尾的子句可以解决问题,以便管道可以自行恢复。

我设置它的原因maxCommitAttempts是因为它没有这里讨论的回退重试,并且默认的最大 100 次提交尝试是在 10 毫秒内完成的。所以,我想我应该编写自己的带有退避的重试逻辑。

问题是,我应该如何正确地进行自动提交的回退重试?是否可以为此使用编写单元测试EmbeddedKafka

语:Kotlin

反应堆卡夫卡库:io.projectreactor.kafka:reactor-kafka:1.2.2.RELEASE

0 投票
1 回答
3886 浏览

java - 在 Spring Boot 应用程序中实现 Reactive Kafka Listener

我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例: https ://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java

看起来反应式 kafka 中还没有对 Spring 的支持

我了解卡夫卡侦听器如何在 Spring 中的非反应式卡夫卡 API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧

但我现在不确定如何在 Spring 中正确使用响应式 kafka。

基本上我需要一个主题的听众。我应该创建自己的某种循环或调度程序吗?或者,也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?

0 投票
0 回答
237 浏览

apache-kafka - Reactor - Kafka - 消费者在处理消息时因错误而停止

我正在使用 Reactor-Kafka 1.2.4,目前面临的问题是在处理消息时遇到错误,Kafka 消费者停止并且不会继续处理其他消息。

我目前只想记录错误消息并继续处理下一条消息,稍后将添加 DLQ 处理等。

我已经将 Reactor-Kafka 升级到 1.3.0,另外,下面的代码是否足够好?

谢谢你。

0 投票
1 回答
269 浏览

spring-webflux - 事务同步:如何使用 Reactor Kafka 和 R2DBC 创建 ChainedKafkaTransactionManager bean

我的 Spring Boot (WebFlux/R2DBC/Reactor Kafka) 应用程序中有以下消费者

我想为 Kafka 和 DB 事务添加事务同步。阅读文档和一些 stakoverflow 问题后

似乎ChainedKafkaTransactionManager是要走的路。

但是下面的代码不起作用,因为 ChainedKafkaTransactionManager 需要 type 的事务管理器PlatformTransactionManager。所以参数r2dbcTransactionManager不被接受。

还有另一种方法可以实现这一目标吗?