问题标签 [spring-cloud-stream-binder-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 带有 apache-kafka-binder 的 Spring-cloud-stream 功能模型
这是这个问题的续集。我可以将“普通”Apache Kafka Binder 与功能模型一起使用吗?到目前为止,我使用基于注释的配置混合了这两种配置,以便在一个应用程序中spring-cloud-stream-binder-kafka
进行简单的消费/生产和spring-cloud-stream-binder-kafka-streams
高级流处理。
功能模型似乎仅由streams
活页夹支持,如果我尝试混合使用这两种方法 - 基于注释的简单用法和功能的流,流绑定未注册。
@EnableBinding(SimpleBinding.class)
存在于配置类中。是否首选/支持按所述混合两者,或者我是否应该将其streams-binder
用于简单的消息消费?
spring-boot - 是否可以在 Spring Cloud Stream 中配置多个绑定到同一个处理器?
我有一个简单的 Spring Integration 流,它绑定到处理器以进行输入和输出。
我已将 Kafka 活页夹配置为映射输入和输出主题。这完美无缺。
假设我想将 3 个 Kafka 主题绑定到输入,导致 3 个配置的消费者从三个单独的 kafka 主题中提取,然后由我的 SI 流处理。
是否可以将多个 Kafka 主题映射到我的处理器的输入?如果是这样,该配置的外观如何?
apache-kafka-streams - Spring Cloud Stream 将值生成为包含 JSON 而不仅仅是 JSON 的字符串
在使用 Spring Cloud Stream 的流处理应用程序中,我正在获取输入流(以整数为键)并调用selectKey
它以创建具有相同值但具有不同键(字符串)的新主题。输入主题中包含正确 JSON 格式的记录,例如:
问题是流处理应用程序创建的主题具有value
包含 JSON 的字符串,而不是正确的 JSON,即:
代码如下:
上面的函数所做的是消耗输入流,并生成一个输出流(被发送到output
)。该输出流与输入流具有相同的值,但只是一个不同的键。(在这种情况下,键来自值的publicId
属性。)
application.yml
如下:
有什么我想念的吗?这实际上是一个问题,还是可以将 JSON 作为字符串存储在 Spring Cloud Stream 生成的消息中?
我尝试过的其他没有什么影响的事情:
- 使用本机解码/编码
- 设置
spring.cloud.stream.bindings.output.content-type
为application/json
- 使用
map
代替selectKey
apache-kafka - 如何使用 Spring Cloud Stream 仅发送自定义 Header
我必须在 kafka 消息中设置自定义标头,我的 kafka 集群本机支持标头(1.xx)。我目前正在使用 springCloudVersion=Finchley.RELEASE。
当我设置属性时
没有任何标题出现在输出中。
另一方面,如果我正在设置
许多标头包括contentType
和spring_json_header_types
即将出现在标头中,这极大地影响了吞吐量。Kafka 是一种与语言/框架无关的消息传递机制,我觉得 Spring 应该提供一种仅包含用户提供的标头的方法。是否有任何解决方法可以仅将用户设置的标头获取到 kafka 主题,同时抑制所有与 Spring Cloud 相关的标头。
apache-kafka-streams - 如何使用 Kafka Streams 对 Spring Cloud Stream 进行单元测试
我一直在尝试让 Spring Cloud Stream 与 Kafka Streams 一起工作一段时间,我的项目使用嵌入式 kafka 来测试 Kafka DSL,我使用这个存储库作为我的测试实现的基础(它本身就是一个测试用例这个问题)。
我在这里创建了一个存储库来演示这一点。
基本上,当使用使用“Processor.class”和 MessageChannel 作为实现的“DemoApplicationTest.ExampleAppWorking.class”时,测试是成功的。
但是当使用使用自定义绑定和 KStream 作为实现的“DemoApplicationTest.ExampleAppNotWorking.class”时,它会失败。
第二个用例中的错误是这样的:
我错过了什么?
spring-kafka - 时间窗口内的 Spring Kafka 批处理
Spring Boot 环境监听 kafka 主题(@KafkaListener / @StreamListener) 配置监听工厂以批处理方式运行:
或通过application.properties
:
如何配置框架,以便给定两个数字:N 和 T,它将尝试为侦听器获取 N 条记录,但不会等待超过 T 秒,如下所述:https ://doc.akka.io/docs /akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
我看过的一些属性:
max-poll-records
确保您不会在批次中获得超过 N 个数字fetch-min-size
在 fetch 请求中至少获取此数量的数据fetch-max-wait
但不要等待超过必要的时间idleBetweenPolls
在民意调查之间睡一会儿:)
似乎fetch-min-size
结合fetch-max-wait
应该这样做,但它们比较字节,而不是消息/记录。
显然可以手动实现,我正在寻找是否可以为我配置 Spring。
kotlin - Spring Cloud Stream/Function:不使用 Java Function/BiFunction 的 Kotlin lambdas
我有一个使用 Spring Boot 编写的 Kafka 流处理应用程序,使用spring-cloud-function
和spring-cloud-stream-binder-kafka-streams
. 处理几个流的方法用 注释,因此它应该被(而不是使用)@Bean
拾取。当此方法返回 a时,它可以工作。但是当我将它作为普通的 Kotlin lambda 进行尝试时,Spring Boot 并没有发现它:应用程序启动然后立即结束,因为它没有找到要运行的函数。spring-cloud-function
@StreamListener
BiFunction
从我在文档中可以看到,这应该可行。
这是有效的声明:
这是不起作用的声明:
(两种情况下方法的内容都是一样的。)
根据文档,我已将spring-cloud-function-kotlin
模块添加到类路径中,方法是将其添加到build.gradle.kts
:
Spring Cloud Stream 的版本是Hoxton.RC1
.
我还需要做些什么来获取该功能吗?还是我需要BiFunction
在这种情况下使用?
java - Sping Cloud Stream Kafka - 以批处理模式消费消息并作为单个处理的消息发送
我尝试为下一个 Spring Cloud Stream 版本准备我们的应用程序。(目前使用 3.0.0.RC1)。使用 Kafka 活页夹。
现在我们收到一条消息,对其进行处理并将其重新发送到另一个主题。分别处理每条消息会导致对我们数据库的大量单个请求。
在 3.0.0 版本中,我们希望将消息作为批处理处理,因此我们可以将数据保存在批处理更新中。
在当前版本中我们使用@EnableBinding、@StreamListener
我已将使用者属性设置为“批处理模式”并将签名更改为List<>
,但这样做会导致收到 aList<byte[]>
而不是预期的List<ExchangeableStock>
。Ofc 之后可以进行转换,但这感觉就像“meh”,我认为这应该在调用 Listener 之前发生。
然后我尝试了(新的)功能版本,并且使用效果很好。我也喜欢这个简单的处理版本
但是输出主题现在接收到一个对象列表作为一条消息,并且后续消费者无法正常工作。
我想我错过了什么...
我是否需要添加某种 MessageConverter (对于注释驱动版本)或者是否有办法通过功能版本实现所需的行为?
spring-cloud-stream - spring-cloud-stream-binder-kafka-streams - 在功能实现中读取 Avro 消息
我正在尝试使用来自已在 Avro 中序列化的主题的消息。文档对于这是如何工作的非常混乱。https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_inbound_deserialization
我试图阅读的消息是 avro 序列化消息。我在同一个项目中有键和值的模式,并从模式中生成了类 - 键和值。
我的困惑是,有一些独特的应用程序属性和代码组合可以使其工作。现在我似乎弄错了,我一直在尝试使用一堆属性和代码组合,但它们都不起作用。
我不断收到的错误是
看起来默认的 json 序列化器正在启动并试图反序列化 avro 序列化消息。
我的代码如下所示
应用程序.yml
apache-kafka - spring cloud stream kafka KTable作为输入不起作用
Spring Cloud Stream Kafka,KTable 作为输入不起作用
接收器.java
消息接收器.java
应用程序.yml
我得到以下错误
有人可以请教是什么问题吗?使用 KStream 作为输入,它可以工作,但不能作为 KTable。提前致谢