问题标签 [reactive-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 在 Spring Boot 应用程序中实现 Reactive Kafka Listener
我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例: https ://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java
看起来反应式 kafka 中还没有对 Spring 的支持
我了解卡夫卡侦听器如何在 Spring 中的非反应式卡夫卡 API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧
但我现在不确定如何在 Spring 中正确使用响应式 kafka。
基本上我需要一个主题的听众。我应该创建自己的某种循环或调度程序吗?或者,也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?
java - Reactor - 在处理错误的情况下延迟通量元素
我有一个与这个问题类似的问题,我没有看到一个可接受的答案。我已经研究过了,没有得到满意的答案。
我有一个轮询量为“x”的反应式 Kafka 消费者(Spring Reactor),应用程序使用反应式 Web 客户端将轮询的消息推送到反应式端点。这里的问题是外部服务的超时执行可能不同,当我们看到很多故障时,我将不得不调整 Kafka 消费者以在断路器打开(或启动背压)时轮询更少的消息。当前反应堆中有没有办法自动
- 当断路器处于断开状态时做出反应,减少轮询量或减缓消耗。
- 当电路关闭时,将轮询量增加到以前的状态(如果它关闭,外部服务将按比例增加)。
我不想使用delayElements
,或者delayUntil
因为它们本质上是静态的,并且希望应用程序在运行时做出反应。如何配置这些端到端背压?当电路关闭、部分关闭和在应用程序配置中打开时,我会为消费者提供值。
spring-webflux - 事务同步:如何使用 Reactor Kafka 和 R2DBC 创建 ChainedKafkaTransactionManager bean
我的 Spring Boot (WebFlux/R2DBC/Reactor Kafka) 应用程序中有以下消费者
我想为 Kafka 和 DB 事务添加事务同步。阅读文档和一些 stakoverflow 问题后
- Spring Boot 中的事务同步与 Database+ kafka 示例
- Spring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步
- Spring Kafka 中的事务同步
- Spring @Transactional 与跨多个数据源的事务
似乎ChainedKafkaTransactionManager
是要走的路。
但是下面的代码不起作用,因为 ChainedKafkaTransactionManager 需要 type 的事务管理器PlatformTransactionManager
。所以参数r2dbcTransactionManager
不被接受。
还有另一种方法可以实现这一目标吗?
apache-kafka - 反应程序在将所有消息发送到 Kafka 之前提前退出
这是上一个响应式 kafka 问题(发送数据通量到响应式 kafka 时发出的问题)的后续问题。
我正在尝试使用响应式方法将一些日志记录发送到 kafka。这是使用响应式 kafka 发送消息的响应式代码。
因此,即使Thread.sleep(0, 5);
存在,有时它也不会将所有消息发送到 kafka,并且程序存在早期打印成功消息(log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
)。有没有更具体的方法来解决这个问题。例如,使用某种回调,以便该线程将等待所有消息发送成功。
我有一个弹簧控制台应用程序并通过调度程序以固定速率运行 ExecuteQuery,就像这样
spring-webflux - 超时后停止从 KafkaReceiver 消费
我有一个共同的休息控制器:
我想要实现的是在一段时间内从 Kafka 主题收集消息,然后停止消费并认为这个通量已完成。如果我删除超时并在浏览器中打开它,我将永远收到消息,下载永远不会停止。并且这个超时消耗在 2 秒后停止,但我遇到了一个异常:
有没有办法在超时后成功完成 Flux?
spring-kafka - 使用嵌入式 kafka + 自定义序列化测试 Reactive-Kafka 消费者和生产者模板
我们需要一个关于如何测试ReactiveKafkaConsumerTemplate
和ReactiveKafkaProducerTemplate
使用embedded-kafka-broker
. 谢谢。
正确的代码在这里讨论后
你可以有你的自定义de-serializer
相应地使用自定义ReactiveKafkaConsumerTemplate
自定义序列化器:
在 Embedded-kfka-reactive 测试中使用它:
project-reactor - 使用 Reactor Kafka 和 Reactive Redis 构建反应式管道
我正在使用 Kafka 和 Redis 构建反应式管道。现有服务使用来自 Apache Kafka 的事件,应用业务逻辑并最终更新 Redis 集。我正在重构服务以使用反应式 API。
这是使用反应式 API 的示例代码。
以及过程记录
执行后的日志。
我观察到lettuce-nioEventLoop线程甚至用于处理 kafka 事件和 lettuce 回调。我不明白这种行为。有人可以阐明一下吗?
java - 嵌入式 Kafka 上的空点异常
我正在使用嵌入式 Kafka 对 Spring Boot 应用程序进行单元测试。我正在使用代码 - https://github.com/spring-projects/spring-kafka/blob/29a3bd2021c49b700d4f3835c7ced642322c2faf/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java# L76作为参考。
这是我的代码:
我在 EmbeddedKafkaCondition.getBroker() 行收到以下异常
我想我错过了一个设置,我的嵌入式Kafka 设置不正确,因此为 NULL。但我一直无法弄清楚原因。如果有人可以帮助我了解我所缺少的内容,那将会有很大帮助。
apache-kafka - 反应式 Kafka 项目中的多个 Kafka 配置
我正在开发一个有 2 项服务的项目:读取、转换消息,然后写入另一个 Kafka。这两种服务的 Kafka 配置是不同的。这是我的 application.yml
这些是我的两个服务的配置文件:
Service1KafkaConfig
}
服务2配置
}
我在各自的服务中自动装配这些 bean:
Service1:我没有为 service1 添加 ProcessRecord 方法,因为我觉得这个问题不需要它。如果需要,请告诉我。
服务2:
当我运行应用程序时,我只能看到一个开始订阅 topic1 的消费者。是否可以在同一个项目中运行多个 Kafka 消费者。如果是的话,你能告诉我需要做什么才能让它们运行吗?
apache-kafka - 如何使用 Spring Reactive Kafka 实现重试和恢复逻辑
我们正在使用https://github.com/reactor/reactor-kafka项目来实现 Spring Reactive Kafka。但是我们想利用 Kafka 重试和恢复反应式 Kafka 的逻辑。谁能提供一些示例代码?