问题标签 [spring-cloud-stream-binder-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
52 浏览

spring-webflux - 如何使用 `Function 增加主题消费者吞吐量, 通量>`?

我有一个基于 webFlux 的服务,将使用然后从 kafka 主题生成消息。我的代码就是这样

我发现当我有 2 个实例时,我每 30 分钟可以消耗 750 条消息,但我的 CPU 从未高于 10%。随着时间的推移,延迟不断增加,所以我想知道如何增加消费者吞吐量。从文档来看,并发对反应性无效,链接

有谁知道如何在不添加更多实例的情况下增加吞吐量?

0 投票
1 回答
84 浏览

java - Spring集成Kafka MessaheChannel Thread?

我有一个 Spring Integration 流 bean(通过 Java DSL 配置),它处理来自与 Spring CloudStream 绑定的 kafka 队列消息通道的消息。

kafka 消息的来源是一个外部应用程序,所以我真正想了解的是,哪些线程/线程将实际处理该消息。

它是使用应用程序创建的单个专用线程,还是 CloudStream 自动创建和配置的线程池,或者其他什么?

我能以某种方式管理它吗?

0 投票
1 回答
3219 浏览

spring-boot - 使用 Spring Cloud Stream Kafka Binder + Kafka Streams Binder 的 Spring Boot 应用程序不起作用 - 生产者不发送消息

我的带有 SCS Hoshram.SR6 的 Spring Boot 2.3.1 应用程序正在使用 Kafka Streams Binder。我需要添加一个将在应用程序的另一部分中使用的 Kafka Producer,因此我添加了 kafka binder。问题是生产者不工作,抛出异常:

这是我的配置:

这里可能是什么问题?

更新

我调整了配置如下:

现在我看不到任何异常,但消息没有进入主题。

在另一个仅使用 kafka binder 的应用程序中,它可以完美运行:

更新 2

我设置了 logging.level.org.springframework.cloud.stream: debug

在日志中显示以下跟踪:

但是,没有任何关于 a 的内容Creating binder: kafka

0 投票
1 回答
408 浏览

spring-boot - KafkaBindingRebalanceListener Bean 未由 KafkaMessageChannelBinder Bean 自动装配

文档非常直接,建议公开 KafkaBindingRebalanceListener 类型的 Bean,并且将在内部调用 onPartitiosnAssigned 方法。我正在尝试做同样的事情,并且在 spring 框架创建其 KafkaMessageChannelBinder Bean 时,ObjectProvider.getIfUnique() 总是返回 null,因为它无法找到所需的 bean。似乎当应用程序启动 SpringFramework 时首先创建它的 Beans 并且由于它尚未创建而无法找到 Rebalance Listener Bean。以下是项目中的三个代码片段。如果我缺少任何指示应用程序在进入 Spring Framework 之前首先在应用程序包中创建 Bean 的内容,请提供帮助。

再平衡监听器

配置类

应用类

0 投票
1 回答
1858 浏览

spring - spring-cloud-stream-binder-kafka - 无法使用 ssl 配置创建多个 kafka 活页夹

我正在尝试使用 jaas 配置通过 SASL_SSL 协议连接到 kafka 集群,如下所示:

上述配置与spring-cloud-stream 的官方 git repo上可用的示例配置内联。

该库的 git repo 上提出的类似问题说它已在最新版本中修复,但似乎并非如此。收到以下错误:

springBootVersion:2.2.8 和 spring-cloud-stream-dependencies 版本 - Horsham.SR6。

这让我认为该库没有正确获取配置道具,因为它jaas.loginModule被指定为ScramLoginModule但它Krb5LoginModule用于进行身份验证。

但是,令人惊讶的是,当配置如下完成时(区别在于最后一部分在 binder 的环境之外使用 ssl 凭据),它会连接到全局 ssl props 中指定的 binder(在 binder 的 env 之外)并默默地忽略另一个活页夹而不显示任何错误日志。

假设如果kafka-2-with-ssl在全局 ssl props 中指定了 binder 的密码凭据,则会创建该 binder,并且订阅该 binder 的 bindings 开始使用 events。但这仅在我们需要创建单个活页夹时才有用。

向您保证 ssl 凭据没有任何问题。使用成功单独创建的 ssl-kafka-binder 进行了努力测试。目的是使用 SASL_SSL 协议连接到多个 kafka binder。提前致谢。

0 投票
1 回答
1096 浏览

java - 在 kstreams 应用程序中使用自定义 Kafka 状态存储

我们正在使用 spring cloud stream Hoxton RC7 项目中包含的 Kafka-streams(因此使用提供的 Kafka-streams 和 Kafka-client 版本 [2.3.1])

我们已经实现了一个 kstreams 应用程序

我们在其中进行一些聚合,例如:

现在物化应该是一个自定义的外部状态存储(Redis):

由 StoreBuilder Bean 提供:

我现在使用 EmbeddedKafka 测试应用程序:

我尝试访问状态存储并查询添加的项目:

但是当我运行我的测试时,我收到一个错误:

几个问题:

  • [1] 解释的使用自定义状态存储的示例在处理器中使用它。这是否自动意味着,我无法在聚合中使用自定义状态存储?
  • 如果无法在聚合中使用它,那么使用自定义状态存储有什么意义呢?
  • 当我为 kstreams 稍微更改上面的代码并定义一个处理器而不是在聚合方法中使用物化时,错误会发生变化,然后它会在尝试执行 getQueryableStore 时抱怨缺少状态“redis-store”存储。但我确实可以看到,addStateStoreBeans 注册了“redis-store”。这怎么可能发生?

我想使用自定义状态存储的原因是,我(真的很容易)无法为应用程序实例提供专用硬盘。为了快速启动应用程序,我希望避免在每次启动应用程序时处理完整的变更日志(最好每天进行几次,目前需要一个多小时)。所以现在最后一个问题:

  • 使用自定义外部状态存储时,我能否在应用程序重新启动时恢复到最后一个状态?

[1] https://spring.io/blog/2019/12/09/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-6-state-stores-and-interactive -查询

0 投票
1 回答
66 浏览

spring-boot - 使用 Kafka 作为独立库的 spring-boot 云流

我正在尝试将基于 spring-boot 云流的库与非 Spring 应用程序中的 Kafka 集成。

当这个库被加载到另一个 Spring 应用程序中时,一切正常。

当我尝试使用未启用弹簧启动的应用程序初始化应用程序上下文并获取我的 bean 时,我收到以下警告和异常:

0 投票
1 回答
246 浏览

java - Spring Cloud Kinesis Binder 如何处理生产者和消费者的错误 - 根据文档,它不起作用

我遵循了以下文档,并且我有一个生产者和消费者使用 Kinesis Stream 工作得很好。我想了解如何处理生产者(源)和消费者(处理器)中的错误,以防发生任何异常。

根据 Spring Stream 错误处理文档,我尝试了以下方法:

  1. 我尝试使用 @ServiceActivator("input-stream.input-stream-group.erros") - 这有效,但我的“输入流”是每个生产环境中的动态名称,根据我应该附加生产环境的策略定义数据流时的名称。这是首选方式,但如何解决这个问题?

  2. 我尝试过使用@ServiceActivator("errorChannel") - 这不起作用意味着如果我为此引入一种方法并放置一个记录器,则错误正在捕获和打印,但由于以下重新抛出而引发错误org.springframework.cloud.stream.binding.StreamListenerMessageHandler(第 53-68 行)

  3. 我已经自动装配了一个与“errorChannel”同名的 MessageChannel,并且在捕获异常时我已经准备了一条消息并发送给它,但在 ServiceActivator 方法中的行为与上面相同。

我该如何处理和解决这个问题?请建议并帮助我。

文档:https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview。 adoc#error-channels

0 投票
1 回答
1067 浏览

spring-cloud-stream - Spring Cloud Stream 发送到 DLQ 的所有消息

我正在使用以下应用程序使用带有 kafka-binding 的 spring-cloud-stream 进行练习:

如您所见,应用程序非常简单:如果收到的消息是“EXCP”,则抛出异常,否则发布“OK”消息。

我不清楚的是,为什么如果从“react-in”读取错误消息,那么从该主题读取的每条消息都进入 DLQ 主题。例如:

  1. 将“test-1”写入“react-in” -> 在“react-out”中得到“OK:test 1”
  2. 将 "test-2" 写入 "react-in" -> 在 "react-out" 中得到 "OK: test 2"
  3. 将“EXCP”写入“react-in” -> 在“dlq”中获得“ECP”
  4. 将“test-3”写入“react-in” -> 在“dlq”中得到“test 3”

我本来希望最后一条消息发布在“react-out”主题而不是“dlq”主题中。这里的日志:

有趣的是,“单一”函数的行为符合我的预期:

  1. 将“test-1”写入“single-in” -> 在“single-out”中得到“OK:test 1”
  2. 将 "test-2" 写入 "single-in" -> 在 "single-out" 中得到 "OK: test 2"
  3. 将“EXCP”写入“single-in” -> 在“dlq”中获得“ECP”
  4. 将“test-3”写入“single-in” -> 在“single-out”中得到“OK:test 3”

有人可以解释一下为什么使用反应器实现所有消息都在 dql 中发布,以及“调度程序没有订阅者”错误是什么意思?

谢谢

0 投票
1 回答
898 浏览

java - 需要有关 Spring Boot Kafka Stream Binder Application 错误的建议

我试图从https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_usage_2站点运行 Spring boot Kafka Stream 示例。

我能够成功地构建它。但是在运行时出现如下所示的错误(java.lang.IllegalStateException: Error processing condition on org.springframework.cloud.stream.function.FunctionConfiguration)。任何人都可以帮助找出代码中的问题吗?

复制KafkaStreamApplication.java

pom.xml

错误