问题标签 [spring-cloud-stream-binder-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
623 浏览

apache-kafka - 带有 apache-kafka-binder 的 Spring-cloud-stream 功能模型

这是这个问题的续集。我可以将“普通”Apache Kafka Binder 与功能模型一起使用吗?到目前为止,我使用基于注释的配置混合了这两种配置,以便在一个应用程序中spring-cloud-stream-binder-kafka进行简单的消费/生产和spring-cloud-stream-binder-kafka-streams高级流处理。

功能模型似乎仅由streams活页夹支持,如果我尝试混合使用这两种方法 - 基于注释的简单用法和功能的流,流绑定未注册。

@EnableBinding(SimpleBinding.class)存在于配置类中。是否首选/支持按所述混合两者,或者我是否应该将其streams-binder用于简单的消息消费?

0 投票
1 回答
1164 浏览

spring-boot - 是否可以在 Spring Cloud Stream 中配置多个绑定到同一个处理器?

我有一个简单的 Spring Integration 流,它绑定到处理器以进行输入和输出。

我已将 Kafka 活页夹配置为映射输入和输出主题。这完美无缺。

假设我想将 3 个 Kafka 主题绑定到输入,导致 3 个配置的消费者从三个单独的 kafka 主题中提取,然后由我的 SI 流处理。

是否可以将多个 Kafka 主题映射到我的处理器的输入?如果是这样,该配置的外观如何?

0 投票
1 回答
815 浏览

apache-kafka-streams - Spring Cloud Stream 将值生成为包含 JSON 而不仅仅是 JSON 的字符串

在使用 Spring Cloud Stream 的流处理应用程序中,我正在获取输入流(以整数为键)并调用selectKey它以创建具有相同值但具有不同键(字符串)的新主题。输入主题中包含正确 JSON 格式的记录,例如:

问题是流处理应用程序创建的主题具有value包含 JSON 的字符串,而不是正确的 JSON,即:

代码如下:

上面的函数所做的是消耗输入流,并生成一个输出流(被发送到output)。该输出流与输入流具有相同的值,但只是一个不同的键。(在这种情况下,键来自值的publicId属性。)

application.yml如下:

有什么我想念的吗?这实际上是一个问题,还是可以将 JSON 作为字符串存储在 Spring Cloud Stream 生成的消息中?

我尝试过的其他没有什么影响的事情:

  • 使用本机解码/编码
  • 设置spring.cloud.stream.bindings.output.content-typeapplication/json
  • 使用map代替selectKey
0 投票
1 回答
1950 浏览

apache-kafka - 如何使用 Spring Cloud Stream 仅发送自定义 Header

我必须在 kafka 消息中设置自定义标头,我的 kafka 集群本机支持标头(1.xx)。我目前正在使用 springCloudVersion=Finchley.RELEASE。

当我设置属性时

没有任何标题出现在输出中。

另一方面,如果我正在设置

许多标头包括contentTypespring_json_header_types即将出现在标头中,这极大地影响了吞吐量。Kafka 是一种与语言/框架无关的消息传递机制,我觉得 Spring 应该提供一种仅包含用户提供的标头的方法。是否有任何解决方法可以仅将用户设置的标头获取到 kafka 主题,同时抑制所有与 Spring Cloud 相关的标头。

0 投票
1 回答
1337 浏览

apache-kafka-streams - 如何使用 Kafka Streams 对 Spring Cloud Stream 进行单元测试

我一直在尝试让 Spring Cloud Stream 与 Kafka Streams 一起工作一段时间,我的项目使用嵌入式 kafka 来测试 Kafka DSL,我使用这个存储库作为我的测试实现的基础(它本身就是一个测试用例这个问题)。

我在这里创建了一个存储库来演示这一点。

基本上,当使用使用“Processor.class”和 MessageChannel 作为实现的“DemoApplicationTest.ExampleAppWorking.class”时,测试是成功的。

但是当使用使用自定义绑定和 KStream 作为实现的“DemoApplicationTest.ExampleAppNotWorking.class”时,它会失败。

第二个用例中的错误是这样的:

我错过了什么?

0 投票
1 回答
273 浏览

spring-kafka - 时间窗口内的 Spring Kafka 批处理

Spring Boot 环境监听 kafka 主题(@KafkaListener / @StreamListener) 配置监听工厂以批处理方式运行:

或通过application.properties

如何配置框架,以便给定两个数字:N 和 T,它将尝试为侦听器获取 N 条记录,但不会等待超过 T 秒,如下所述:https ://doc.akka.io/docs /akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
我看过的一些属性:

  • max-poll-records 确保您不会在批次中获得超过 N 个数字
  • fetch-min-size在 fetch 请求中至少获取此数量的数据
  • fetch-max-wait但不要等待超过必要的时间
  • idleBetweenPolls在民意调查之间睡一会儿:)

