问题标签 [spring-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
5688 浏览

java - 将 ObjectMapper 注入 Spring Kafka 序列化器/反序列化器

我正在使用 Spring Kafka 1.1.2-RELEASE 和 Spring Boot 1.5.0 RC,并且我配置了一个自定义值序列化器/反序列化器类扩展org.springframework.kafka.support.serializer.JsonSerializer/ org.springframework.kafka.support.serializer.JsonDeserializer。这些类确实使用了可以通过构造函数提供的 Jackson ObjectMapper。

是否可以从我的 Spring 上下文中注入 ObjectMapper?我已经配置了一个 ObjectMapper,我想在序列化器/反序列化器中重用它。

0 投票
1 回答
19297 浏览

spring - 单个 Spring 的 KafkaConsumer 监听器可以监听多个主题吗?

任何人都知道单个听众是否可以收听以下多个主题?我知道只有“topic1”有效,如果我想添加其他主题怎么办?你能举出下面两个例子吗?谢谢您的帮助!

或者

0 投票
1 回答
7908 浏览

apache-kafka - 为什么Kafka消费者需要很长时间才能开始消费?

我们启动一个 Kafka 消费者,监听一个可能尚未创建的主题(虽然启用了主题自动创建)。

不久之后,生产者发布了关于该主题的消息。

但是,消费者需要一些时间才能注意到这一点:准确地说是 5 分钟。此时消费者撤销其分区并重新加入消费者组。卡夫卡重新稳定了团队。查看消费者与 kafka 日志的时间戳,这个过程是在消费者端启动的。

我想这是预期的行为,但我想了解这一点。这实际上是在进行重新平衡(从 0 分区到 1 分区)吗?如果我们提前创建主题,这不会发生吗?

卡夫卡日志

0 投票
1 回答
6169 浏览

java - kafka @Listener 异常处理 - 无法配置批量重试

我很陌生Kafka。我试图弄清楚并理解错误场景将如何工作@Listener batch consumer factory

我在做什么 ...

我正在使用进程中的记录topic并将batch它们插入DB如下...

我的问题

  1. 如果由于 DB 不可用而导致任何批处理未能插入 DB 时如何重试
  2. 如何测试 Kafka 服务器是否正在运行并能够连接到特定主题 - 为什么我问这个问题是我在本地Kafka Listener停止zookeeperKafka服务器后尝试但没有错误或异常。Kafka Producer我的意思是在停止但没有发现Template错误后发送消息抛出错误Kafka ServerListener

添加

我的配置

我在这里使用org.springframework.retry.support.RetryTemplate

我得到的例外

0 投票
1 回答
2988 浏览

spring-cloud - spring kafka 在运行时优雅地关闭生产者

我们有一个使用 spring-kafka 的 spring boot 应用程序。当我们更新配置属性时,我们想禁用 kafka 生产者(kafkatemplate)。我尝试过使用条件 bean,并使用 applicationcontext 刷新。

有没有办法用 spring-kafka 优雅地关闭 kafkaproducer ?

0 投票
1 回答
212 浏览

apache-kafka - Spring Kafka 1.1.3-SNAPSHOT 与 Kafka 0.10.1.1 的兼容性

我是 Kafka 新手,我想使用支持流媒体的最新版本(0.10.1.1)。我也想使用 Spring Kafka。

我不明白 Spring Kafka 在其文档中所说的版本兼容性:

这个链接说 Apache Kafka 0.10.xx 客户端

http://docs.spring.io/spring-kafka/docs/1.1.3.BUILD-SNAPSHOT/reference/html/whats-new-part.html

这个链接说:Apache Kafka 0.9.0.1

http://docs.spring.io/spring-kafka/docs/1.1.3.BUILD-SNAPSHOT/reference/html/_introduction.html

0 投票
3 回答
3139 浏览

apache-kafka - 当 Broker 关闭时,Kafka Producer 不会撤消

我已经使用 0.9 版设置了 Kafka,基本配置为 1 Broker 1 Topic 和 1 Partition。

下面是我添加的 Producer 配置,用于启用 Producer 的重试。

我从文件中了解到

设置大于零的值将导致客户端重新发送发送失败并可能出现暂时性错误的任何记录。请注意,此重试与客户端在收到错误后重新发送记录没有什么不同。

我的 Broker 和 Zookeeper 都关闭了,重试操作不起作用。

错误 osksLoggingProducerListener - 向主题 TestTopic1| 发送消息时引发异常 org.apache.kafka.common.errors.TimeoutException:500 毫秒后更新元数据失败。

我需要知道我是否在这里遗漏了任何东西以便重试工作。

0 投票
1 回答
3686 浏览

json - Spring-Kafka 反序列化

我正在尝试制作一个 kafka 消费者,它正在收听特定主题并将消费的消息作为 JSON 处理。我尝试按照这里的 spring 文档给出的方法,但无法将消息作为 JSON 获取。

这是我的接收器配置代码:

消费者:

当我尝试在远程服务器上发布主题时,出现以下错误:

但是,如果我从侦听器中删除 containerfactory,我可以接收消息,但它们不是 JSON 格式,而是字符串:

0 投票
2 回答
3369 浏览

apache-kafka - 如何使用 Spring Cloud Stream Kafka 和每个服务的数据库实现微服务事件驱动架构

我正在尝试实现一个事件驱动的架构来处理分布式事务。每个服务都有自己的数据库,并使用 Kafka 发送消息以通知其他微服务有关操作的信息。

一个例子:

订单收到订单请求。它必须将新订单存储在其数据库中并发布一条消息,以便支付服务意识到它必须为该项目收费:

私人订单业务订单业务;

这些是我的疑问:

  1. 步骤 a.-(保存在 Order DB 中)和 b.-(发布消息)应该在事务中以原子方式执行。我怎样才能做到这一点?
  2. 这与上一个有关:我发送消息是: orderSource.output().send(MessageBuilder.withPayload(order).build()); 此操作是异步的,并且始终返回 true,无论 Kafka 代理是否关闭。我如何知道消息已到达 Kafka 代理?
0 投票
1 回答
802 浏览

apache-kafka - 如何使用 Spring Cloud Stream Kafka 在微服务事件溯源架构中查询事件存储库

澄清:请注意,这个问题与这个问题不同:How to implement a microservice Event Driven architecture with Spring Cloud Stream Kafka and Database per service

这个是关于使用Kafka 作为唯一的存储库(事件),不需要数据库,另一个是关于每个服务使用一个数据库(MariaDB)+ Kafka

我想实现一个事件溯源架构来处理分布式事务:

OrdersService 接收订单请求并将新订单存储在代理中。

这是我的主要疑问:如何查询 Kafka 代理?想象一下,我想按用户/日期、状态等搜索订单。