问题标签 [spring-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
113 浏览

apache-kafka - 有什么办法我只能从旧的偏移量中寻找 1 条消息?

我想知道有什么办法只能从旧的偏移量中寻找 1 条消息吗?因为我只想重新处理 1 条消息。

0 投票
1 回答
57 浏览

apache-kafka - SpringXD/Spring Integration:生产者和消费者使用不同版本的spring-integration-kafka

我有以下配置:

  • spring-integration-kafka 1.3.1.RELEASE
  • 我有一个自定义的 kafka-sink 和一个自定义的 kafka-source

我想要的配置:

  • 我想仍然将 Spring-integration-kafka 1.3.1.RELEASE 与我的自定义 kafka-sink 一起使用。
  • 我正在更改我的 kafka 源逻辑以使用 Spring-integration-kafka-2.1.0.RELEASE。我注意到实现消费者/生产者的方式与以前版本的 Spring-integration-kafka 不同。

我的问题是:我可以面对一些兼容性问题吗?

0 投票
2 回答
7381 浏览

apache-kafka - apache kafka 的异常 - 获取 apikey 请求时出错:3 和 apiVersion:2

我对 Apache Kafka 和 Spring Kafka 相当陌生,并且正在编写一个应用程序,它使用来自主题的信息并执行一些操作。

我使用 @KafkaListener 注释编写了一个简单的消费者,并针对我的本地 Kafka Server 0.10.1.1 对其进行了测试,并且一切正常。

现在,当我指向我们的 Kafka 开发服务器(版本 - 0.10.0.2.5)时,我看到客户端日志中出现以下异常:

此外,我在服务器端日志中看到以下错误:

我搜索了一些,发现这基本上是因为客户端和服务器之间的版本控制问题,但找不到解决这个问题的方法。

0 投票
0 回答
1314 浏览

spring - 当我在消费者属性中设置 client.id 时出现异常

我收到 javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info, id=foo。我现在使用的是 1.1.3 版本。如果我没有在消费者属性中设置 client.Id 一切正常:(

加里,我认为这也是错误。你能看看吗?

0 投票
1 回答
403 浏览

apache-kafka - Spring kafka 性能与原生 kafka api

我是一名使用 Spring Kafka 进行生产/消费的开发人员,现在我对性能有疑问。Spring Kafka 与原生 Kafka API 之间的性能差异是什么?与原生 Kafka API 相比,使用 Spring Kafka 是否有任何性能下降?任何人都曾尝试过这种表演并知道答案吗?谢谢。

0 投票
1 回答
10752 浏览

junit - 如何使用嵌入在 Spring Cloud Stream 中的 kafka 创建单元测试

抱歉这个问题太笼统了,但是有人有一些关于如何使用嵌入的 kafka 执行生产者和消费者测试的教程或指南。我已经尝试了几个,但是有几个版本的依赖项并且没有一个真正有效=/

我正在使用 Spring Cloud Stream 卡夫卡。

0 投票
2 回答
145 浏览

java - 部署 Stream 时出现 SpringXD 错误:找不到 StringDeserializer 类

我正在配置我的自定义 kafka 源,我收到与 value.deserializerkafka 属性相关的错误。

这是我的配置:

我确实看到org.apache.kafka.common.serialization.StringDeserializer了类(我可以单击类名,然后将我带到 jar 文件。

以防万一这是我的 pom 文件中的内容:

这些是我得到的日志:https ://gist.github.com/columb1a/2833f1ac751436df1caa730ce1a0eb37

0 投票
1 回答
1500 浏览

spring - SpringXD:auto-startup=false 不适用于 Kafka 消息驱动通道适配器

我正在使用SpringXD,并且我有以下配置:

  • spring-integration-kafka 2.1.0.RELEASE
  • 卡夫卡客户端 0.10.0.1
  • 卡夫卡 0.10.xx
  • spring-xd-1.3.1.RELEASE

我的 xml 文件中有以下配置:

这是我用来启动/停止通道的 Java 类:

然后我创建了一个基本流来检查我发送到该主题的某些消息是否通过

我检查了创建的文件,它包含了我发送到 Kafka 主题的所有消息:

hola_que_tal que_bonito bridgeStream.out (END)

同样在日志中我发现了这个:

2017-04-10T22:37:06-0300 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - 在阶段 0 中启动 bean 2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 支持.DefaultLifecycleProcessor - 启动 bean 'container1' 类型 [class org.springframework.kafka.listener.KafkaMessageListenerContainer] 2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - 成功启动 bean ' container1' 2017-04-10T22:37:06-0300 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - 在阶段 100 中启动 bean 2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache- 0 support.DefaultLifecycleProcessor - 启动 bean 'kafka-inbound-channel-adapter-testing'类型 [class org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter] 2017-04-10T22:37:06-0300 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 inbound.KafkaMessageDrivenChannelAdapter - 开始 kafka-inbound-channel-adapter-testing 2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor -成功启动 bean 'kafka-inbound-channel-adapter-testing'

我的问题是:为什么频道会自动启动?

0 投票
1 回答
813 浏览

java - Spring Integration & Kafka Consumer:在成功获取记录后立即停止消息驱动的通道适配器

我正在使用以下配置:

  • spring-integration-kafka 2.1.0.RELEASE
  • 卡夫卡客户端 0.10.0.1
  • 卡夫卡 0.10.xx
  • spring-xd-1.3.1.RELEASE

我为 SpringXD 创建了我的自定义 Kafka 源模块。我设置了我的消费者逻辑和我的message-driven-channel-adapter(我与 a 一起使用control-bus来停止我的通道适配器)。到目前为止,一切都很好。此外,我还使用 kafka 属性max.poll.record=10来获取每次轮询的 10 条记录。

我想确保在成功获取所有记录(在本例中为 10 条记录)后立即停止我的频道。

所以举个例子:我想避免在不是所有的记录都被成功获取和处理的时候停止读取(即当记录没有被发送到输出通道时)。

有没有办法说出来?

这是我的 xml 配置,以防万一:

[更新 N°1] 我为什么要这样做?这些是详细信息:

  • 我想每Y分钟从 Kafka 主题中读取最多X条消息。
  • max.poll.records用来确保每次投票最多获取X条消息。
  • 我想处理的一种情况是:如果在一次特定的消息轮询中,我轮询的消息少于X会发生什么。这意味着我应该停止通道而不等待X消息,否则我将不得不等到未来的消息轮询才能到达那些X消息。

这些是关于这个场景的一些细节。还有更多场景,但我不想使用相同的 SO 问题来混合它。

[更新 N°2]

阿尔乔姆回答后的一些想法。

  • 如果我没有定义 amax.poll.records并且只是等到达到Y分钟并计算了X条消息,然后stop是通道,会发生什么?
  • 是不是有些消息会因为无法阅读而丢失,或者那些无法阅读的消息在我start重新频道时会被阅读?

我想避免将消息保存在内存中,这就是我使用message-driven-channel-adapter+的原因max.poll.records

0 投票
1 回答
495 浏览

spring-kafka - 如何使用带有 Spring-kafka kafkaTemplate 的 sendDefault 方法的 DefaultPartitioner

我正在尝试向具有 4 个分区的主题发送消息。并且我希望消息按照 DefaultPartitioner 的决定(使用密钥的哈希)进入分区

kafkaTemplate.sendDefault(DefaultPartitioner(job.getId()),job.getId(),job); 我不确定如何让 kafkaTemplate 使用 DefaultPartitioner 来获取分区号。

有人可以帮我解决这个问题。