问题标签 [spring-cloud-stream-binder-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
385 浏览

spring-cloud-stream - Spring Cloud Stream 分支流未按预期工作

下面是分支代码,它只流向一个主题(第一个主题)。据我了解,它应该流向所有三个主题?

无论如何,我可以使用分支流式传输到三个主题?

处理器的配置

0 投票
2 回答
8096 浏览

spring-boot - MessageDispatchingException:调度程序没有订阅者

有一个简单的 Spring Cloud Stream 设置。

界面

捆绑

听众

应用属性

一切正常。接收使用发布者发送的消息。
现在我正在尝试使用 KafkaTemplate 从另一个应用程序向该主题发送消息:

这次在接收端抛出错误:

春季版本 5+。
这是一个有效的场景吗,使用 KafkaTemplate 发送消息并期望它们被云流订阅者接收?

0 投票
1 回答
2219 浏览

java - Spring Cloud Stream 3.0 与 kafka 消费者在批处理模式下获取列表中的单个记录而不是更多

尝试使用 Spring Cloud Stream 3.0 以批处理模式使用 kafka 消息。

消费者收到一个包含单个记录的列表,而不是更多。

下面是 yml ,使用的消费者编码

消费者代码

输出:日志显示在列表中获取了一条记录,而不是我们需要一批 10 条记录

0 投票
0 回答
539 浏览

spring-boot - Sleuth 与 Spring Cloud Stream Binder Kafka Streams

0 投票
1 回答
327 浏览

apache-kafka - 来自不同 Kafka 主题的事件数量的聚合(总和)

我的应用程序有三个主题,它们接收一些属于用户的事件:

这将是消息流的一个示例:

我希望能够生成包含每个用户每月事件总数的报告,聚合来自三个主题的所有事件,例如:

拥有三个 KStreams(每个主题一个),我如何每月执行此添加以汇总来自三个不同主题的所有事件?你能显示这个的代码吗?

0 投票
1 回答
173 浏览

apache-kafka-streams - Spring Cloud Stream - 在不明确使用 KTable/KStream 的情况下查询主题?

我在 Java 应用程序中使用 Spring Cloud Stream 库。我想将 Kafka Streams 活页夹用于状态存储。该应用程序将向主题发布消息,我希望使用 Kafka StreamsInteractiveQueryService从同一主题中检索数据。是否可以按原样执行此类查询,还是我需要先将主题作为 KTable/KStream 使用并在执行查询之前将其具体化?我没有对主题进行KTable/KStream处理的任何要求,我只想查询主题内容。我希望有某种方法可以将其隐式实现为状态存储。

0 投票
1 回答
1877 浏览

spring-cloud - java.lang.ClassCastException: class .$Proxy143 cannot be cast to class .MessageChannel (... are in unnamed module of loader 'app')

我正在为 Spring Cloud Stream 应用程序编写测试。这有一个从 topicA 读取的 KStream。在测试中,我使用 KafkaTemplate 发布消息并等待 KStream 日志出现。

测试抛出以下异常:

此异常不会出现在应用程序的正常执行中。

克流:

测试:

0 投票
1 回答
423 浏览

java - Spring Cloud Stream (Hoxton) Kafka Producer / Consumer 无法与 EmbeddedKafka 进行集成测试

我有一个工作应用程序,它使用 Hoxton 附带的 Producers 的最新更新。现在我正在尝试添加一些集成测试,断言 Producer 实际上正在按预期生成消息。问题是,我在测试中使用的消费者从不从主题中读取任何内容。

为了使这个问题可重现,我spring-cloud-stream-samples/source-samples/dynamic-destination-source-kafka从 spring 云流示例中重新使用了一个项目 (),并对其进行了如下调整:

DynamicDestinationSourceApplication(EmitterProcessor 现在是一个 bean)

模块应用程序测试

它以No records found for topic. 为什么这在测试期间不起作用?

更新:

我的实际项目的行为与上面的项目不完全一样,我看到的emitterProcessor.onNext()并没有最终调用AbstractMessageHandler.onNext()

调试到emitterProcessor.onNext()我已经看到它调用drain()并且在FluxPublish.PubSubInner<T>[] a = subscribers;订阅者中是一个空数组,而在正常的应用程序执行中它包含一个 EmitterProcessor。

0 投票
1 回答
766 浏览

apache-kafka-streams - Kafka流状态存储rocksdb文件大小在手动删除消息时不会减少

我正在使用处理器 api 从状态存储中删除消息。删除工作成功,我通过 kafka 密钥在状态存储上使用交互式查询调用确认,但它不会减少目录 tmp/kafka-streams 下本地磁盘上的 kafka 流文件大小。

kafka 流目录大小

我是否需要任何特定配置才能控制文件大小?如果这样不行,删除kafka-streams目录可以吗?我认为它应该是安全的,因为这样的删除将从状态存储和更改日志主题中删除记录。

0 投票
1 回答
132 浏览

spring-boot - 在 Spring 云流 Azure Event Hub binder 中使用消息后没有 DB 提交

我有一个事件侦听器 Spring Boot 应用程序,它执行从 Azure 事件中心主题读取操作 -> 将事件持久保存到 DB中。我使用了 spring-cloud-azure-eventhubs-stream-binder(版本 - 1.2.1)的 Sink 来监听我的主题中的事件,它工作得非常好。但是,该事件不会保留在 DB 中。当我查看 JPA 创建的 sql 时,根本没有插入操作。我只能看到在 JPA 保存时执行的 Select 查询。

我只是按照链接eventthubs-binder-sample中的示例进行操作。

在 @StreamListener 注释的方法中执行的任何 JPA 保存操作都不会将数据插入到 DB 中。

任何提示都非常感谢..我想我必须对事务同步(KafkaTransactionManager + JPATransactionManager)做一些事情,但不确定..