问题标签 [spring-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
1279 浏览

apache-kafka - Spring Cloud-Stream Kafka 禁用检查应用程序启动时运行的 Kafka 和 Zookeeper 实例

我有 Spring Boot 应用程序,我在其中配置了 Spring Cloud Stream Kafka,所以当我启动应用程序时,在容器启动时它正在寻找 Zookeeper 和 Kafka 服务器运行实例,如果找不到,它将终止应用程序容器.

所以我的要求是我想要一个类似开关的东西,所以我可以让它可配置,所以当我想让我的应用程序检查 Zookeeper 和 Kafka 实例运行时,我可以说是真的,当我不想时我可以说假的,我希望我的应用程序容器都应该启动。

0 投票
1 回答
1127 浏览

apache-kafka - Spring Cloud Stream Kafka - EmbeddedHeadersMessageConverter - java.lang.StringIndexOutOfBoundsException: String index out of range in

I have a Spring Boot app (app0) that uses Spring Cloud Stream Kafka to read from a topic.

There are two other apps (app1, app2) that produce messages into that topic.

app1 publishes messages using an interface OrderSource:

For instance:

In this case, app0 reads the messages from app1 without any problem.

app2 publishes its messages using KafkaTemplate:

In this case I am observing the following exception from EmbeddedHeadersMessageConverter:

Apparently it is trying to extract headers from the payload of the message. How can I prevent this exception from happening while supporting both sources of messages (KafkaTemplate and OrderSource).

0 投票
2 回答
2415 浏览

spring-cloud - Spring Cloud Stream Kafka - 如何实现幂等性以支持分布式事务管理(最终一致性)

我有以下典型场景:

  • 用于购买产品的订购服务。充当分布式事务的指挥官。
  • 包含产品列表及其库存的产品服务。
  • 一种支付服务。

    /li>

正常的流程是:

  1. 用户订购产品。
  2. 订单服务在数据库中创建订单并在 Kafka 主题“订单”中发布消息以预订产品(PRODUCT_RESERVE_REQUEST)。
  3. 产品服务在其数据库中减少一个单位的产品库存,并在“订单”中发布一条消息说 PRODUCT_RESERVED
  4. 订单服务获取 PRODUCT_RESERVED 消息并命令支付发布消息 PAYMENT_REQUESTED
  5. 支付服务订购付款并回复一条消息 PAYED
  6. 订单服务读取 PAYED 消息并将订单标记为 COMPLETED,完成交易。

我在处理错误情况时遇到了麻烦,例如:让我们假设:

  1. 支付服务未能为产品收费,因此它发布消息 PAYMENT_FAILED
  2. 订单服务响应发布消息 UNDO_PRODUCT_RESERVATION
  3. 产品服务增加数据库中的库存以取消预订并发布 PRODUCT_UNRESERVATION_COMPLETED
  4. 订单服务完成交易,将订单的最终状态保存为 CANCELLED_PAYMENT_FAILED。

在这种情况下,假设无论出于何种原因,订单服务发布了 UNDO_PRODUCT_RESERVATION 消息但没有收到 PRODUCT_UNRESERVATION_COMPLETED 消息,因此它重新尝试发布另一个 UNDO_PRODUCT_RESERVATION 消息。

现在,假设同一订单的这两个 UNDO_PRODUCT_RESERVATION 消息最终到达 ProductService。如果我同时处理它们,我最终可能会为产品设置无效库存。

在这种情况下如何实现幂等性?

更新:

按照 Artem 的说明,我现在可以检测到重复的消息(通过检查消息头)并忽略它们,但可能仍然存在以下情况,我不应该忽略重复的消息:

  1. 订单服务发送 UNDO_PRODUCT_RESERVATION
  2. 产品服务收到消息并开始处理它,但在更新库存之前崩溃。
  3. 订单服务未收到响应,因此它重试发送 UNDO_PRODUCT_RESERVATION
  4. 产品服务知道这是一条重复的消息,但在这种情况下,它应该再次重复处理。

你能帮我想出一种方法来支持这种情况吗?我如何区分何时应该丢弃消息或重新处理它?

0 投票
1 回答
1299 浏览

spring-cloud - Spring Cloud Stream Kafka - 最终一致性 - Kafka 是否自动重试未确认的消息(使用 autocommitoffset=false 时)

实现一个最终一致的分布式架构被证明是一件痛苦的事情。有大量的博客文章讲述了如何做到这一点,但没有展示(代码)如何实际做到这一点。

我遇到的一个方面是在消息没有被确认时必须手动重试消息。

例如:我的订单服务向 Kafka 发送支付事件。支付服务订阅它并处理它,回答支付正常或支付失败

  1. 要求付款:Order Service ----Pay event----> Kafka ----Pay Event ----> Payment Service

  2. 付款OK:->Payment Service ----Payment ok event ----> Kafka ----Payment ok Event ----> Order Service

  3. 支付失败 ->Payment Service ----Payment failure event ----> Kafka ----Payment failure Event ----> Order Service

重点是:

我确定何时使用同步发送将消息传递给 Kafka。但是,我必须知道付款服务已处理付款的唯一方法是期待一个应答事件(付款正常|付款失败)。

这迫使我在订单服务器中实现重试机制。如果在一段时间内没有得到答复,请使用新的 Pay 事件重试。

更重要的是,这也迫使我在支付服务中处理重复的消息,以防它们被实际处理但没有得到订单服务的答案。

我想知道如果消费者没有确认消息的新偏移量,Kafka 是否具有发送重试的内置机制

在 Spring Cloud Stream 中,我们可以将autoCommitOffset属性设置为 false 并处理消费者中偏移量的确认:

如果我们不执行acknowledgment.acknowledge();会发生什么 消息会被 Kafka 自动重新发送给这个消费者吗?

如果有可能,我们将不再需要手动重试,并且可以执行以下操作:

收款人服务:

如果这是可能的,那么 Kafka 中的重试周期是如何配置的?

在最坏的情况(也是最有可能的)情况下,这不受支持,我们将不得不手动重试。您知道 Spring Cloud Stream 应用程序使用 Kafka 处理最终一致性的任何真实示例吗?

0 投票
2 回答
1092 浏览

spring - 从 Spring 组件扫描集成测试中排除特定类

我有一个使用 Spring Boot 框架构建的 Spring Rest 应用程序。现在在编写 Spring 集成测试时,我想排除一个类,使其不被扫描组件。我的这个类包含 Apache Kafka 的依赖项。如果此类在容器启动时加载,它将开始寻找 Kafka 运行实例。

所以在运行集成测试时我不会启动我的 Kafka 服务器,所以我想运行集成测试使 Kafka 关闭。

任何帮助表示赞赏。

0 投票
2 回答
2308 浏览

spring-kafka - Spring Cloud Contract 与 Spring Kafka

Spring Cloud Contract 是否支持开箱即用的 Spring Kafka 消息合约验证?

0 投票
1 回答
570 浏览

apache-kafka - 当卡夫卡生产者设置为同步时,卡夫卡消费者没有被调用

我有一个要求,其中需要维护 2 个主题,其中 1 使用同步方法,其他使用异步方法。异步调用消费者记录按预期工作,但是在同步方法中,消费者代码没有被调用。

下面是配置文件中声明的代码

我在这里启用了 autoFlush true

在返回 recordMetadataResults 对象后,控件将停止,不再对使用者进行任何调用

消费者守则

基于这个问题,我有2个问题

  1. 当监听器类是同步生产者时,我们是否应该手动调用监听器类中的监听()。如果是,该怎么做?
  2. 如果 listener( @KafkaListener) 被自动调用,我需要添加哪些其他设置/配置才能使其正常工作。

感谢您提前输入

-斯里坎特

0 投票
2 回答
11653 浏览

spring-boot - KafkaException:无法实例化类 JsonDeserializer

我正在尝试按照本参考指南使用 Kafka 客户端启动 Spring Boot 应用程序,但出现以下错误。
你能告诉我如何解决吗?

--

0 投票
1 回答
1156 浏览

apache-kafka - 我如何知道我的记录是否已使用 Spring Kafka 手动提交

我想知道在 spring kafkaAckMode中设置为时提交是如何工作的。MANUAL

下面是我在KafkaConfig containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);中设置的属性

listener代码_

我正在acknowledgement按照 spring kafka 文档进行操作,但这仅意味着我的消息被标记为已发送但未使用(这是我的理解)。

  1. 在那种情况下,我应该调用该commitSync()方法吗?如果,我从哪里调用它,因为我需要获取对KafkaConsumer. 如果否,它在内部是如何工作的,我可以跟踪它吗?

  2. 是否有commitId返回值?我的想法是知道是否消费了特定的消费者记录。我想存储该值以用于内部跟踪目的。

  3. kafka 是否在内部维护消费者记录上的任何状态,例如(已确认、已提交未提交),这有助于分类。

这真的可以帮助我区分有多少记录被消耗,有多少正在等待处理以及它们的状态。

0 投票
1 回答
1385 浏览

apache-kafka - 如何在多个消费者处读取相同的 kafka 消息

我对 Kakfa Spring 集成非常陌生。我已经实现了 Kafka 消息发送和 One Listener,它对我来说工作正常。但我想在两个地方收听相同的消息。谁能帮我。下面是我的代码。

发件人代码:

接收者:

任何人都可以帮助我如何在不同的地方阅读相同的消息。