问题标签 [spring-cloud-stream-binder-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-cloud-stream - Spring Cloud Stream 分支流未按预期工作
下面是分支代码,它只流向一个主题(第一个主题)。据我了解,它应该流向所有三个主题?
无论如何,我可以使用分支流式传输到三个主题?
处理器的配置
spring-boot - MessageDispatchingException:调度程序没有订阅者
有一个简单的 Spring Cloud Stream 设置。
界面
捆绑
听众
应用属性
一切正常。接收使用发布者发送的消息。
现在我正在尝试使用 KafkaTemplate 从另一个应用程序向该主题发送消息:
这次在接收端抛出错误:
春季版本 5+。
这是一个有效的场景吗,使用 KafkaTemplate 发送消息并期望它们被云流订阅者接收?
java - Spring Cloud Stream 3.0 与 kafka 消费者在批处理模式下获取列表中的单个记录而不是更多
尝试使用 Spring Cloud Stream 3.0 以批处理模式使用 kafka 消息。
消费者收到一个包含单个记录的列表,而不是更多。
下面是 yml ,使用的消费者编码
消费者代码
输出:日志显示在列表中获取了一条记录,而不是我们需要一批 10 条记录
apache-kafka - 来自不同 Kafka 主题的事件数量的聚合(总和)
我的应用程序有三个主题,它们接收一些属于用户的事件:
这将是消息流的一个示例:
我希望能够生成包含每个用户每月事件总数的报告,聚合来自三个主题的所有事件,例如:
拥有三个 KStreams(每个主题一个),我如何每月执行此添加以汇总来自三个不同主题的所有事件?你能显示这个的代码吗?
apache-kafka-streams - Spring Cloud Stream - 在不明确使用 KTable/KStream 的情况下查询主题?
我在 Java 应用程序中使用 Spring Cloud Stream 库。我想将 Kafka Streams 活页夹用于状态存储。该应用程序将向主题发布消息,我希望使用 Kafka StreamsInteractiveQueryService
从同一主题中检索数据。是否可以按原样执行此类查询,还是我需要先将主题作为 KTable/KStream 使用并在执行查询之前将其具体化?我没有对主题进行KTable/KStream处理的任何要求,我只想查询主题内容。我希望有某种方法可以将其隐式实现为状态存储。
spring-cloud - java.lang.ClassCastException: class .$Proxy143 cannot be cast to class .MessageChannel (... are in unnamed module of loader 'app')
我正在为 Spring Cloud Stream 应用程序编写测试。这有一个从 topicA 读取的 KStream。在测试中,我使用 KafkaTemplate 发布消息并等待 KStream 日志出现。
测试抛出以下异常:
此异常不会出现在应用程序的正常执行中。
克流:
测试:
java - Spring Cloud Stream (Hoxton) Kafka Producer / Consumer 无法与 EmbeddedKafka 进行集成测试
我有一个工作应用程序,它使用 Hoxton 附带的 Producers 的最新更新。现在我正在尝试添加一些集成测试,断言 Producer 实际上正在按预期生成消息。问题是,我在测试中使用的消费者从不从主题中读取任何内容。
为了使这个问题可重现,我spring-cloud-stream-samples/source-samples/dynamic-destination-source-kafka
从 spring 云流示例中重新使用了一个项目 (),并对其进行了如下调整:
DynamicDestinationSourceApplication(EmitterProcessor 现在是一个 bean)
模块应用程序测试
它以No records found for topic
. 为什么这在测试期间不起作用?
更新:
我的实际项目的行为与上面的项目不完全一样,我看到的emitterProcessor.onNext()
并没有最终调用AbstractMessageHandler.onNext()
调试到emitterProcessor.onNext()
我已经看到它调用drain()
并且在FluxPublish.PubSubInner<T>[] a = subscribers;
订阅者中是一个空数组,而在正常的应用程序执行中它包含一个 EmitterProcessor。
apache-kafka-streams - Kafka流状态存储rocksdb文件大小在手动删除消息时不会减少
我正在使用处理器 api 从状态存储中删除消息。删除工作成功,我通过 kafka 密钥在状态存储上使用交互式查询调用确认,但它不会减少目录 tmp/kafka-streams 下本地磁盘上的 kafka 流文件大小。
kafka 流目录大小
我是否需要任何特定配置才能控制文件大小?如果这样不行,删除kafka-streams目录可以吗?我认为它应该是安全的,因为这样的删除将从状态存储和更改日志主题中删除记录。
spring-boot - 在 Spring 云流 Azure Event Hub binder 中使用消息后没有 DB 提交
我有一个事件侦听器 Spring Boot 应用程序,它执行从 Azure 事件中心主题读取操作 -> 将事件持久保存到 DB中。我使用了 spring-cloud-azure-eventhubs-stream-binder(版本 - 1.2.1)的 Sink 来监听我的主题中的事件,它工作得非常好。但是,该事件不会保留在 DB 中。当我查看 JPA 创建的 sql 时,根本没有插入操作。我只能看到在 JPA 保存时执行的 Select 查询。
我只是按照链接eventthubs-binder-sample中的示例进行操作。
在 @StreamListener 注释的方法中执行的任何 JPA 保存操作都不会将数据插入到 DB 中。
任何提示都非常感谢..我想我必须对事务同步(KafkaTransactionManager + JPATransactionManager)做一些事情,但不确定..