问题标签 [spring-cloud-stream-binder-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
268 浏览

spring-boot - KafkaConsumer 没有轮询所有消息

我有一个类似于下面的代码:

运行代码时,我无法获取此特定主题中的所有消息。例如,如果我在这个分区中有 24 条记录,我只能得到 20 条记录。

这个问题并不总是发生。当我们在运行我的应用程序的同一台机器上使用 docker 制作 Kafka (wurstmeister/kafka:1.1.0) 时,我能够从该特定分区获取所有记录。

但是当我在另一台机器上制作相同的 docker-compose up 并连接到它时,就会发生这个问题。

0 投票
1 回答
83 浏览

kafka-consumer-api - springframework.cloud.stream.messaging 包中的 Sink 是什么?

我正在运行的应用程序将在 kafka 流中充当消费者。我通过 spring 的流配置了 kafka 消息传递。

如何确定它是否从正确的主题中消费?

springframework.cloud.stream.messaging 包中的 SINK 是什么?它扮演什么角色。如何配置它。什么是 Sink.INPUT?

0 投票
1 回答
73 浏览

spring-kafka - Kafkalistener 作为无服务器或功能

我们有一个 Kafka 监听器从主题中消费消息。我们想让这个 bean 成为函数式的,这样当负载很重时,我们可以使用无服务器架构启动多个函数实例。谁能告诉我正确的方向

0 投票
1 回答
1610 浏览

spring-cloud-stream - spring cloud stream 3.0 存在生产者问题

我阅读了关于spring cloud stream 3.0的文档,了解新使用java.util.function.[Supplier/Function/Consumer]来代表生产者,消费和生产,消费者,这应该是正确的。

但我不了解供应商。

该文件指出,对供应商的轮询用于一致地为供应商生成数据,并且不需要程序参与。

但是很多时候,我们需要在特定时间生成数据,例如网络请求,我找不到任何文档或示例。

它可能就像注入 Supplier 对象并调用 get() 方法一样简单,但是如何禁用轮询调用呢?

感谢所有提供信息的人。

0 投票
2 回答
2279 浏览

spring-cloud - 无法解码密钥的 json 类型:Spring Cloud Data Flow 流中的 file_name

我使用 Spring Cloud Data Flow 设置读取 CSV 文件的流,使用自定义处理器对其进行转换并记录它:

文件和 csvToMap 应用程序工作正常,但在日志应用程序中我看到这种异常,对于每条记录:

对于 file_relativePath 标头也会引发此异常。我不明白为什么 spring-kafka 试图将它们读取为 JSON。

此外,日志接收器以正确的方式记录我的记录:

我在我的 csvToMap 处理器中记录了 kafka 标头以进行调试,给了我:

所以我绝对不明白为什么日志接收器会尝试解码 file_name 和 file_relativePath 标头。

我设置了一个本地环境:

  • Windows 7的
  • Spring CDF 服务器 v 2.2.1.REALEASE
  • Spring Cloud Skipper v 2.1.2.RELEASE
  • Spring CDF 外壳 v 2.2.1.RELEASE
  • 卡夫卡 2.12-2.3.0

我的 csvToMap 处理器定义如下:

与此父母:

而这个 Spring 云版本:

我做错了什么导致这个问题?

0 投票
1 回答
84 浏览

apache-kafka - Kafka Streams:对第 n 个事件的操作

我正在尝试找到如何在 Kafka Streams 中对第n个事件执行操作的最佳方法。

我的情况:我有一个带有一些Events的输入流。我必须通过eventType == login过滤它们,并且在同一accountId的每个n次登录(比如说,第五次)上,将此Event发送到输出流。

经过一些调查和不同的尝试,我得到了下面的代码版本(我使用的是 Kotlin)。

我最大的担心是transform在我的情况下函数不是线程安全的。我已经检查了在我的案例中使用的 KV 存储的实现,这是 RocksDB 存储(非事务性),因此值可能会在读取和比较之间更新,并且错误的事件将被发送到输出。

我的其他想法:

  1. 使用物化视图作为没有变压器的商店,但我坚持实施。
  2. 创建一个将使用 TransactionalRocksDB 的自定义持久 KV 存储(不确定是否值得)。
  3. 创建一个自定义的持久 KV 存储,该存储将在内部使用 ConcurrentHashMap(如果我们预期的用户很多,它可能会导致高内存消耗)。

还有一点需要注意:我正在使用 Spring Cloud Stream,所以也许这个框架有一个适合我的案例的内置解决方案,但我没有找到它。

我将不胜感激任何建议。提前致谢。

0 投票
0 回答
835 浏览

spring-boot - 反应式 Spring Webflux 应用程序中的 Spring Cloud Stream Kafka Producer 事务性

我在 Spring Webflux 应用程序中使用 Spring Cloud Stream Binder Kafka 3.0.0,该应用程序公开了一个 API,该 API 接收一些数据并将其发布到 Kafka 主题,使用@Output

我已经将 Kafka 和 Spring Boot 应用程序都配置为使用事务生产者(请注意@Transactional上面发送方法上的注释):

关键是,作为一个 Spring Webflux 应用程序,我不应该阻塞 http 线程,所以我应该将(阻塞)生产者包装在 fromCallable 块上并在另一个线程池上执行它,例如:

我的问题是:

  • @Transactional注释是否仍然适用于这种方法?估计不应该...
  • 在响应式 Spring Webflux + Cloud Stream Kafka 上下文中支持事务性的推荐方法是什么?
  • 奖励:Spring Cloud Stream 是否支持Reactor Kafka ?如果是这样,我们如何在这种情况下配置它@Output,事务支持......?
0 投票
1 回答
1121 浏览

apache-kafka - java.lang.NullPointerException:使用 Avro 格式的 Spring Cloud Stream 3 Kafka 函数生产者中的 null

我正在升级现有的 Spring Cloud Stream 应用程序以使用新的 Spring Cloud Function 生产者。生成的消息使用 Avro 格式。

这是我的整个设置:

有一个 REST 端点接收一些信息并使用以下命令将其发送到输出主题InfoProducer

问题是我遇到了这个丑陋的异常:

InBeanFactoryAwareFunctionRegistry acceptedOutputMimeTypes是一个空数组。

这里有什么问题?

0 投票
1 回答
219 浏览

java - @Autowired 处理器为空

我正在使用 Spring Cloud Stream 的 OOTB 示例。

我的主要代码中有这段代码。

当我在其中投入一些东西时,我的生产者类中的处理器类为空。

@Autowired 注释工作正常,因为我的功能没有错误,但流程变量随时为空。

可以从这里克隆基础项目: https ://github.com/eugenp/tutorials/tree/master/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka

对此有什么想法吗?

提前致谢!

0 投票
1 回答
1121 浏览

apache-kafka - 如何配置 Spring Cloud Stream (Kafka) 应用程序以在 Confluent Cloud 中自动创建主题?

有没有办法让(Spring Cloud Stream)应用程序在 Confluent Cloud 中自动创建他们需要的主题?

到目前为止,我不得不手动创建它们,当您认为您还必须设置更改日志主题时,这很容易出错。