问题标签 [spring-cloud-stream-binder-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-boot - KafkaConsumer 没有轮询所有消息
我有一个类似于下面的代码:
运行代码时,我无法获取此特定主题中的所有消息。例如,如果我在这个分区中有 24 条记录,我只能得到 20 条记录。
这个问题并不总是发生。当我们在运行我的应用程序的同一台机器上使用 docker 制作 Kafka (wurstmeister/kafka:1.1.0) 时,我能够从该特定分区获取所有记录。
但是当我在另一台机器上制作相同的 docker-compose up 并连接到它时,就会发生这个问题。
kafka-consumer-api - springframework.cloud.stream.messaging 包中的 Sink 是什么?
我正在运行的应用程序将在 kafka 流中充当消费者。我通过 spring 的流配置了 kafka 消息传递。
如何确定它是否从正确的主题中消费?
springframework.cloud.stream.messaging 包中的 SINK 是什么?它扮演什么角色。如何配置它。什么是 Sink.INPUT?
spring-kafka - Kafkalistener 作为无服务器或功能
我们有一个 Kafka 监听器从主题中消费消息。我们想让这个 bean 成为函数式的,这样当负载很重时,我们可以使用无服务器架构启动多个函数实例。谁能告诉我正确的方向
spring-cloud-stream - spring cloud stream 3.0 存在生产者问题
我阅读了关于spring cloud stream 3.0的文档,了解新使用java.util.function.[Supplier/Function/Consumer]来代表生产者,消费和生产,消费者,这应该是正确的。
但我不了解供应商。
该文件指出,对供应商的轮询用于一致地为供应商生成数据,并且不需要程序参与。
但是很多时候,我们需要在特定时间生成数据,例如网络请求,我找不到任何文档或示例。
它可能就像注入 Supplier 对象并调用 get() 方法一样简单,但是如何禁用轮询调用呢?
感谢所有提供信息的人。
spring-cloud - 无法解码密钥的 json 类型:Spring Cloud Data Flow 流中的 file_name
我使用 Spring Cloud Data Flow 设置读取 CSV 文件的流,使用自定义处理器对其进行转换并记录它:
文件和 csvToMap 应用程序工作正常,但在日志应用程序中我看到这种异常,对于每条记录:
对于 file_relativePath 标头也会引发此异常。我不明白为什么 spring-kafka 试图将它们读取为 JSON。
此外,日志接收器以正确的方式记录我的记录:
我在我的 csvToMap 处理器中记录了 kafka 标头以进行调试,给了我:
所以我绝对不明白为什么日志接收器会尝试解码 file_name 和 file_relativePath 标头。
我设置了一个本地环境:
- Windows 7的
- Spring CDF 服务器 v 2.2.1.REALEASE
- Spring Cloud Skipper v 2.1.2.RELEASE
- Spring CDF 外壳 v 2.2.1.RELEASE
- 卡夫卡 2.12-2.3.0
我的 csvToMap 处理器定义如下:
与此父母:
而这个 Spring 云版本:
我做错了什么导致这个问题?
apache-kafka - Kafka Streams:对第 n 个事件的操作
我正在尝试找到如何在 Kafka Streams 中对第n个事件执行操作的最佳方法。
我的情况:我有一个带有一些Events的输入流。我必须通过eventType == login过滤它们,并且在同一accountId的每个n次登录(比如说,第五次)上,将此Event发送到输出流。
经过一些调查和不同的尝试,我得到了下面的代码版本(我使用的是 Kotlin)。
我最大的担心是transform
在我的情况下函数不是线程安全的。我已经检查了在我的案例中使用的 KV 存储的实现,这是 RocksDB 存储(非事务性),因此值可能会在读取和比较之间更新,并且错误的事件将被发送到输出。
我的其他想法:
- 使用物化视图作为没有变压器的商店,但我坚持实施。
- 创建一个将使用 TransactionalRocksDB 的自定义持久 KV 存储(不确定是否值得)。
- 创建一个自定义的持久 KV 存储,该存储将在内部使用 ConcurrentHashMap(如果我们预期的用户很多,它可能会导致高内存消耗)。
还有一点需要注意:我正在使用 Spring Cloud Stream,所以也许这个框架有一个适合我的案例的内置解决方案,但我没有找到它。
我将不胜感激任何建议。提前致谢。
spring-boot - 反应式 Spring Webflux 应用程序中的 Spring Cloud Stream Kafka Producer 事务性
我在 Spring Webflux 应用程序中使用 Spring Cloud Stream Binder Kafka 3.0.0,该应用程序公开了一个 API,该 API 接收一些数据并将其发布到 Kafka 主题,使用@Output
:
我已经将 Kafka 和 Spring Boot 应用程序都配置为使用事务生产者(请注意@Transactional
上面发送方法上的注释):
关键是,作为一个 Spring Webflux 应用程序,我不应该阻塞 http 线程,所以我应该将(阻塞)生产者包装在 fromCallable 块上并在另一个线程池上执行它,例如:
我的问题是:
@Transactional
注释是否仍然适用于这种方法?估计不应该...- 在响应式 Spring Webflux + Cloud Stream Kafka 上下文中支持事务性的推荐方法是什么?
- 奖励:Spring Cloud Stream 是否支持Reactor Kafka ?如果是这样,我们如何在这种情况下配置它
@Output
,事务支持......?
apache-kafka - java.lang.NullPointerException:使用 Avro 格式的 Spring Cloud Stream 3 Kafka 函数生产者中的 null
我正在升级现有的 Spring Cloud Stream 应用程序以使用新的 Spring Cloud Function 生产者。生成的消息使用 Avro 格式。
这是我的整个设置:
有一个 REST 端点接收一些信息并使用以下命令将其发送到输出主题InfoProducer
:
问题是我遇到了这个丑陋的异常:
InBeanFactoryAwareFunctionRegistry
acceptedOutputMimeTypes
是一个空数组。
这里有什么问题?
java - @Autowired 处理器为空
我正在使用 Spring Cloud Stream 的 OOTB 示例。
我的主要代码中有这段代码。
当我在其中投入一些东西时,我的生产者类中的处理器类为空。
@Autowired 注释工作正常,因为我的功能没有错误,但流程变量随时为空。
可以从这里克隆基础项目: https ://github.com/eugenp/tutorials/tree/master/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka
对此有什么想法吗?
提前致谢!
apache-kafka - 如何配置 Spring Cloud Stream (Kafka) 应用程序以在 Confluent Cloud 中自动创建主题?
有没有办法让(Spring Cloud Stream)应用程序在 Confluent Cloud 中自动创建他们需要的主题?
到目前为止,我不得不手动创建它们,当您认为您还必须设置更改日志主题时,这很容易出错。