问题标签 [spring-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
4087 浏览

spring - spring-kafka (not integration) consumer not consuming message

I am integrating my application with spring-kafka (not spring-integration-kafka). Here is spring documentation for project : http://docs.spring.io/spring-kafka/docs/1.0.1.RELEASE/reference/htmlsingle

My producer works perfectly but consumer is not consuming any messages. Any pointers.

Here is my configuration:

** Edited with more information **

Thanks Gary for response. I don't see any exceptions in log. Also I tried KafkaTemplate with similar configuration and i am able to publish message to queue but for consumer, no luck. I am changing code to use String instead of my Message object. Here are parts of log:

Also I do see following in log:

0 投票
0 回答
127 浏览

java - Kafka 订阅者仅获得备用消息

我有一个主题汽车,只有 1 个分区和 1 个复制因子,如下所示:

Topic:cars PartitionCount:1 ReplicationFactor:1 Configs: Topic: cars Partition: 0 Leader: 0 Replicas: 0 Isr: 0

我有两个经纪人在运行,一个在 9092 上,另一个在我的本地主机上的 9093 上。

我的 java 应用程序向主题汽车发送消息。我看到一个非常奇怪的行为。

  1. 我的 Java 应用程序只接收备用消息,例如消息 #1、消息 #3.. 等等

为了调试这个问题,我启动了一个控制台消费者。现在我只能看到我的 java 应用程序没有收到的那些消息。例如,消息 #2、消息 #4 .. 等等

很明显,我的消息已正确发布到主题,那么,是什么导致我的 java 应用程序只接收备用消息?

最后,为什么消费者控制台只展示上述行为?

0 投票
2 回答
8954 浏览

spring - Spring Kafka,使用嵌入式 Kafka 进行测试

我们正在观察我们的 Servicetest 和嵌入式 Kafka 的奇怪行为。

测试是一个 Spock 测试,我们使用 JUnit 规则 KafkaEmbedded 并传播 brokersAsString,如下所示:

通过检查 KafkaEmbedded 的代码,构造一个实例KafkaEmbedded(int count)会导致一个 Kafka 服务器,每个主题有两个分区。

为了解决测试中的分区分配和服务器-客户端同步问题,我们遵循 spring-kafka 的 ContainerTestUtils 类中的策略。

当我们观察日志时,我们注意到以下模式:

请注意 [..] 指示省略行

我们设置metadata.max.age.ms为 3000 毫秒,结果它会尝试频繁地刷新元数据信息。

现在让我们感到困惑的是,如果我们等待两个分区连接,等待就会超时。仅当我们等待一个分区连接时,一段时间后一切都会成功运行。

我们是否理解错误的代码,嵌入式 Kafka 中每个主题有两个分区?只有一个分配给我们的 Listeners 是否正常?

0 投票
0 回答
1071 浏览

apache-kafka - Spring Cloud Stream 和 Kafka 集成错误处理

我正在尝试使用 Spring Cloud Stream 和 Kafka 集成创建一个 Spring Boot 应用程序。我在 Kafka 中创建了一个带有 1 个分区的示例主题,并已从根据此处给出的方向创建的 Spring Boot 应用程序发布到该主题

http://docs.spring.io/spring-cloud-stream/docs/1.0.2.RELEASE/reference/htmlsingle/index.html

https://blog.codecentric.de/en/2016/04/event-driven-microservices-spring-cloud-stream/

Spring Boot 应用程序 -

卡夫卡生产者类

除了我想处理异常时,一切都运行良好。

如果 Kafka 主题出现故障,我可以在应用程序的日志文件中看到 ConnectionRefused 异常,但内置的重试逻辑似乎是在不停地重试!

根本没有抛出异常供我处理和做进一步的异常处理。我已经阅读了上面 Spring Cloud Stream 文档中 Kafka 的 Producer 选项和 Binder 选项,我看不到任何自定义选项可以让我一直捕获这个异常。

我是 Spring Boot / Spring Cloud Stream / Spring Integration 的新手(这似乎是云流项目的底层实现)。

你们还有什么知道可以将此异常级联到我的 Spring Cloud Stream 应用程序的吗?

0 投票
1 回答
9918 浏览

java - SpringBoot/spring-kafka 应用程序中的 Autowired KafkaTemplate 抛出空指针

我正在尝试在 Spring Boot 应用程序中使用spring-kafka KafkaTemplate 将消息写入 Kafka 主题。

我创建了一个 KafkaConfig 类:

...并在我写信给 Kafka 的课程中​​自动装配了 KafkaTemplate:

出于某种原因,自动装配似乎不起作用。当我在调试中运行时,我注意到 KafkaTemplate 为空:

模板为空

该对象不应为空;它应该是一个 KafkaTemplate 对象。这会引发空指针异常:

kafka-spring 过去使用几乎相同的代码对我来说效果很好,所以我有点困惑为什么这次自动装配不起作用。你能看出我做错了什么吗?

0 投票
2 回答
3642 浏览

apache-kafka - kafka 消费者第一次获得记录时,只获得一条记录?

我正在使用 spring-kafka 和 spring-kafka-test 版本 1.0.2.RELEASE。

在我的一个测试中,我的应用程序使用 KafkaTemplate 和大多数默认配置设置连续发送 100 条记录到 EmbeddedKafka 实例上的单个 TopicPartion。

我使用 KafkaTestUtils.getRecords(consumer) 方法尝试从 Kafka 实例中获取记录并验证它们是否已全部发送。

第一次调用 getRecords 时,我只收到一条记录。如果我再次调用它,我会得到另一个 99。

如果我明确地将消费者的位置设置为 TopicPartition 的开头,然后调用 getRecords,我会得到全部 100。

为什么 getRecords 第一次只能得到一条记录?有没有更好的方法来一次获得所有 100 个,然后通过显式调用消费者的 seekToBeginning ?

0 投票
1 回答
1553 浏览

kafka-consumer-api - 如何使用 spring KafkaMessageListenerContainer.java 确保消息不会丢失

一旦我收到来自 kafka 的消息,我需要运行一个长时间运行的进程(最多需要 20 秒),只有当这个进程完成时,我才需要认为一条消息是成功的。

我还需要确保每条消息至少处理一次。

考虑使用具有以下属性的 KafkaMessageListenerContainer:

  1. listenerTaskExecutor 的 ThreadPoolTask​​Executor

  2. 使用 AcknowledgeingMessageListener 类型的 MessageListener

  3. 将确认模式设置为 MANUL_IMMEDIATE。

但我唯一的问题是,如果首先成功处理偏移量为 15 的特定消息,但仍在处理 14 的消息,会发生什么情况。所以在这种情况下,我的偏移量将更新为 15 ,即使 14 尚未处理

如何处理这类情况?

0 投票
1 回答
1348 浏览

spring-integration - 如何在 Spring XD 中从 Kafka 接收器模块获取确认时手动提交 Kafka 源模块上的偏移量?

在 XD 流中,消息通过源模块从 Kafka 主题消费,然后发送到接收器 Kafka 模块。开发自定义源和接收器 Kafka 模块背后的原因是,我只想在成功发送消息时从下游接收器模块得到确认时更新源模块的偏移量。

我正在使用 Spring Integration Kafka 2.0.1.RELEASE 和 Spring Kafka 1.0.3.RELEASE 以及 Kafka 0.10.0.0 环境中的主题。我尝试了以下方法:

源模块配置:

源模块:InboundKafkaMessageDrivenAdapter

接收器模块:配置

接收器模块:SinkActivator

源成功接收消息并将它们发送到接收器,但是当我尝试在接收器中获取确认时:

确认确认 = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

抛出以下异常:

原因:java.lang.IllegalArgumentException:为标头“kafka_acknowledgment”指定的类型不正确。预期 [interface org.springframework.kafka.support.Acknowledgment] 但实际类型是 [class org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment]

在 spring-integration-kafka-2.0.1.RELEASE 的源代码中,当 AckMode=MANUAL 时,KafkaMessageListenerContainer 类将 kafka_acknowledgment 标头添加到消息中,但是类型是 ConsumerAcknowldgment 的内部静态类。

那么如何从源发送的消息上从接收模块获得确认?

0 投票
1 回答
7196 浏览

spring-integration - Spring Kafka 监听所有主题并调整分区偏移量

根据spring-kafka的文档,我使用基于注释的@KafkaListener 来配置我的消费者。

我看到的是——

  1. 除非我将偏移量指定为零,否则在开始时,Kafka 消费者会选择未来的消息而不是现有的消息。(我知道这是一个预期的结果,因为我没有指定我想要的偏移量)

  2. 我在文档中看到一个选项来指定主题 + 分区组合以及零偏移量,但如果我这样做 - 我必须明确指定我希望我的消费者收听的主题。

使用上面的方法 2,这就是我的消费者现在的样子 -

虽然我知道有一个选项可以指定“topicPattern”通配符或“主题”列表作为注释配置的一部分,但我没有看到可以提供从零开始的偏移值的地方列出的主题/主题模式。有没有办法将两者结合起来?请指教。

0 投票
1 回答
1297 浏览

spring-integration - 在使用 Spring Kafka 与 XML 配置集成的 Spring XD 模块中找不到类 org.apache.kafka.clients.consumer.RangeAssignor

当我尝试在我的 xd 模块中构建一个 Kafka 消费者时,我收到了以下异常:

这是我的xml:

依赖项:

有谁知道为什么我得到这个类没有发现异常?我检查了 Kafka 0.10.0 的 API 并且该类存在,所以我不确定为什么会收到此异常。这可能与 Spring XD 加载类的方式有关吗?我知道 Spring XD 在其 lib 文件夹中附带了 Kafka 0.8.2,所以也许 Spring XD 先加载 Kafka 0.8.2,然后它因此无法找到该类?

任何见解将不胜感激!