问题标签 [spring-cloud-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
627 浏览

spring-cloud - Spring Integration / Spring cloud Stream:如何使用 @inboundchanneladapter 发送单个消息

我有以下代码:

我希望禁用轮询器,以便我可以发送一条消息。我怎么做?

0 投票
1 回答
868 浏览

spring-cloud-stream - 消费者组的所有消费者都收到消息

我编写了一个 spring 云流应用程序来接收来自 Kafka 主题的消息。我正在尝试设置消费者组,这样当我扩展我的应用程序时,只有一个应用程序实例接收来自 Kafka 主题的消息。

以下是我的 application.yml spring: cloud: stream: bindings: orderTopic: group: orderGroup destination: orderTopic kafka: binder: brokers: 192.168.61.21 defaultBrokerPort: 9092 zkNodes: 192.168.61.21 defaultZkPort: 2181 我已将应用程序部署到 Cloud Foundry (pcfdev) 并将应用程序的实例计数设置为 2。当我向 orderTopic 主题发送订单时,我希望只有 2 个应用程序实例中的一个收到订单, 但是,两个应用程序实例都收到相同的顺序,如下所示 2016-05-10T16:33:46.42-0600 [APP/0] OUT Order Number 23 received. 2016-05-10T16:33:47.42-0600 [APP/1] OUT Order Number 24 received. 2016-05-10T16:33:47.42-0600 [APP/0] OUT Order Number 24 received. 2016-05-10T16:33:48.42-0600 [APP/1] OUT Order Number 25 received. 2016-05-10T16:33:48.42-0600 [APP/0] OUT Order Number 25 received. 2016-05-10T16:33:49.43-0600 [APP/1] OUT Order Number 26 received. 2016-05-10T16:33:49.43-0600 [APP/0] OUT Order Number 26 received.

你能帮忙吗

0 投票
1 回答
444 浏览

spring-cloud-stream - 在没有 spring-boot 的情况下使用 spring-cloud-integration

我有兴趣在spring-cloud-starter-stream-kafka没有 spring boot 的情况下使用,因为我有一个使用 Jersey servlet 和其他自定义代码的容器应用程序。

是否有任何直接的方法可以启用与@EnableBinding使用容器内应用程序注释 spring-boot 应用程序相同的功能?

0 投票
1 回答
1768 浏览

java - Spring Cloud Stream @ServiceActivator 不会在异常时向 errorChannel 发送消息

我正在使用 spring-cloud-starter-stream-kafka 使用 spring 云流。我已将我的频道绑定到 kafka 主题,如下所示application.properties

我无法让我的程序向错误通道生成异常消息。令人惊讶的是,即使我在不​​同的线程中,它似乎也没有尝试生成它(我有一个@MessagingGateway将消息转储到gatewayOutput中,然后其余的流程异步发生)。这是我的定义ServiceActivator

这是生成的日志(我已经截断了完整的异常)。没有...

  • 投诉 errorChannel 没有任何订阅者
  • Kafka生产者线程日志记录

编辑:这是我的频道课程的内容:

0 投票
2 回答
1585 浏览

java - 如何在 Spring-Cloud-Stream 中配置重新连接到 Kafka

我用 spring-cloud-stream 开发了一个简单的 Kafka 消费者,它工作得很好。当 Kafka 关闭时,框架甚至会执行自动重新连接。问题是重新连接是全速尝试的(在我的机器上大约每秒 10 次)。

问题:如何根据两次尝试之间的间隔、退避等配置重新连接行为?

更新 正如 Marius 建议的那样,我打开了一个问题。我认为这是我想要的功能不存在的暗示。谢谢你的回答!

https://github.com/spring-cloud/spring-cloud-stream/issues/541

0 投票
1 回答
3049 浏览

spring-cloud-stream - spring-cloud-stream kafka 错误处理

我浏览了 spring-cloud-stream 1.0.0.RELEASE 的文档,似乎找不到任何有关错误处理的文档。

根据对 kafka 0.9 的观察,如果我的消费者抛出 RuntimeException,我会看到 3 次重试。3 次重试后,我在日志中看到:

此时,消费者偏移量滞后 1,如果我重新启动消费者,则再次重试消息 3 次。但是,如果我随后将另一条消息发送到同一分区以使消费者不引发异常,则消费者偏移量将被更新,并且我们为其引发异常的原始消息在重新启动后将不再重试。

这是否记录在我没有找到的地方?错误处理是特定于活页夹的,还是 scs 将其抽象为在活页夹之间保持一致?我怀疑这是消费者偏移量如何使用 kafka binder 更新的意外结果。我看到添加了 enableDlq kafka 消费者属性,我即将对其进行测试,但我不确定我们如何处理 kafka 中的死信。我对 rabbitmq 中的死信队列很熟悉,但是使用 rabbitmq,我们可以使用 rabbitmq shovel 插件来重新发布和重试 dlq 消息,以涵盖由于临时服务中断导致失败的情况。除了自己编写类似的实用程序外,我不知道 kafka 有任何类似的功能。

更新:启用 enableDlq kafka 消费者属性的测试显示与错误处理相同的消费者偏移量问题。当消费者抛出 RuntimeException 时,我看到 3 次重试,之后未记录错误消息,并且看到一条消息已error.<destination>.<group>按记录发布到,但消费者偏移量未更新并滞后 1。如果我重新启动消费者,它尝试再次处理来自原始主题分区的相同失败消息,重试 3 次并再次将相同消息放在error.<destination>.<group>主题上(重复 dlq 消息)。如果我将另一条消息发布到消费者未引发 RuntimeException 的同一主题分区,则偏移量将更新,并且在重新启动时不再重试原始失败消息。

我认为无论 enableDlq 是否为真,当消费者抛出错误时,消费者都应该更新 kafka 中的消费者偏移量。这至少可以使所有重试失败的消息被丢弃(当 enableDlq 为 false 时)或发布到 dlq 并且永远不会重试(当 enableDlq 为 true 时)。

0 投票
1 回答
243 浏览

spring-cloud-stream - Spring Cloud Stream 1.0.0-RELEASE 和线程

以前当我使用 RC2 时,我观察到每个 kafka 消费者似乎都在自己的线程中执行(在本例中为pool-6-thread-1):

但是在升级到 1.0.0RELEASE 之后,似乎同一个线程(kafka-binder-)被用于所有消费,无论设置concurrency属性如何:

这是新的默认行为吗?如何配置我的应用程序以利用 MessageChannelBinders 池,每个池在各自的线程中执行?

0 投票
1 回答
1710 浏览

spring-integration - spring-cloud-stream kafka avro

是否有任何支持或计划支持来自 spring-cloud-stream 中的融合平台的 avro 和/或模式注册表?我看到 spring-integration-kafka 1.3.0.RELEASE 中对 avro 有依赖关系,它被排除在 spring-cloud-stream-binder-kafka 之外,但是 spring-integration-kafka 的主分支(2.0)和 spring- kafka 没有我能找到的任何 avro 依赖项。

0 投票
2 回答
5396 浏览

java - How to intercept Spring Cloud Stream messages?

Spring allows interception of messages for many of their products, like RestTemplate and SpringMVC. Is it possible to intercept Spring Cloud Stream messages? For both incoming and outgoing messages.

0 投票
1 回答
529 浏览

spring-boot - Spring Cloud 数据流——微服务部署

团队,目前我正在研究 spring-xd 并用作数据分析和纱线作业的运行时容器。

我的问题是

1) 我可以利用与 spring-xd 相同的环境设置吗?
2)从文档中,我读到它可以部署为微服务,是否使用嵌入式驱动程序进行流处理?如果它使用嵌入式驱动程序,我可以使用它在集群环境中部署,并使用与 spring-xd 相同的基础架构吗?
3) 是否有为 Apache Spark 构建的特定包装器?

我的环境:Spark 1.6.1、Hadoop 2.7.2、zookeeper 3.6.8、redis 3.2、spring-xd-1.3.1

对此特定查询的任何帮助将不胜感激。