问题标签 [spring-cloud-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
7152 浏览

spring-cloud - 带有 Rabbit Binder 的 Spring Cloud Stream - 源/接收器队列名称不匹配

最近开始玩 Spring Cloud Stream 和 RabbitMQ binder。

如果我在两个服务想要传递消息时正确理解所有内容,一个应该配置source来发送消息,另一个应该配置sink来接收消息 - 两者都应该使用相同的channel

我有一个名为testchannel. 不过,我注意到该创建了 RabbitMQ 绑定:

  • 交换testchannel
  • 路由键testchannel
  • 队列testchannel.default(持久),

sink创建了 RabbitMQ 绑定:

  • 交换testchannel
  • 路由键#
  • 队列testchannel.anonymous.RANDOM_ID(独家)。

为简洁起见,我跳过了前缀。

现在,当我运行这两个应用程序时。第一个将消息发送到testchannel交换,然后将其路由到两个队列(我假设路由键是testchannel)。第二个应用程序使用随机队列中的消息,但从不使用默认队列中的消息。

我的另一个问题是 - 第二个应用程序只使用sink,但它也为输出通道创建绑定,output默认情况下,因为我没有指定任何东西。

我使用相同的 Gradle 脚本构建这两个应用程序:

第一个应用程序属性:

第一个应用程序源代码:

第二个应用程序属性:

第二个应用程序源代码:

所以我的问题是。

  • 和接收不应该使用相同的通道并因此使用相同的代理队列吗?实现这一目标的正确配置是什么?(我的目标是拥有多个接收服务实例,但只有一个应该使用该消息。)
  • 当我只使用sink时,框架应该创建输出绑定吗?如果是,如何禁用它。
0 投票
0 回答
295 浏览

spring-cloud - Spring Cloud Bus 配置一次刷新即可将配置分发给所有客户端

我正在尝试配置可靠的配置服务,该服务在配置更改发生时使用总线更新客户端。我启动了两个监控本地文件系统的配置服务器。两台eureka server,以便客户端可以在启动时发现config service(即eureka first config type)。我使用 rabbitMQ 作为 amqp 总线。当前行为如下:如果我更新配置文件并在http://config-server1/bus/refresh上请求发布,配置服务器会发送通知并且只有一个客户端选择它。因此,要更新 3 个客户,我需要发布 3 个帖子。问题:如何配置总线,以便 /bus/refresh 的一篇帖子更新所有客户端。

先感谢您。

0 投票
1 回答
8457 浏览

json - Spring Cloud Stream 消息从/到 JSON 转换配置

我正在使用带有 RabbitMQ 绑定器的 Spring Cloud Stream。它适用于byte[]有效负载和 Java 本机序列化,但我需要使用 JSON 有效负载。

这是我的处理器类。

InputDto并且OutputDto是带有 Jackson 注释的 POJO。

  • 如何配置 JSON 转换策略?
  • 邮件标头应该如何被接受和处理?
0 投票
1 回答
548 浏览

spring-cloud-dataflow - CloudFoundry 中的 Spring Cloud Dataflow ticktock 示例失败

我正在尝试获取在 CloudFoundry 上运行的 Spring Cloud Dataflow 的最基本示例。我已按照此处的步骤操作:http: //docs.spring.io/spring-cloud-dataflow-admin-cloudfoundry/docs/current-SNAPSHOT/reference/htmlsingle/#getting-started使管理应用程序在我的组织/空间。然后我尝试从http://cloud.spring.io/spring-cloud-dataflow/创建最基本的示例,即创建“ticktock”流:

我可以看到两个应用程序ticktock-time和ticktock-log都是在空间中创建的,所需的服务“redis”绑定到这些应用程序并尝试启动。不幸的是,他们并没有完全启动,因为他们无法访问“redis”服务。在日志中我们发现:

上下文初始化期间遇到异常 - 取消刷新尝试:org.springframework.context.ApplicationContextException:无法启动 bean 'outputBindingLifecycle';嵌套异常是 org.springframework.context.ApplicationContextException: 无法启动 bean 'inputBindingLifecycle'; 嵌套异常是 org.springframework.data.redis.RedisConnectionFailureException: Cannot get Jedis connection;嵌套异常是 redis.clients.jedis.exceptions.JedisConnectionException: 无法从池中获取资源

这最终是由

引起:redis.clients.jedis.exceptions.JedisConnectionException:java.net.ConnectException:连接被拒绝

我是否缺少介于两者之间的一些配置步骤?

亚历山大

0 投票
1 回答
2723 浏览

spring-cloud-stream - spring-cloud-stream kafka json 内容类型

我正在尝试将 spring-cloud-stream 1.0.0.M4 与各种活页夹一起使用。我没有在生产者和消费者之间共享我的数据传输对象(谁做的?)所以我遇到了需要为绑定包含内容类型配置的问题。

生产者配置:

生产者代码:

消费者配置:

消费者代码:

我用 rabbit、redis 和 kafka binders 试过这个,发现 json hack 只适用于 rabbit。我在使用 kafka 和 redis 的消费者中收到以下错误。

kafka消费者错误:

redis消费者错误:

0 投票
2 回答
2004 浏览

spring-cloud-stream - spring-cloud-stream kafka 离线消费者消息丢失

我一直在玩 spring-cloud-stream (1.0.0.BUILD-SNAPSHOT with kafka binder)并注意到消费者离线时,发送的任何消息都会丢失。当我启动消费者时,它不会处理发送到 kafka 的请求的积压。这是故意的吗?

0 投票
1 回答
2824 浏览

spring-cloud-stream - spring-cloud-stream 请求-回复消息模式

是否有一种应与 spring-cloud-stream 一起使用的请求-回复模式?我可以在 spring-cloud-stream 上找到的所有文档都面向 MessageChannel.send 即发即弃类型的生产者,我对 spring-integration 中的 @MessagingGateway 很熟悉,但我不确定那会如何使用 spring-cloud-stream。当您有一个 REST POST 端点来保存具有分配标识符的实体并且您需要将分配的标识符返回给调用者但您仍想使用消息传递时,这将很有用。

0 投票
1 回答
9060 浏览

spring-cloud-stream - spring-cloud-stream kafka 消费者并发

使用 spring-cloud-stream 的 kafka binder,如何配置并发消息消费者(在单个消费者 jvm 中)?如果我理解正确,使用 kafka 时并发消息消耗需要分区,但scs 文档表明要使用分区,您需要通过 partitionKeyExpression 或 partitionKeyExtractorClass 在生产者中指定分区选择。Kafka 文档提到了循环分区。

scs 文档根本没有提到 spring.cloud.stream.bindings.*.concurrency ,尽管这在我上面描述的用例中似乎很重要。使用生产者配置

和消费者配置

我似乎得到了我想要的行为(至少在某种程度上)。我可以看到有时有 3 个消费者线程处于活动状态,尽管除了轮询之外似乎确实存在一些分区,因为有些消息似乎在等待繁忙的消费者线程并在该线程完成后被消耗。我认为这是因为消息被发送到同一个分区。

当我没有指定 partitionKeyExpression 或 partitionKeyExtractorClass 时,生产者是否使用了一些默认的密钥提取和分区策略?这是使用 kafka 设置 scs 消费者的合适方法,您希望多个线程消费消息以增加消费者吞吐量?

0 投票
2 回答
5238 浏览

java - Spring Cloud Stream 消息 JSON 转换不起作用

我按照我之前的问题Spring Cloud Stream message from/to JSON conversion configuration和配置的流进行了描述,但是,我无法使其正常工作。

我的设置如下。我有两个应用程序AB. 应用程序A使用输入通道one,输出two。应用程序B使用输入two。频道two配置为内容类型application/json

应用 A. 属性。

监听器方法。

应用 B. 属性。

监听器方法。

当我手动将带有正确 JSON 字符串的消息发送到 channelone时,它会被正确处理并two作为 JSON 消息发送到 channel(标题与上述问题中描述的完全相同)。之后,twoApp B 在频道上接收到它并抛出异常:Method handle(java.lang.String) cannot be found

当然,当我创建这两种方法时,将 Dto 和 String 作为输入处理,它可以工作,但总是调用 String 方法并且必须自己反序列化有效负载。

我在哪里弄错了吗?如何使用这样的签名设置方法:public Dto handle(Dto incoming)

0 投票
1 回答
622 浏览

spring-integration - Spring集成中Codec和MessageConverter的区别

Codec是在 Spring Integration 4.2 中引入的。

但是,文档中的描述并没有真正描述编解码器与 a 的不同之处,MessageConverter或者在哪些场景中使用哪个抽象?

基本上我想知道的是:

  • Codec当抽象看起来与 a 的作用相似时,为什么要引入抽象MessageConverter
  • 为什么要使用 a Codecover aMessageConverter反之亦然?
  • 您什么时候会选择使用其中一种?

这个问题在Spring Cloud Stream的上下文中突出显示,其中配置了默认的Kryo 编解码器,但最近已经解决MessageConverter