问题标签 [reactor-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
41 浏览

apache-kafka - 复制因子应该是一个在 Apache Kafka 中提供最多一次交付语义的因素吗?

据我了解,生产者不得重试任何发送失败,消费者必须在执行某些处理之前提交以提供最多一次交付语义。但是复制因子是否也与交付语义相关联?reactor-kafka 中示例项目中的注释如下所示:

复制因子为 1 的主题与 acks=0 且不重试的生产者相结合,可确保删除第一次尝试时无法发送到 Kafka 的消息

复制因子应该是一个在 Apache Kafka 中提供最多一次交付语义的因素吗?

0 投票
1 回答
286 浏览

reactive-programming - ReactiveKafkaConsumerTemplate receiveAutoAck 的消息排序

我在问自己ReactiveKafkaConsumerTemplatespring-kafka 项目是否确实保证了消息的正确顺序。我阅读了 reactor-kafka 项目的文档,它指出应该使用concatMap运算符来使用消息,但是 ReactiveKafkaConsumerTemplateflatMap至少在receiveAutoAck此处的方法的情况下使用运算符:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java#L69

reactor-kafka 项目的参考文档: https ://projectreactor.io/docs/kafka/release/reference/#_auto_acknowledgement_of_batches_of_records

我有兴趣使用receiveAutoAck它,因为它似乎是最简单和最舒适的方法,足以满足我的用例。克服该方法的这种行为的唯一receiveAutoAck方法似乎是继承 ReactiveKafkaConsumerTemplate 并覆盖这种行为。它是否正确?

0 投票
0 回答
115 浏览

project-reactor - Kafka Consumer 中的手动偏移量

我想写一个 Kafka 消费者并在 Bigquery 中写入记录,我想在 Bigquery 中成功插入时手动提交偏移量。我写了一个示例代码,但它不起作用,有人可以帮忙吗

0 投票
0 回答
67 浏览

java - 可以在没有代理的情况下创建 KafkaReceiver 吗?

我正在使用reactor-kafka为 Kafka 消息创建一个反应式消费者,并且我正在寻找一种创建单元测试的方法。

例如,在以下消费者中,我想验证是否已执行提交调用。

我知道我可以设置 EmbeddedKafka 并根据代理属性创建 KafkaReceiver。这有点开销,因为我希望我的测试专注于反应部分而不是 kafka 部分。

我可以在没有代理的情况下创建 KafkaReceiver 吗?

还是有其他方法可以测试这种设置?

0 投票
2 回答
1077 浏览

spring-boot - Spring Boot Kafka - /actuator/prometheus 中不提供 Kafka 指标

我想监控 Kafka 指标,但不幸的是 /actuator/prometheus 端点下没有任何与 Kafka 相关的内容。我的设置中有什么遗漏吗?

应用程序依赖项:Kotlin 1.4.31、Spring Boot 2.3.9、Spring Kafka 2.6.7、Reactor Kafka 1.2.5、Kafka Clients 2.5.1

应用程序配置

我的接收器看起来像

和听众

0 投票
0 回答
29 浏览

java - 单个服务范围内每个主题的不同 Kafka Producer 配置

我的问题是关于是否有可能以及如何在单个服务范围内为每个主题配置 Kafka Producer 设置?我在文档中发现,您必须在每个主题的 C/C++、.NET、Python、Go 的生产者配置中定义消息持久性,但我没有看到提到 Java,也找不到关于 Java 的任何实现。

一般来说,我想实现特定主题将具有最高的持久性(acks=all),其余主题将在单个服务范围内具有 acks=1。

0 投票
1 回答
159 浏览

project-reactor - Reactor Flux - 仅在完成时从 Publisher 发出

我有一些 Reactor Kafka 代码,它通过 a 读取事件KafkaReceiver并通过 1 或多个KafkaSenders连接到单个Publisher. 一切都很好,但我想做的只是Flux在完成时从这个连接的发送者发出一个事件(即,它已完成写入任何给定事件的所有下游主题,因此它不会为每个元素发出任何内容它向下游写入 Kafka 直到完成)。通过这种方式,我可以sample()并定期提交偏移量,因为我知道每当发生这种sample()情况时,我都会为传入事件提交偏移量,我已经为每个要提交偏移量的事件处理了所有下游消息。看来我可以使用pauseUntilOther()或者then()不知何故,但我不太清楚我的代码和具体用例是如何给出的。任何想法或建议表示赞赏,谢谢。

主要发布者代码:

通过调用返回的串联 KafkaSenders processEvent()

0 投票
1 回答
143 浏览

elasticsearch - 反应式 Kafka 接收器可以与非反应式 Elasticsearch 客户端一起使用吗?

下面是一个示例代码,它使用 reactor-kafka 并从一个主题(带有重试逻辑)读取数据,该主题具有通过非反应性生产者发布的记录。在我的doOnNext()消费者内部,我使用的是非反应性弹性搜索客户端,它对索引中的记录进行索引。所以我有几个问题我仍然不清楚:

  1. 我知道消费者和生产者是独立的解耦系统,但是是否建议同时拥有反应性生产者以及其消费者是反应性的?
  2. 如果我使用的是非反应性的东西,在这种情况下是 Elasticsearch 客户端org.elasticsearch.client.RestClient,代码的“反应性”是否有效?如果有或没有,我该如何测试它?(通过“反应性”,我的意思是它的非阻塞 IO 部分,即如果我产生三个反应性消费者并且一个由于某种原因是潜在的,那么线程应该被解除阻塞并用于其他反应性消费者)。
  3. 一般来说,问题是,如果我用响应式客户端包装一些 API,API 也应该是响应式的吗?

0 投票
1 回答
308 浏览

apache-kafka - Kafka reactor - 如何禁用自动启动的 KAFKA 消费者?

下面是我的 KAFKA 消费者

我的 KAFKA 消费者正在自动启动。但是我想禁用自动启动的 KAFKA 消费者。

我知道了,在春天的 KAFKA 我们可以做这样的事情

但是,我不确定如何在 Kafka 反应器中实现(控制自动启动/停止行为)。我想要下面的东西

引入一个属性来处理自动启动/停止行为

使用上述属性,我应该能够在 Kafka 反应器中设置 KAFKA 自动启动标志,就像这样

注意:.setAutoStart(start);

这在 Kafka 反应器中是否可行,如果可以,我该怎么做?

更新:

我可以做这样的事情吗?

0 投票
1 回答
167 浏览

apache-kafka - 如何在 Spring webflux 和 Kafka 中创建 Bulk 消费者

我需要轮询 kafka 并批量处理事件。在 Reactor kafka 中,由于它是一个蒸汽 API,我将事件作为流获取。有没有办法组合并获得固定的最大事件大小。

这就是我目前正在做的事情。

但是代码只是在 collectList 之后挂起,并且永远不会到达最后一个 flatMap。

提前致谢。