问题标签 [spring-cloud-stream-binder-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-boot - 为什么 KStreamBinder 没有 DefaultBinding 的生命周期?
我在@StreamBuilder 中使用 KStream 参数。
这将通过 KStreamBinder 创建一个 DefaultBinding。
我的要求是使用Binding可视化和控制。
然而
您无法通过 springboot /actuator/bindings 控制状态,因为生命周期为空。
(POST /actuator/bindings/{bindings-name} {"state":"PAUSED"})
如何控制绑定的状态?
我使用的版本如下。
- org.springframework.cloud:spring-cloud-stream:3.0.1.RELEASE
- org.springframework.cloud:spring-cloud-stream-binder-kafka-core:3.0.1.RELEASE
- org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.0.1.RELEASE
- org.apache.kafka:kafka-streams:2.3.1
请回答我的问题。谢谢你。
spring-boot - @StreamListener 入站消息列表
我目前正在使用@StreamListener
一对一的入站消息,然后在我们的服务类中对其进行处理以将其保存在数据库中。
相反,我想一次入站一个消息列表(比如 100 条),然后处理它以一次保存所有这 100 条消息,以避免重复调用 DB。
我们如何使用@StreamListener
.
apache-kafka - 将 KStream 实体化为全球共享存储?
我在 Java 应用程序(Spring Cloud Stream)中使用 Kafka Streams API。我有一个特殊的用例,如下:
- 我的应用程序将从主题 A 消费,并从主题 B 产生和消费。
- 对于主题 A 上的每条消息,都会为主题 B 生成一组对应的消息,应用程序使用这些消息来跟踪内部状态变化。它使用来自主题 B 的 KStream 来将该状态具体化为可查询的存储。
由于应用程序的多个实例将运行,并且无法保证将任一主题的哪些特定分区分配给实例,因此必须在应用程序之间共享状态存储。否则,如果主题 B 发生重新平衡,则应用程序实例可能会丢失它们正在跟踪的主题 A 消息的状态信息。考虑以下场景:
- 实例 1 具有主题 A 的分区 1 和主题 B 的分区 1。
- 发生主题 B 的分区重新平衡。
- 实例 1 现在具有主题 A 的分区 1(未更改),但具有主题 B 的分区 2。
- 实例 1 现在无法访问它在为主题 B 拥有分区 1 时创建的状态存储中的数据。
如果仅针对主题 A 进行再平衡,则会发生相同的情况。
是否有可能实现为“全球状态存储”?我了解 GlobalKTable 的概念,但我需要使用 KStream 抽象,因为我需要访问完整的事件流。作为参考,我的 KStream 消费者如下:
apache-kafka - 无法在 Spring Cloud kafka 中为每个主题/绑定器级别设置 serde、生产者和消费者属性
我正在尝试使用 spring cloud kafka binder 来启动简单的 pub-sub 应用程序。但是我无法在 application.yml 中设置 Serializer、DeSerialzer 属性和其他生产者和消费者属性。我一直收到序列化/反序列化错误。甚至 spring boot 项目中的 kafka 日志显示生产者和消费者配置仍然是用户 ByteArraySerializer。下面是相同的代码。
pom.xml
处理器.java
KRest.java
@RestController 公共类 KRest {
}
消息.java
最后是 application.yml
版本
- 弹簧启动:2.2.4
- 弹簧云:Hoxton.SR1
- spring-cloud-stream-kafka-binder:3.0.1
apache-kafka - 是否有一个选项可以为特定主题而不是全部配置 num 流线程?
在我们的应用程序中,我们有多个主题,其中一些主题将使用 16 个分区创建,而一些主题将使用 1 个分区创建。是否有任何spring.cloud.stream.kafka.bindings
可用的属性/选项来实现这一目标?
apache-kafka-streams - num.stream.threads 创建空闲线程
我有一个带有 2 个主题的 Spring Boot kafka 流应用程序,考虑主题 A 和 B。主题 A 有 16 个分区,主题 B 有 1 个分区。考虑应用程序部署在num.stream.threads
= 16 的 1 个实例中。我运行 kafka-consumer-groups.bat 命令来检查线程是如何分配给组中的分区的,得到以下输出。主题 A 和 B 分配了 16 个线程,其中主题 B 中的 14 个线程空闲。
如何避免主题 B 中的空闲线程,或者是否有任何选项可用于设置每个主题的 num.stream.threads?
spring-boot - 使用 Spring Cloud Stream Kafka Binder 时无法设置 groupId 和 clientId
我在处理 Spring Cloud Stream Kafka Binder 时遇到了严重问题。Spring Cloud 3.0.2.RELEASE 的配置设置存在很多歧义和一致性问题。我一直在尝试为 Kafka 主题设置组 ID 和客户端 ID,但尽管尝试了各种不同的组合,但我无法正确配置组 ID。
该文档声称我们应该能够通过配置以下设置之一来设置组 ID 和客户端 ID: https ://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference /html/spring-cloud-stream.html#binding-properties
以上配置均不适用于为生产者设置客户端 ID 或为消费者设置组 ID。我得到的唯一进展是通过完全不同的配置设置客户端 ID。
在使用这些设置设置成功设置客户端 ID 后,我尝试为消费者设置组 ID,但令人惊讶的是没有用。
编辑:这是应用程序设置。
应用程序.java
应用程序.yaml
pom.xml
SpringIntegrationService.java
spring - 如何处理弹簧云蒸汽反应中的背压
我正在使用 Spring Cloud 来使用 Kafka 主题,进行一些处理并将结果存储在 Mongo DB 中。我注意到,如果我的消费者处理速度很慢,那么内存消耗会迅速攀升,直到服务停止。
进一步分析表明,Spring Cloud 默认使用 BUFFER 背压策略,因此缓冲区会填满并吃掉所有内存。
我的问题是,Spring 云中有没有办法指定背压策略(而不仅仅是缓冲)?如果没有,有没有办法限制/配置缓冲区大小?
apache-kafka-streams - 检查 StateStore 是否已完全填充
我有一个紧凑的主题,大约有 30 个 Mio 键。我App
将此主题具体化为KeyValueStore
.
如何检查是否KeyValueStore
已完全填充?如果我通过查找密钥,InteractiveQuery
我需要知道密钥是否不存在,因为它还StateStore
没有准备好,或者密钥是否确实不存在。
我以这种方式实现 StateStore:
spring-cloud-stream-binder-kafka - Kakfa 消息在数据库事务之前提交
我无法让 spring-cloud-stream-binder-kafka 用于以下用例:
启动@transaction(Rest 控制器)数据库更新/插入发送 Kafka 消息
在事务提交之前,消费者(使用@EnableBinding 和@StreamListener 配置)能够读取记录。此使用者已经配置了 read_committed 隔离级别。
我不确定这是一个问题还是我这边的任何配置。
尝试配置 bean,ChainedTransactionManager,但有一些其他问题。