问题标签 [spring-cloud-stream-binder-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
80 浏览

spring-boot - 为什么 KStreamBinder 没有 DefaultBinding 的生命周期?

我在@StreamBuilder 中使用 KStream 参数。

这将通过 KStreamBinder 创建一个 DefaultBinding。

我的要求是使用Binding可视化和控制。

然而

您无法通过 springboot /actuator/bindings 控制状态,因为生命周期为空。

(POST /actuator/bindings/{bindings-name} {"state":"PAUSED"})

如何控制绑定的状态?

我使用的版本如下。

  • org.springframework.cloud:spring-cloud-stream:3.0.1.RELEASE
  • org.springframework.cloud:spring-cloud-stream-binder-kafka-core:3.0.1.RELEASE
  • org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.0.1.RELEASE
  • org.apache.kafka:kafka-streams:2.3.1

请回答我的问题。谢谢你。

0 投票
0 回答
56 浏览

spring-boot - @StreamListener 入站消息列表

我目前正在使用@StreamListener一对一的入站消息,然后在我们的服务类中对其进行处理以将其保存在数据库中。

相反,我想一次入站一个消息列表(比如 100 条),然后处理它以一次保存所有这 100 条消息,以避免重复调用 DB。

我们如何使用@StreamListener.

0 投票
1 回答
499 浏览

apache-kafka - 将 KStream 实体化为全球共享存储?

我在 Java 应用程序(Spring Cloud Stream)中使用 Kafka Streams API。我有一个特殊的用例,如下:

  • 我的应用程序将从主题 A 消费,并从主题 B 产生和消费。
  • 对于主题 A 上的每条消息,都会为主题 B 生成一组对应的消息,应用程序使用这些消息来跟踪内部状态变化。它使用来自主题 B 的 KStream 来将该状态具体化为可查询的存储。

由于应用程序的多个实例将运行,并且无法保证将任一主题的哪些特定分区分配给实例,因此必须在应用程序之间共享状态存储。否则,如果主题 B 发生重新平衡,则应用程序实例可能会丢失它们正在跟踪的主题 A 消息的状态信息。考虑以下场景:

  • 实例 1 具有主题 A 的分区 1 和主题 B 的分区 1。
  • 发生主题 B 的分区重新平衡。
  • 实例 1 现在具有主题 A 的分区 1(未更改),但具有主题 B 的分区 2。
  • 实例 1 现在无法访问它在为主题 B 拥有分区 1 时创建的状态存储中的数据。

如果仅针对主题 A 进行再平衡,则会发生相同的情况。

是否有可能实现为“全球状态存储”?我了解 GlobalKTable 的概念,但我需要使用 KStream 抽象,因为我需要访问完整的事件流。作为参考,我的 KStream 消费者如下:

0 投票
2 回答
1004 浏览

apache-kafka - 无法在 Spring Cloud kafka 中为每个主题/绑定器级别设置 serde、生产者和消费者属性

我正在尝试使用 spring cloud kafka binder 来启动简单的 pub-sub 应用程序。但是我无法在 application.yml 中设置 Serializer、DeSerialzer 属性和其他生产者和消费者属性。我一直收到序列化/反序列化错误。甚至 spring boot 项目中的 kafka 日志显示生产者和消费者配置仍然是用户 ByteArraySerializer。下面是相同的代码。

pom.xml

处理器.java

KRest.java

@RestController 公共类 KRest {

}

消息.java

最后是 application.yml

版本

  • 弹簧启动:2.2.4
  • 弹簧云:Hoxton.SR1
  • spring-cloud-stream-kafka-binder:3.0.1
0 投票
2 回答
426 浏览

apache-kafka - 是否有一个选项可以为特定主题而不是全部配置 num 流线程?

在我们的应用程序中,我们有多个主题,其中一些主题将使用 16 个分区创建,而一些主题将使用 1 个分区创建。是否有任何spring.cloud.stream.kafka.bindings可用的属性/选项来实现这一目标?

0 投票
1 回答
1139 浏览

apache-kafka-streams - num.stream.threads 创建空闲线程

我有一个带有 2 个主题的 Spring Boot kafka 流应用程序,考虑主题 A 和 B。主题 A 有 16 个分区,主题 B 有 1 个分区。考虑应用程序部署在num.stream.threads= 16 的 1 个实例中。我运行 kafka-consumer-groups.bat 命令来检查线程是如何分配给组中的分区的,得到以下输出。主题 A 和 B 分配了 16 个线程,其中主题 B 中的 14 个线程空闲。

如何避免主题 B 中的空闲线程,或者是否有任何选项可用于设置每个主题的 num.stream.threads?

0 投票
2 回答
2463 浏览

spring-boot - 使用 Spring Cloud Stream Kafka Binder 时无法设置 groupId 和 clientId

我在处理 Spring Cloud Stream Kafka Binder 时遇到了严重问题。Spring Cloud 3.0.2.RELEASE 的配置设置存在很多歧义和一致性问题。我一直在尝试为 Kafka 主题设置组 ID 和客户端 ID,但尽管尝试了各种不同的组合,但我无法正确配置组 ID。

该文档声称我们应该能够通过配置以下设置之一来设置组 ID 和客户端 ID: https ://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference /html/spring-cloud-stream.html#binding-properties

以上配置均不适用于为生产者设置客户端 ID 或为消费者设置组 ID。我得到的唯一进展是通过完全不同的配置设置客户端 ID。

在使用这些设置设置成功设置客户端 ID 后,我尝试为消费者设置组 ID,但令人惊讶的是没有用。

编辑:这是应用程序设置。

应用程序.java

应用程序.yaml

pom.xml

SpringIntegrationService.java

0 投票
0 回答
375 浏览

spring - 如何处理弹簧云蒸汽反应中的背压

我正在使用 Spring Cloud 来使用 Kafka 主题,进行一些处理并将结果存储在 Mongo DB 中。我注意到,如果我的消费者处理速度很慢,那么内存消耗会迅速攀升,直到服务停止。

进一步分析表明,Spring Cloud 默认使用 BUFFER 背压策略,因此缓冲区会填满并吃掉所有内存。

我的问题是,Spring 云中有没有办法指定背压策略(而不仅仅是缓冲)?如果没有,有没有办法限制/配置缓冲区大小?

0 投票
2 回答
553 浏览

apache-kafka-streams - 检查 StateStore 是否已完全填充

我有一个紧凑的主题,大约有 30 个 Mio 键。我App将此主题具体化为KeyValueStore.

如何检查是否KeyValueStore已完全填充?如果我通过查找密钥,InteractiveQuery我需要知道密钥是否不存在,因为它还StateStore没有准备好,或者密钥是否确实不存在。

我以这种方式实现 StateStore:

0 投票
1 回答
41 浏览

spring-cloud-stream-binder-kafka - Kakfa 消息在数据库事务之前提交

我无法让 spring-cloud-stream-binder-kafka 用于以下用例:

启动@transaction(Rest 控制器)数据库更新/插入发送 Kafka 消息

在事务提交之前,消费者(使用@EnableBinding 和@StreamListener 配置)能够读取记录。此使用者已经配置了 read_committed 隔离级别。

我不确定这是一个问题还是我这边的任何配置。

尝试配置 bean,ChainedTransactionManager,但有一些其他问题。