问题标签 [reactive-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
764 浏览

apache-kafka - Reactor Kafka:ReactiveKafkaProducerTemplate

我刚刚开始使用 Reactor Kafka。我想知道何时以及为什么使用ReactiveKafkaProducerTemplate而不是 KafkaSender 官方反应堆 kafka 参考指南中找到的?

0 投票
1 回答
1271 浏览

apache-kafka - Reactor Kafka:恰好一次处理样本

我读过很多文章,其中有许多不同的配置可以实现一次处理。

这是我的生产者配置:

这是我的消费者配置:

我读了这个[示例场景][1]

我尝试跟随,但我遇到了一些问题:

这是我的生产者代码:

我的消费者代码:

尝试生成和使用消息时,我总是遇到以下错误:

0 投票
1 回答
2065 浏览

apache-kafka - Kafka Producer Idempotence - Exactly Once 还是 Just Producer Transaction 就足够了?

从这篇文章https://www.confluent.io/blog/transactions-apache-kafka/

使用为至少一次交付语义配置的 vanilla Kafka 生产者和消费者,流处理应用程序可能会以下列方式丢失恰好一次处理语义:

  1. 由于内部重试,producer.send() 可能导致消息 B 的重复写入。这是由幂等生产者解决的,不是本文其余部分的重点。

2.我们可能会重新处理输入消息A,导致重复的B消息被写入输出,违反了恰好一次处理的语义。如果流处理应用程序在写入 B 之后但在将 A 标记为已使用之前崩溃,则可能会发生重新处理。因此,当它恢复时,它将再次消耗 A 并再次写入 B,从而导致重复。

3. 最后,在分布式环境中,应用程序将崩溃,或者——更糟糕的是!——暂时失去与系统其余部分的连接。通常,新实例会自动启动以替换那些被认为丢失的实例。通过这个过程,我们可能有多个实例处理相同的输入主题并写入相同的输出主题,从而导致重复输出并违反恰好一次处理的语义。我们称之为“僵尸实例”问题。</p>

问题

在第 2 点,它提到当应用程序崩溃时,它将消耗 A 并再次写入 B 。但是生产者幂等性不是已经处理了这种发送重复的情况吗?就像第 1 点一样?

#3 点也会导致重复发送,#2 和 #3 不应该与 #1 相同吗?哪些可以使用生产者幂等处理?

0 投票
1 回答
532 浏览

apache-kafka - 反应式 Kafka:使用事务处理一次

最初通过 api 调用触发

这是我的生产者配置:

这是我的消费者配置:

我读了这个示例场景

我尝试跟随,但我遇到了一些问题:

这是我的生产者代码:

我的消费者代码:

在快乐的情况下测试生成和使用单个分区主题的消息时,我总是遇到以下错误。

有人可以告诉我如何正确开始交易,以及何时提交和关闭交易吗?

请注意,如果我不使用事务性-receive() 而不是 receiveExactlyOnce,这一切都可以正常工作

0 投票
0 回答
129 浏览

java - 如何使用 Reactive Kafka 将字符串数据发送到 kafka 生产者?

我有一个字符串格式的数据。我需要将它发送给 kafka 生产者到特定分区,但使用反应式 kafka,即进行异步调用。如何使用代码实现这一点?我检查了响应式 kafka 文档,但不幸的是我对 Java Rx(Mono/Flux) 等不太了解。

如果有人可以提供帮助,那就太好了。欢迎任何需要更多细节的问题。

我能够以非矫正的方式做到这一点。

0 投票
0 回答
253 浏览

spring-cloud - 从 Flux 创建反应式 Kafka SenderRecord

我正在做一个将 Spring Cloud Gateway 与 Reactive Kafka 集成的原型。我的目标是将帖子主体推向卡夫卡。我可以使用Flux,这可能会给我大块的身体。需要专家建议如何将 DataBuffer 的 Flux 转换为 SenderRecord 并将其推送到 kafka 非阻塞。

我尝试使用DataBufferUtils.join合并 DataBuffer 以获取字节 [] 并使用它创建一个 SenderRecord。有没有更好的方法(我们可以避免分配字节数组)吗?

实现细节:我有kafka的Spring Cloud Gateway Global Filter(url结构:kafka://<producer-template>?topic=<topicname>。然后读取body,即flux并创建kafkaSender。

谢谢你。

0 投票
0 回答
374 浏览

apache-kafka - 将数据通量发送到反应式 kafka 时出现问题

我正在使用反应器库从网络中获取大量数据流,并使用反应式 kafka 方法将其发送到 kafka 代理。

下面是我正在使用的 Kafka Producer

Flux 中的记录总数(fCount)和发送到 Kafka 主题的记录(sentCount)不匹配,它没有给出任何错误并成功完成。

例如:在一种情况下,Flux 中的记录总数为 2758,而发送到 kafka 的计数为 256。是否有任何 kafka 配置需要修改,或者我错过了什么?

==================================================== =========

根据评论更新

上面的代码在一个系统中运行良好,但未能在另一个系统中发送所有消息。

0 投票
0 回答
446 浏览

apache-kafka - 卡夫卡消费者线程无限期订阅 - ReactiveKafkaConsumerTemplate

我看到一个问题,一段时间后,消费者随机停止消费来自主题的消息,并且没有抛出任何错误。该主题中没有太多消息,因为这是一个测试环境。

该主题的消费者滞后将> 0,并且会无限期地停留在该数字上(就像没有消费者消费它一样)。我使用 kafka 工具检查了该主题,我可以看到该分区已分配给消费者。

我正在运行多个消费者线程来消费一个主题,这里是代码:

}

我的问题:如果一段时间内没有消息推送到该主题,kafkaConsumperTemplate.receive() 是否有可能自行停止/退出/取消订阅?或者订阅结束后线程会以某种方式退出?

使用反应式 kafka 消费者,没有更多的@KafkaListener注释,所以我使用 ApplicationRunner 并产生消费者线程。是否有另一种启动反应式 kafka 消费者的正确方法?

0 投票
1 回答
76 浏览

reactive-programming - 一个通量的流输出到另一个通量

我从 messageHandler(kafka) 获取数据并通过 servicehandler 处理它给我一些结果

serviceHandler.process(message).getMessage().toString()

现在我需要输出这个结果流,这样每当一条新记录通过消息处理程序时,它就会得到处理并将输出推送到前端(角度)

有谁知道我需要什么的方法?

0 投票
1 回答
121 浏览

apache-kafka - 反应式卡夫卡生产者

我正在探索反应式 kafka,只是想确认反应式 kafka 是否等同于同步生产者。使用同步生产者,我们通过 ACK all 获得消息传递保证,并保持生产者顺序。但是,ASYNC 不能保证交付和排序。反应式生产者是否等同于 SYNC 或 ASYNC?