问题标签 [spring-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-integration - Sink 组件在 Spring Cloud 数据流中无法使用 kafka 获取正确的数据
我不是以英语为母语的人,但我尽量清楚地表达我的问题。我遇到了这个困扰我两天的问题,但我仍然找不到解决方案。
我已经构建了一个流,它将在 Hadoop YARN 中的 Spring Can 数据流中运行。
流由 Http 源、处理器和文件接收器组成。
1.Http Source
HTTP Source 组件有两个输出通道绑定两个不同的目的地,即在 application.properties 中定义的 dest1 和 dest2。
spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2
以下是 HTTP 源代码片段供您参考。
2.处理器
处理器有两个多输入通道和两个输出通道绑定不同的目的地。目标绑定在处理器组件项目的 application.properties 中定义。
下面是处理器的代码片段。
3.文件接收器组件。
我使用 Spring 的官方 fil sink 组件。maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT
我只是在其 applicaiton.properties 文件中添加目标绑定。spring.cloud.stream.bindings.input.destination=fileSink
4.发现:
我期望的数据流应该是这样的:
Source.handleRequest() -->Processor.handleRequest()
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
应该只将字符串“由 transform2 处理”保存到文件中。
但是经过我的测试,数据流实际上是这样的:
Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
“由transform1处理”和“由transform2处理”字符串都保存到文件中。
5.问题:
虽然 Processor.handleRequest() 中输出通道的目的地绑定到 hdfsSink 而不是 fileSink,但数据仍然流向文件 Sink。我无法理解这一点,这不是我想要的。我只希望来自 Processor.handleRequest2() 的数据流到文件接收器而不是两者。如果我做得不对,谁能告诉我该怎么做以及解决方案是什么?这让我困惑了2天。
感谢您的热心帮助。
亚历克斯
spring-kafka - 从 KafkaTemplate 获取主题元数据
我从 KafkaTemplate 实现中看到,无法访问实际的 Kafka Producer。虽然这种 Producer 包装可能很好,但需要 Kafka Producer 中的一些方法,例如metrics()
和partitionsFor(java.lang.String topic)
。在 KafkaTemplate 中,我们可以使用这些相同的方法来包装实际的 Kafka Producer 方法。这可能会在较新的版本中实现吗?我可以实现它并提出拉取请求吗?
spring-boot - 带有注释的spring boot中的kafka侦听器容器不消耗消息
不确定我是否做错了什么,但我无法让它发挥作用。下面是我的代码:
虽然我能够产生信息。我正在使用 spring boot 版本 1.4.0.RELEASE 和 spring-integration-kafka 版本 2.0.1.RELEASE。
java - 从 Kafka 多次读取同一条消息
我使用Spring Kafka API来实现具有手动偏移管理的 Kafka 消费者:
someCondition
在这里,我希望消费者仅在持有时才提交偏移量。否则消费者应该休眠一段时间并再次阅读相同的消息。
卡夫卡配置:
使用当前配置,如果someCondition == false
,消费者不会提交偏移量,但仍会读取下一条消息。如果没有执行 Kafka,有没有办法让消费者重新阅读消息acknowledgement
?
spring-kafka - Spring kafka消息消费者延迟
我正在使用 Spring kafka 1.0.3 来消费 kafka 消息。kafka中有2个主题,每个主题有1个分区。在 java 代码中,有 2 个 @KafkaListener 来消费每个主题消息。ConcurrentKafkaListenerContainerFactory 的并发设置为 1。但消息有时会延迟 20+ 秒。
@KafkaListenr 代码如下:
kafka 消息由 spring kafka 发送,日志在这里:
日志中有一些partitions assigned:[]
,我不知道为什么assigned是空的。而在 webOutMessage 消息被消费后2016-09-26 16:16:45,870
,下一个动作是消费者在 15+ 秒后重新平衡2016-09-26 16:17:01,099
,所以下一条消息是在 15+ 秒后延迟。
有人知道为什么吗?
添加调试日志,不是每次都有延迟,有时候还可以:</p>
spring-boot - messages published to all consumers with same consumer-group in spring-data-stream project
I got my zookeeper and 3 kafka broker running locally. I started one producer and one consumer. I can see consumer is consuming message.
I then started three consumers with same consumer group name (different ports since its a spring boot project). but what I found is that all the consumers are now consuming (receiving) messages. But I expect the message to be load-balanced in that only messages are not repeated across the consumers. I don't know what the problem is.
Here is my property file
Here the group is timerGroup.
consumer code : https://github.com/codecentric/edmp-sample-stream-sink
producer code : https://github.com/codecentric/edmp-sample-stream-source
java - Spring 集成 Kafka 与 Spring Kafka
我们正在尝试在 worker spring 应用程序和消费者 spring 应用程序之间实现消息通道(在多个 JVM 上会有同一个消费者的副本)
使用 Java Config,spring 集成的文档有限,我能够找到 spring Kafka 的文档。我不确定依赖项是如何工作的,
- Spring Kafka 集成是基于 Spring Kafka 的。请对此提出一个想法?
- 我在哪里可以找到 Spring Integration Kafka 新版本的适当文档?
java - spring cloud stream kafka:“输入”的重复@StreamListener映射
我有一个生产者(Source
)将消息写入 Kafka 输出通道,该生产者发送不同类型的对象。在消费者方面 ( Sink
) 我想@Streamlistener
在同一个输入通道上有多个方法,每个方法都有不同的参数类型,目前我得到它不起作用Duplicate @StreamListener mapping for 'input'
。
这是我的消费者代码:
有没有比使用单个@StreamListener
方法更好的解决方案来测试消息类型以选择要调用的方法?
spring - Spring集成kafka如何在生产到kafka时处理错误
在使用 int-kafka:outbound-channel-adapter 生产到 kafka 时,似乎没有可用的错误通道。在这种情况下,如何处理 reties 次后无法向 kafka 生成消息?
任何可能导致生产卡夫卡失败的错误。(以下代码只是来自互联网的代码片段,只是想知道如何为其添加错误句柄)
spring-integration - 如何优雅地关闭 Kafka 通道适配器?
我正在使用 aKafkaMessageDrivenChannelAdapter
来使用来自 Kafka 主题的消息。这些消息被放置在一个通道上,并由单个线程同步处理。
我希望能够优雅地关闭系统,这意味着不再从主题中消耗消息,并且正在处理的消息应该在关闭之前完成。
我可以stop()
在通道上调用该方法KafkaMessageDrivenChannelAdapter
,但我不确定当前可能正在处理消息的线程会发生什么。为了调查这种行为,我一直在使用一个简单地休眠一段时间的虚拟组件来测试如果我关闭它会发生什么。事实证明,InterruptedException
抛出了 an,这可能意味着下游处理可能无法正常完成。
什么是优雅关闭 Kafka 通道适配器的正确方法,它可以保证处理中的消息能够完成?