似乎fetch-min-size结合fetch-max-wait应该这样做,但它们比较字节,而不是消息/记录。

显然可以手动实现,我正在寻找是否可以为我配置 Spring。

0 投票
1 回答
1076 浏览

kotlin - Spring Cloud Stream/Function:不使用 Java Function/BiFunction 的 Kotlin lambdas

我有一个使用 Spring Boot 编写的 Kafka 流处理应用程序,使用spring-cloud-functionspring-cloud-stream-binder-kafka-streams. 处理几个流的方法用 注释,因此它应该被(而不是使用)@Bean拾取。当此方法返回 a时,它可以工作。但是当我将它作为普通的 Kotlin lambda 进行尝试时,Spring Boot 并没有发现它:应用程序启动然后立即结束,因为它没有找到要运行的函数。spring-cloud-function@StreamListenerBiFunction

从我在文档中可以看到,这应该可行。

这是有效的声明:

这是不起作用的声明

(两种情况下方法的内容都是一样的。)

根据文档,我已将spring-cloud-function-kotlin模块添加到类路径中,方法是将其添加到build.gradle.kts

Spring Cloud Stream 的版本是Hoxton.RC1.

我还需要做些什么来获取该功能吗?还是我需要BiFunction在这种情况下使用?

0 投票
2 回答
1182 浏览

java - Sping Cloud Stream Kafka - 以批处理模式消费消息并作为单个处理的消息发送

我尝试为下一个 Spring Cloud Stream 版本准备我们的应用程序。(目前使用 3.0.0.RC1)。使用 Kafka 活页夹。

现在我们收到一条消息,对其进行处理并将其重新发送到另一个主题。分别处理每条消息会导致对我们数据库的大量单个请求。

在 3.0.0 版本中,我们希望将消息作为批处理处理,因此我们可以将数据保存在批处理更新中。

在当前版本中我们使用@EnableBinding、@StreamListener

我已将使用者属性设置为“批处理模式”并将签名更改为List<>,但这样做会导致收到 aList<byte[]>而不是预期的List<ExchangeableStock>。Ofc 之后可以进行转换,但这感觉就像“meh”,我认为这应该在调用 Listener 之前发生。

然后我尝试了(新的)功能版本,并且使用效果很好。我也喜欢这个简单的处理版本

但是输出主题现在接收到一个对象列表作为一条消息,并且后续消费者无法正常工作。

我想我错过了什么...

我是否需要添加某种 MessageConverter (对于注释驱动版本)或者是否有办法通过功能版本实现所需的行为?

0 投票
0 回答
356 浏览

spring-cloud-stream - spring-cloud-stream-binder-kafka-streams - 在功能实现中读取 Avro 消息

我正在尝试使用来自已在 Avro 中序列化的主题的消息。文档对于这是如何工作的非常混乱。https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_inbound_deserialization

我试图阅读的消息是 avro 序列化消息。我在同一个项目中有键和值的模式,并从模式中生成了类 - 键和值。

我的困惑是,有一些独特的应用程序属性和代码组合可以使其工作。现在我似乎弄错了,我一直在尝试使用一堆属性和代码组合,但它们都不起作用。

我不断收到的错误是

看起来默认的 json 序列化器正在启动并试图反序列化 avro 序列化消息。

我的代码如下所示

应用程序.yml

0 投票
1 回答
612 浏览

apache-kafka - spring cloud stream kafka KTable作为输入不起作用

Spring Cloud Stream Kafka,KTable 作为输入不起作用

接收器.java

消息接收器.java

应用程序.yml

我得到以下错误

有人可以请教是什么问题吗?使用 KStream 作为输入,它可以工作,但不能作为 KTable。提前致谢