问题标签 [spring-boot-cloud-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
660 浏览

spring-boot - 在 Spring Boot 中实现 KStream / Table 的困惑

我正在尝试获取一些 Spring-Boot kafka 流“动作”工作的样本,但我似乎最终完全糊涂了 :)

我正在通过网络接收 JSON 数据。我在avro中构建了一个模式,用于序列化数据:

这是据我所知:

我希望创建一个看起来像这样的 KTable:

|platformUID|状态|纬度|经度|Alt| |-----------|-----|---|---|---|

这就是我让自己感到困惑的地方。

我假设我想GroupByPlatformUID球场上做一个,但我不清楚如何实际前进。

有人可以指出我正确的方向吗?

我认为我正在寻找的是获取input流并将其转换为 KTable,其键为value.getUID(),值为之前的值

0 投票
1 回答
5396 浏览

java - 如何在 spring-boot-2 中的 yaml/properties 文件中禁用所有与 Kafka 相关的自动配置而不删除依赖项?

我创建了一个 spring-boot-2 gradle 项目,也在build.gradle文件中添加了与 Kafka 相关的依赖项,如下所示。

现在我想从application.yaml 文件中禁用所有与 Kafka 相关的自动配置,因为我已经尝试在我的 yaml 文件中给出以下代码。

在实现上述内容后,Kafka 仍然自动配置并开始将 Kafka 与应用程序集成。

我也试过下面的代码,但这对我也不起作用。

现在请任何人帮助我,我如何从 yaml/properties 文件中禁用与 kafka 相关的所有自动配置?

谢谢,

0 投票
1 回答
1139 浏览

apache-kafka-streams - num.stream.threads 创建空闲线程

我有一个带有 2 个主题的 Spring Boot kafka 流应用程序,考虑主题 A 和 B。主题 A 有 16 个分区,主题 B 有 1 个分区。考虑应用程序部署在num.stream.threads= 16 的 1 个实例中。我运行 kafka-consumer-groups.bat 命令来检查线程是如何分配给组中的分区的,得到以下输出。主题 A 和 B 分配了 16 个线程,其中主题 B 中的 14 个线程空闲。

如何避免主题 B 中的空闲线程,或者是否有任何选项可用于设置每个主题的 num.stream.threads?

0 投票
1 回答
138 浏览

java - 如何将 Spring Cloud Stream 功能 Bean 连接到 Kafka Binder?

我正在使用Spring Cloud Streams 文档来尝试解决如何通过已在 Gradle 中下载的活页夹将我的微服务连接到 Kafka。我尝试@Bean Function<String, String>()在我的 Spring Boot Application 类中创建一个简单的方法,并验证它能够通过使用命令行与 Kafka 进行交互来uppercase-in-0uppercase-out-0主题交互,如文档开头所述,确认应用程序能够与卡夫卡通信。在这一点上,我尝试创建以下类,并期望它将通过自动发现加载:

使用application.properties这样的文件:

我不是 100% 确定现在应该发生什么,但我假设它应该看到类并自动创建一个我可以消费charcounter-out-0charcounter-in-0发布到的主题,这些主题中的数据通过该函数。这不是正在发生的事情。我可能会错过什么?此类是否应该以与创建主题相同的方式创建主题@Bean

0 投票
1 回答
2374 浏览

spring - 引起:java.lang.ClassNotFoundException:org.springframework.integration.dsl.IntegrationFlowBuilder

我正在Spring Cloud Stream using RabbitMQ使用Spring Boot v2.2.6.RELEASE.

错误:

应用程序属性

SpringCloudStreamEx1PublisherApplication.java

pom.xml

0 投票
0 回答
32 浏览

spring-boot - 我可以在 SpringBoot 中使用“spring-cloud-stream-binder-kafka”、“spring-cloud-stream-binder-kafka-streams”和“spring-cloud-stream-binder-rabbit”吗?

我正在学习Spring Cloud Streams并设法创建了一个结合了KafkaRabbit的多绑定器 Spring Boot 项目:

现在我也想添加kafka-streams binder:

  1. 那可能吗 ?
  2. 如果是这样,这里的示例都没有结合 (a) kafka和 (b) kafka-streams,您知道显示使用两者的配置的示例吗?
  3. 如果不能结合 (a) 和 (b),是否可以结合 (b) kafka-streams和 (c) rabbit
  4. 如果是这样,您知道#3 的配置示例吗?
0 投票
1 回答
238 浏览

confluent-schema-registry - Kafka 主题和模式注册表主题

我有一个关于使用 Kafka 和不同名称的主题(Kafka 代理)和主题(Schema Registry)设置流处理器的问题。

首先,Kafka 代理和模式注册表似乎一切正常,但如果处理器接收到事件,模式注册表魔法就会启动。

而不是将 abc 作为主题发送到模式注册表 abc.bla 将被发送。架构注册表回答未找到。

预期:localhost:8081/subjects/abc/versions 意外和错误:localhost:8081/subjects/abc.bla/versions

我想知道出了什么问题,因为单个生产者或消费者客户端似乎能够在没有显式配置的情况下从主题中识别出正确的主题名称。

这里是处理器代码:

这是我认为可能是问题的堆栈跟踪:

有人知道我如何配置io.confluent.kafka.serializers.KafkaAvroDeserializerio.confluent.kafka.serializers.KafkaAvroSerializer更正吗?

非常感谢,马库斯

0 投票
0 回答
20 浏览

apache-kafka - 使用 Spring bean 功能设置 Kafka 流拓扑

我尝试运行 Kafka 拓扑测试环境。

生产代码真的很小

现在我想在集成测试中测试代码。如果我理解这个概念是正确的,我必须将 new ProcessApplication().process() 包含到 StreamsBuilder 中,但我不知道如何将 java.util.Function 添加到其中。

此外,我找到了另一种解决方案,如何一起构建拓扑。

即使在这里,我也不知道我应该做什么。更糟糕的是,我有些怀疑自己写的处理器真的是进程还是必须使用ProcessorSupplier。

在 Docker 环境中,生产代码按预期工作。

谢谢,马库斯