问题标签 [reactive-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
3886 浏览

java - 在 Spring Boot 应用程序中实现 Reactive Kafka Listener

我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例: https ://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java

看起来反应式 kafka 中还没有对 Spring 的支持

我了解卡夫卡侦听器如何在 Spring 中的非反应式卡夫卡 API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧

但我现在不确定如何在 Spring 中正确使用响应式 kafka。

基本上我需要一个主题的听众。我应该创建自己的某种循环或调度程序吗?或者,也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?

0 投票
1 回答
632 浏览

java - Reactor - 在处理错误的情况下延迟通量元素

我有一个与这个问题类似的问题,我没有看到一个可接受的答案。我已经研究过了,没有得到满意的答案。

我有一个轮询量为“x”的反应式 Kafka 消费者(Spring Reactor),应用程序使用反应式 Web 客户端将轮询的消息推送到反应式端点。这里的问题是外部服务的超时执行可能不同,当我们看到很多故障时,我将不得不调整 Kafka 消费者以在断路器打开(或启动背压)时轮询更少的消息。当前反应堆中有没有办法自动

  1. 当断路器处于断开状态时做出反应,减少轮询量或减缓消耗。
  2. 当电路关闭时,将轮询量增加到以前的状态(如果它关闭,外部服务将按比例增加)。

我不想使用delayElements,或者delayUntil因为它们本质上是静态的,并且希望应用程序在运行时做出反应。如何配置这些端到端背压?当电路关闭、部分关闭和在应用程序配置中打开时,我会为消费者提供值。

0 投票
1 回答
269 浏览

spring-webflux - 事务同步:如何使用 Reactor Kafka 和 R2DBC 创建 ChainedKafkaTransactionManager bean

我的 Spring Boot (WebFlux/R2DBC/Reactor Kafka) 应用程序中有以下消费者

我想为 Kafka 和 DB 事务添加事务同步。阅读文档和一些 stakoverflow 问题后

似乎ChainedKafkaTransactionManager是要走的路。

但是下面的代码不起作用,因为 ChainedKafkaTransactionManager 需要 type 的事务管理器PlatformTransactionManager。所以参数r2dbcTransactionManager不被接受。

还有另一种方法可以实现这一目标吗?

0 投票
1 回答
91 浏览

apache-kafka - 反应程序在将所有消息发送到 Kafka 之前提前退出

这是上一个响应式 kafka 问题(发送数据通量到响应式 kafka 时发出的问题)的后续问题。

我正在尝试使用响应式方法将一些日志记录发送到 kafka。这是使用响应式 kafka 发送消息的响应式代码。

因此,即使Thread.sleep(0, 5);存在,有时它也不会将所有消息发送到 kafka,并且程序存在早期打印成功消息(log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);)。有没有更具体的方法来解决这个问题。例如,使用某种回调,以便该线程将等待所有消息发送成功。

我有一个弹簧控制台应用程序并通过调度程序以固定速率运行 ExecuteQuery,就像这样

0 投票
1 回答
75 浏览

spring-webflux - 超时后停止从 KafkaReceiver 消费

我有一个共同的休息控制器:

我想要实现的是在一段时间内从 Kafka 主题收集消息,然后停止消费并认为这个通量已完成。如果我删除超时并在浏览器中打开它,我将永远收到消息,下载永远不会停止。并且这个超时消耗在 2 秒后停止,但我遇到了一个异常:

有没有办法在超时后成功完成 Flux?

0 投票
2 回答
890 浏览

spring-kafka - 使用嵌入式 kafka + 自定义序列化测试 Reactive-Kafka 消费者和生产者模板

我们需要一个关于如何测试ReactiveKafkaConsumerTemplateReactiveKafkaProducerTemplate使用embedded-kafka-broker. 谢谢。

正确的代码在这里讨论后

你可以有你的自定义de-serializer相应地使用自定义ReactiveKafkaConsumerTemplate

自定义序列化器:

在 Embedded-kfka-reactive 测试中使用它:

0 投票
0 回答
101 浏览

project-reactor - 使用 Reactor Kafka 和 Reactive Redis 构建反应式管道

我正在使用 Kafka 和 Redis 构建反应式管道。现有服务使用来自 Apache Kafka 的事件,应用业务逻辑并最终更新 Redis 集。我正在重构服务以使用反应式 API。

这是使用反应式 API 的示例代码。

以及过程记录

执行后的日志。

我观察到lettuce-nioEventLoop线程甚至用于处理 kafka 事件和 lettuce 回调。我不明白这种行为。有人可以阐明一下吗?

0 投票
1 回答
207 浏览

java - 嵌入式 Kafka 上的空点异常

我正在使用嵌入式 Kafka 对 Spring Boot 应用程序进行单元测试。我正在使用代码 - https://github.com/spring-projects/spring-kafka/blob/29a3bd2021c49b700d4f3835c7ced642322c2faf/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java# L76作为参考。

这是我的代码:

我在 EmbeddedKafkaCondition.getBroker() 行收到以下异常

我想我错过了一个设置,我的嵌入式Kafka 设置不正确,因此为 NULL。但我一直无法弄清楚原因。如果有人可以帮助我了解我所缺少的内容,那将会有很大帮助。

0 投票
1 回答
146 浏览

apache-kafka - 反应式 Kafka 项目中的多个 Kafka 配置

我正在开发一个有 2 项服务的项目:读取、转换消息,然后写入另一个 Kafka。这两种服务的 Kafka 配置是不同的。这是我的 application.yml

这些是我的两个服务的配置文件:

  1. Service1KafkaConfig

    }

  2. 服务2配置

    }

我在各自的服务中自动装配这些 bean:

Service1:我没有为 service1 添加 ProcessRecord 方法,因为我觉得这个问题不需要它。如果需要,请告诉我。

服务2:

当我运行应用程序时,我只能看到一个开始订阅 topic1 的消费者。是否可以在同一个项目中运行多个 Kafka 消费者。如果是的话,你能告诉我需要做什么才能让它们运行吗?

0 投票
1 回答
159 浏览

apache-kafka - 如何使用 Spring Reactive Kafka 实现重试和恢复逻辑

我们正在使用https://github.com/reactor/reactor-kafka项目来实现 Spring Reactive Kafka。但是我们想利用 Kafka 重试和恢复反应式 Kafka 的逻辑。谁能提供一些示例代码?