问题标签 [spring-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring - Spring 与 Kafka 的集成
对于与 Kafka 的 Spring 集成,我注意到 maven 存储库中有 2 个不同的工件可用,如下所示
- 春天卡夫卡
- 弹簧集成卡夫卡
我想了解这两个工件之间有什么区别,我应该选择哪一个?
apache-kafka - spring-kafka大批量加工
使用 spring-kafka 1.0.5,我正在从一个繁忙的主题中消费,它有 10 个分区,并发性为 10。
我当前的代码根据分区 ID 将消息添加到队列中,这些分区 ID 都保存在 HashMap 中。
不幸的是,这种设计所花费的处理时间是简单消费的两倍。
我的要求是单独处理分区,但如何避免使用基于@KafkaListener 的分区引用的哈希图。
有没有更有效的方法来解决这个问题?理想情况下,监听器注释中的每个线程都将管理自己的列表。有没有一种方法可以做到这一点,而无需基于分区 ID 进行交叉引用,例如上面提到的 hashmap?
spring - KafkaMessageListenerContainer 如果有 servicedown 异常重新处理消息
我们希望配置一个 Spring Kafka 侦听器,以便如果任何外部服务关闭,我们不希望丢失从 Kafka 消费的消息。我们希望将其还原,直到它被成功处理。
你能帮忙配置一下我可以用来实现相同的配置吗?
如果我批量消费消息,我该如何处理。
我们正在使用 Kafka 0.9
java - 从同一 JVM 运行 kafka 消费者和生产者时生产者速度慢
我正在使用 kafka 0.8 和 spring-integration-kafka 1.2.0.RELEASE
我有 2 个名为主要和次要的主题。我需要从主要主题消费,经过一些处理后需要生成次要主题,以便稍后完成下一组处理。
虽然主要主题的消费工作正常,但几分钟后生产次要主题开始失败。问题从我设置的 500 毫秒后向 kafka 超时发送请求开始。以线程池耗尽结束。
如果我试图为另一个 kafka 集群的次要主题生成事件,它可以正常工作。
我有 4 个消费者运行两个主题,每个主题都有 200 个分区。
我对卡夫卡有点陌生,请原谅缺乏知识。请评论我应该提供的任何缺失信息。
apache-kafka - Kafka - Spring:kafka 消费者根据偏移量从主题中读取消息
有没有办法根据偏移量使用来自 Kafka 主题的消息。我的意思是我有一个之前在主题中发布的偏移量 ID。现在我需要根据我传递的偏移量 Id 从主题中获取消息。
spring - 在@KafkaListener-method 中确认而不会“丢失”消息
我们目前基本上使用以下简化机制来确认消息:
基本上,只要我们暂时无法处理消息(在 IOExceptions 的情况下),我们希望稍后再接收它。
但这不起作用,因为确认假定同一分区中的所有先前消息都已成功处理。在我们的 IOException 案例中,失败的消息将被跳过,但可能会被同一分区上具有更高索引的不同消息确认。
我们有一些解决这个问题的想法,但这意味着一些讨厌的解决方法,以避免直接在 KafkaListener 方法中调用确认。我们的用例是一个非常具体的用例,还是更像是 spring kafka 用户会假设的“默认”行为?
有针对此类问题的 spring-kafka 解决方案吗?或者你有一个“正确”解决这个问题的想法吗?
spring - Spring Integration - 如何在使用声明性方法发送回 SOAP Web 服务的确认之前添加关于主题的消息
我正在使用入站网关使用 SOAP Web 服务。我们需要将消息放入 kafka 主题中,并使用 Spring 集成的声明性方式向请求者返回同步确认。这可能吗 ?
spring-boot - 无法在 Spring-Boot App 中使用 Spring-integration-kafka 禁用对 Kafka 消息的手动提交
由于我是 kafka 的新手,我的团队正在 Spring-boot 应用程序上使用 Spring-Integration-kafka:2.0.0.RELEASE。根据此处的示例,我能够使用我的 KafkaConsumer 使用 kafka 消息。
对于这个 spring-boot 应用程序,我有我的 Application.java
这是我的 KafkaConsumerConfig.java
最后是我的 Listener.java
我的问题是:
1)在ConsumerConfig设置中,我已经将“ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG”设置为false并注释掉“ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG”,为什么它仍然自动提交?(因为如果我重新启动spring-boot,我将不再看到该消息应用程序-我希望它会尝试重新发送,直到它被提交)
2)为了手动提交,我想我只需要在我的 Listener.listen 中添加 ack.acknowledge() (请参阅占位符)?除了这个,我还需要其他什么来确保这是手动提交吗?
3)我想拥有最简单/最干净的 kafkaconsumer,我想知道你是否认为我所拥有的是最简单的方法,最初我正在寻找单线程消费者但没有很多示例,所以我坚持使用并发监听器.
感谢您的所有帮助和投入!
spring - 如何在springboot中为每个主题动态创建单独的Kafka监听器?
我是 Spring 和 Kafka 的新手。我正在研究一个用例 [使用 SpringBoot-kafka],其中允许用户在运行时创建 kafka 主题。Spring 应用程序应在运行时以编程方式订阅这些主题。到目前为止我所知道的是,Kafka 侦听器是设计时间,因此需要在启动之前指定主题。SpringBoot-Kafka集成中有没有办法动态订阅kafka主题?
参考了这个 https://github.com/spring-projects/spring-kafka/issues/132
我计划实施的当前方法是,不要使用 Spring-Kafka 集成,而是自己实施 Kafka 消费者 [使用 java 代码],如此处所述 spring boot kafka consumer - 如何正确使用 spring boot 中的 kafka 消息
docker - Kafka 生产者抛出“TimeoutException: Batch Expired”异常
我正在为 twitter 测试 Spring Cloud Stream 应用程序,使用以下与 Kafka 相关的环境属性启动 docker 容器,
我的 kafka producerConfig 值如下,
2017-01-12 14:47:09.985 信息 1 --- [itterSource-1-1] Oakafka.common.utils.AppInfoParser:Kafka 版本:0.9.0.1
但是生产者不断抛出以下异常,
我可以从我的 docker 容器远程登录到代理 192.168.127.188:9092 和 2181。此外,我的 kafka 服务器不是 docker 容器。
看到了一些解决方案,例如添加“advertised.host.name”,但没有奏效,或者它是我提供环境道具的正确方式。
有什么帮助吗?