问题标签 [spring-cloud-stream-binder-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 使用 enable.idempotence true 时 Spring Cloud Stream Kafka 应用程序的启动极慢
我的 Scs 应用程序有两个具有此配置的 Kafka 生产者:
它在大约 10 秒内开始:
我需要我的生产者是幂等的,所以我设置了enabled.idempotence: true
. 通过此更改,启动时间会慢 7 倍(有时甚至超过 10 倍):
如何加快启动速度?
更新:
我在启动过程中发现了Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms.
一个问题(当它没有出现时,启动速度和以前一样快。
在以下日志中,它仅发生在一个生产者中:
然后在被卡住 30 秒后ProducerId set to 32029 with epoch 0
,它记录了第二个生产者的信息消息Proceeding to force close...
并初始化了第二个生产者,没有任何问题:
更新 2:
我已经调试了这背后的逻辑,它发生在doBindProducer()
方法期间。它获取主题的分区,并为此在KafkaMessageChannelBinder
.
正确检索此列表后List<PartitionInfo> partitionsFor
,它会卡在 KafkaProducer.destroy() 中,直到 30 秒超时到期:
为什么会在那里阻塞?会不会是活页夹的bug?
spring - 原因:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.springframework.core.convert.support.Defa
我正在研究Spring Cloud Stream Apache Kafka
示例。我正在开发参考以下代码的代码:https ://www.youtube.com/watch?v=YPDzcmqwCNo 。
代码:
应用程序属性
pom.xml
spring-integration - 抽象 Spring Cloud Stream 生产者和消费者代码
我有一个服务,它正在生产和使用来自不同 Spring Cloud Stream Channels 的消息(绑定到 EventHub/Kafka 主题)。有几个这样的服务设置类似。
配置如下所示
Producer/Publisher 代码如下所示
同样,我有多个其他发布者发布到不同的事件中心/主题。请注意,为每个发布的消息设置了一个租户 ID 标头。这是我的多租户应用程序特有的,用于跟踪租户上下文。另请注意,我在发送消息时正在获取要发布的频道。
我的消费者代码如下所示
同样,我有几个这样的消费者,并且基于发布者发送的传入租户 ID 标头在每个消费者中设置了一个租户上下文。
我的问题是
如何摆脱在 Publisher 中设置tenant-id 标头并在 Consumer 中设置租户上下文的样板代码,方法是将其抽象到一个库中,该库可以包含在我拥有的所有不同服务中。
此外,是否有一种方法可以根据正在发布的消息的类型动态识别通道。对于给定场景中的 ex IngestionStatusMessage.class
apache-kafka - 如何通过 YAML 中的 Spring Cloud Stream 提供 Kafka Streams 属性?
我想搬到spring.kafka.streams.*
下面spring.cloud.stream
——这可能吗?我想过streams-properties
类似于consumer-properties
or producer-properties
,但它不起作用。
spring-boot - 适用于 Spring Boot >1.5.20 的 Spring Cloud Stream Kafka Binder 和 Spring Cloud Azure EventHub 兼容版本
我已成功使用 Spring Cloud Stream Kafka Binder (org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.1.RELEASE) 和 Spring Cloud Azure Event Hubs (com.microsoft.azure:spring-cloud-starter -azure-eventhubs:1.2.3) 与 Spring Boot 2.2.6 一起发布和使用来自 Azure 事件中心的消息(启用 Kafka API)。
但是,当我尝试将相同版本的 Spring Cloud 库与 Spring Boot 1.5.22 集成时,我遇到了 java.lang.NoClassDefFoundError: org/springframework/integration/support/converter/ConfigurableCompositeMessageConverter 问题
当我使用 spring-cloud-starter-stream-kafka:1.3.4.RELEASE 和 com.microsoft.azure:spring-cloud-starter-azure-eventhubs:1.1.0 时,我遇到 zookeeper 连接问题可能是由于不同的配置所需的一组属性
我的 application.yaml 如下所示
我想知道我使用的哪些版本的库与 Spring Boot >= 1.5.20 兼容
spring-cloud-stream-binder-kafka - 如何使用 Spring Cloud Stream 和 Kafka 绑定 Store?
我想在使用Spring Cloud Stream 的 Kafka Binder 的示例应用程序中使用类型为KeyValueStore的 Kafka 状态存储。根据文档,它应该很简单。这是我的主要课程:
我想 KeyValueStore 应该作为“process”方法的第二个参数传递,但应用程序无法启动并显示以下消息:
apache-kafka - 从 Spring Cloud Streams Kafka Stream 应用程序中的处理器写入主题
我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入一个主题。如何在 Spring Cloud Streams Kafka 应用程序中完成?
docker - Spring Cloud Stream Kafka 无法连接
我正在使用这个 docker compose:
我的属性是:
但我收到此错误:
我的 Spring Boot 版本是 2.1.4.RELEASE,我的 Spring Cloud 版本是 Greenwich.SR1。
提前致谢!
spring-cloud-stream - Spring Cloud Function 和 Kafka
我很难理解我应该如何去测试一个使用 Kafka Binder 同时还使用 Spring Cloud 功能的应用程序。
让我们使用这个非常简单的例子:
在我的 application.yaml 上:
我将如何进行测试?如果我使用 @StreamListener 和 Channels 列表,我会这样做:
但是,对于 Spring Cloud Function,情况并非如此。非常欢迎任何帮助,因为我在官方文档或示例中找不到任何内容!
spring-kafka - Spring Cloud Stream - 将fasterxml ObjectNode添加到kafka的可信包
我正在使用 Spring-cloud-stream 版本 3.0.4
我正在编写一个 JSON 合并器,它侦听多个流,将 JSON 存储在状态存储中,然后将其合并以生成输出 JSON。由于我的服务只是一个 JSON 聚合器,我不想将 JSONS 转换为 Java 对象。所以我想做的是将上游服务的 JSON 发布为 JsonNode。进行合并并将其发布到下游主题上。
要将 JsonNode 添加为受信任的包,我已经在我的应用程序类中声明了一个 bean,如下所示。
并在我的 application.yml 中添加了以下条目
但是,此配置不起作用,我收到以下错误。
我有 2 个查询
- 如何解决上述错误。
- 我的方法是最优的还是我应该以不同的方式进行聚合。