问题标签 [spring-cloud-stream-binder-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1039 浏览

apache-kafka - 使用 enable.idempotence true 时 Spring Cloud Stream Kafka 应用程序的启动极慢

我的 Scs 应用程序有两个具有此配置的 Kafka 生产者:

它在大约 10 秒内开始:

我需要我的生产者是幂等的,所以我设置了enabled.idempotence: true. 通过此更改,启动时间会慢 7 倍(有时甚至超过 10 倍):

如何加快启动速度?

更新:

我在启动过程中发现了Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms.一个问题(当它没有出现时,启动速度和以前一样快。

在以下日志中,它仅发生在一个生产者中:

然后在被卡住 30 秒后ProducerId set to 32029 with epoch 0,它记录了第二个生产者的信息消息Proceeding to force close...并初始化了第二个生产者,没有任何问题:

更新 2:

我已经调试了这背后的逻辑,它发生在doBindProducer()方法期间。它获取主题的分区,并为此在KafkaMessageChannelBinder.

正确检索此列表后List<PartitionInfo> partitionsFor,它会卡在 KafkaProducer.destroy() 中,直到 30 秒超时到期:

在此处输入图像描述

为什么会在那里阻塞?会不会是活页夹的bug?

0 投票
1 回答
2068 浏览

spring - 原因:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.springframework.core.convert.support.Defa

我正在研究Spring Cloud Stream Apache Kafka示例。我正在开发参考以下代码的代码:https ://www.youtube.com/watch?v=YPDzcmqwCNo 。

代码:

应用程序属性

pom.xml

0 投票
1 回答
214 浏览

spring-integration - 抽象 Spring Cloud Stream 生产者和消费者代码

我有一个服务,它正在生产和使用来自不同 Spring Cloud Stream Channels 的消息(绑定到 EventHub/Kafka 主题)。有几个这样的服务设置类似。

配置如下所示

Producer/Publisher 代码如下所示

同样,我有多个其他发布者发布到不同的事件中心/主题。请注意,为每个发布的消息设置了一个租户 ID 标头。这是我的多租户应用程序特有的,用于跟踪租户上下文。另请注意,我在发送消息时正在获取要发布的频道。

我的消费者代码如下所示

同样,我有几个这样的消费者,并且基于发布者发送的传入租户 ID 标头在每个消费者中设置了一个租户上下文。

我的问题是

如何摆脱在 Publisher 中设置tenant-id 标头并在 Consumer 中设置租户上下文的样板代码,方法是将其抽象到一个库中,该库可以包含在我拥有的所有不同服务中。

此外,是否有一种方法可以根据正在发布的消息的类型动态识别通道。对于给定场景中的 ex IngestionStatusMessage.class

0 投票
1 回答
3099 浏览

apache-kafka - 如何通过 YAML 中的 Spring Cloud Stream 提供 Kafka Streams 属性?

我想搬到spring.kafka.streams.*下面spring.cloud.stream——这可能吗?我想过streams-properties类似于consumer-propertiesor producer-properties,但它不起作用。

0 投票
1 回答
524 浏览

spring-boot - 适用于 Spring Boot >1.5.20 的 Spring Cloud Stream Kafka Binder 和 Spring Cloud Azure EventHub 兼容版本

我已成功使用 Spring Cloud Stream Kafka Binder (org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.1.RELEASE) 和 Spring Cloud Azure Event Hubs (com.microsoft.azure:spring-cloud-starter -azure-eventhubs:1.2.3) 与 Spring Boot 2.2.6 一起发布和使用来自 Azure 事件中心的消息(启用 Kafka API)。

但是,当我尝试将相同版本的 Spring Cloud 库与 Spring Boot 1.5.22 集成时,我遇到了 java.lang.NoClassDefFoundError: org/springframework/integration/support/converter/ConfigurableCompositeMessageConverter 问题

当我使用 spring-cloud-starter-stream-kafka:1.3.4.RELEASE 和 com.microsoft.azure:spring-cloud-starter-azure-eventhubs:1.1.0 时,我遇到 zookeeper 连接问题可能是由于不同的配置所需的一组属性

我的 application.yaml 如下所示

我想知道我使用的哪些版本的库与 Spring Boot >= 1.5.20 兼容

0 投票
1 回答
519 浏览

spring-cloud-stream-binder-kafka - 如何使用 Spring Cloud Stream 和 Kafka 绑定 Store?

我想在使用Spring Cloud Stream 的 Kafka Binder 的示例应用程序中使用类型为KeyValueStore的 Kafka 状态存储。根据文档,它应该很简单。这是我的主要课程:

我想 KeyValueStore 应该作为“process”方法的第二个参数传递,但应用程序无法启动并显示以下消息:

0 投票
1 回答
216 浏览

apache-kafka - 从 Spring Cloud Streams Kafka Stream 应用程序中的处理器写入主题

我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入一个主题。如何在 Spring Cloud Streams Kafka 应用程序中完成?

0 投票
2 回答
628 浏览

docker - Spring Cloud Stream Kafka 无法连接

我正在使用这个 docker compose:

我的属性是:

但我收到此错误:

我的 Spring Boot 版本是 2.1.4.RELEASE,我的 Spring Cloud 版本是 Greenwich.SR1。

提前致谢!

0 投票
1 回答
1104 浏览

spring-cloud-stream - Spring Cloud Function 和 Kafka

我很难理解我应该如何去测试一个使用 Kafka Binder 同时还使用 Spring Cloud 功能的应用程序。

让我们使用这个非常简单的例子:

在我的 application.yaml 上:

我将如何进行测试?如果我使用 @StreamListener 和 Channels 列表,我会这样做:

但是,对于 Spring Cloud Function,情况并非如此。非常欢迎任何帮助,因为我在官方文档或示例中找不到任何内容!

0 投票
1 回答
415 浏览

spring-kafka - Spring Cloud Stream - 将fasterxml ObjectNode添加到kafka的可信包

我正在使用 Spring-cloud-stream 版本 3.0.4

我正在编写一个 JSON 合并器,它侦听多个流,将 JSON 存储在状态存储中,然后将其合并以生成输出 JSON。由于我的服务只是一个 JSON 聚合器,我不想将 JSONS 转换为 Java 对象。所以我想做的是将上游服务的 JSON 发布为 JsonNode。进行合并并将其发布到下游主题上。

要将 JsonNode 添加为受信任的包,我已经在我的应用程序类中声明了一个 bean,如下所示。

并在我的 application.yml 中添加了以下条目

但是,此配置不起作用,我收到以下错误。

我有 2 个查询

  1. 如何解决上述错误。
  2. 我的方法是最优的还是我应该以不同的方式进行聚合。