问题标签 [spring-cloud-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
504 浏览

json - 通过 JSON 标头进行 Spring Cloud Dataflow 路由

我一直在尝试使用 Spring Cloud Dataflow 创建一个流,但运气不佳(主要是由于缺乏文档)。

问题 1:访问 dockerized Spring Cloud Dataflow 的 Web GUI 我有一个在基本 Ubuntu 容器上运行 Kafka 的 dockerized Spring Cloud 服务器。由于某种原因,我无法访问 Windows 中的 Web GUI(位于 < docker-machine ip >:9393/dashboard)。但是,我有一个单独的 Docker Ubuntu 容器运行 Nginx 反向代理,当我转到 < docker-machine ip >/index.html 等时会显示它。我认为这不是端口问题,我有 Spring使用 -p 9393:9393 设置云容器,否则该端口未使用。

问题 2:按 JSON 标头路由 我的最终目标是从 Nginx 加载一个文件并根据其 JSON 标头(有两个不同的 JSON 标头)进行路由,然后将查询提取到 Cassandra。除了按 JSON 标头排序之外,我可以完成所有这些操作。您会推荐我使用哪个应用程序?

0 投票
1 回答
317 浏览

java - Spring Cloud Stream 项目是否需要 Web 服务器(Tomcat)?

我正在使用Spring Cloud Stream编写消息驱动的微服务。

  • 项目A:Producer(Spring Cloud Stream,集成tomcat及其端口:8080)
  • 项目B:Consumer(Spring Cloud Stream,集成tomcat及其端口:8081)
  • 消息代理:RabbitMQ(amqp 端口:5672)

我的问题是:对于项目 A 和 B,他们是否需要集成 Web 服务器/端口(Tomcat)?或者只是一个普通的java应用程序就可以了?

0 投票
1 回答
631 浏览

spring-integration - Sink 组件在 Spring Cloud 数据流中无法使用 kafka 获取正确的数据

我不是以英语为母语的人,但我尽量清楚地表达我的问题。我遇到了这个困扰我两天的问题,但我仍然找不到解决方案。

我已经构建了一个流,它将在 Hadoop YARN 中的 Spring Can 数据流中运行。

流由 Http 源、处理器和文件接收器组成。

1.Http Source
HTTP Source 组件有两个输出通道绑定两个不同的目的地,即在 application.properties 中定义的 dest1 和 dest2。

spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2

以下是 HTTP 源代码片段供您参考。

2.处理器
处理器有两个多输入通道和两个输出通道绑定不同的目的地。目标绑定在处理器组件项目的 application.properties 中定义。

下面是处理器的代码片段。

3.文件接收器组件。

我使用 Spring 的官方 fil sink 组件。maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT

我只是在其 applicaiton.properties 文件中添加目标绑定。spring.cloud.stream.bindings.input.destination=fileSink

4.发现:

我期望的数据流应该是这样的:

Source.handleRequest() -->Processor.handleRequest()

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

应该只将字符串“由 transform2 处理”保存到文件中。

但是经过我的测试,数据流实际上是这样的:

Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

“由transform1处理”和“由transform2处理”字符串都保存到文件中。

5.问题:

虽然 Processor.handleRequest() 中输出通道的目的地绑定到 hdfsSink 而不是 fileSink,但数据仍然流向文件 Sink。我无法理解这一点,这不是我想要的。我只希望来自 Processor.handleRequest2() 的数据流到文件接收器而不是两者。如果我做得不对,谁能告诉我该怎么做以及解决方案是什么?这让我困惑了2天。

感谢您的热心帮助。

亚历克斯

0 投票
3 回答
6147 浏览

java - 使用 Spring Cloud Stream Source 将方法结果发送到流

我正在尝试在 Spring Boot 应用程序中创建一个 Spring Cloud Stream Source Bean,它只是将方法的结果发送到流(底层 Kafka 主题绑定到流)。

我见过的大多数 Stream 示例都使用@InboundChannelAdapter注释通过轮询器将数据发送到流。但我不想使用轮询器。我尝试将轮询器设置为一个空数组,但另一个问题是使用 @InboundChannelAdapter 时您无法拥有任何方法参数。

我正在尝试做的总体概念是从入站流中读取的。进行一些异步处理,然后将结果发布到出站流。因此,使用处理器似乎也不是一种选择。我正在使用@StreamListener接收器通道来读取入站流并且有效。

这是我一直在尝试的一些代码,但这根本不起作用。我希望它会这么简单,因为我的 Sink 是但也许不是。寻找某人向我指出不是处理器的源示例(即不需要收听入站通道)并且不使用@InboundChannelAdapter或给我一些设计技巧来完成我需要做的事情一种不同的方式。谢谢!

0 投票
1 回答
1087 浏览

rabbitmq - Redis 是 Spring Cloud Stream 的好主意吗?我应该使用 Kafka 还是 RabbitMQ?

我正在部署一个小型 Spring Cloud Stream 项目,仅使用 http 源和 jdbc 接收器(每个 3 个实例)。估计负载为 10 次点击/秒。

我正在考虑使用 redis,因为我觉得它更舒服,但在最新的文档中,几乎所有的参考都是 kafka 和 RabbitMQ,所以我想知道将来是否不支持 redis,或者使用时是否有任何问题雷迪斯。

问候

0 投票
1 回答
281 浏览

spring-cloud-stream - 具有主题通配符的 Spring 云流 rabbitMQ 消费者

我正在尝试使用 spring cloud stream rabbitmq 并将消息发送到同一个交换但具有不同的主题。并且会有消费者使用绑定中的通配符来消费一个或多个主题的消息。有这种情况的例子吗?

我猜我可以将主题用作 partitionKey。这样,所有消息都将转到同一个交换器。但我不清楚如何设置消费者

0 投票
1 回答
752 浏览

spring-cloud-stream - spring cloud streamrabbitmq在consumer中指定topic

我正在尝试发送具有不同主题的消息,然后将消费者配置为它想要收听的主题。

我的想法是使用单个目标“domainMessage”并使用自定义分区策略。我有一个枚举,我只是使用该值作为 partitionKey,partitionStrategy 将只返回密钥(假设密钥将始终等于生产者端的分区计数)。

这行得通吗?如果是这样,我不确定如何配置消费者。

我的制片人有以下 application.properties

我的 PartitionKeyExtractorImpl 看起来像

我的 PartitionStrategyImpl 看起来像

我的消费者 application.properties 看起来像

假设payload.getType()可以返回 1-3 之间的值。如何将使用者配置为仅收听 partitionKey 为 1 和 3 的消息?

0 投票
2 回答
865 浏览

spring-cloud - Spring Cloud Data Flow DLQ 配置不起作用

我正在尝试在 Spring Cloud Data Flow 中配置 DLQ。这是流定义以及我如何部署它

在自定义转换 - 处理器代码中,我已经提到

这意味着如果消息包含 ERROR 则 RunTimeException 并且我想在 DLQ 中捕获这些消息。但似乎当我运行代码时,我没有得到任何名称为 test-tran 的 Kafka DL 队列。

我是否需要设置更多属性以启用 DLQ,或者我需要更改代码中的某些内容以正确使用 DLQ。

自定义转换代码

转换服务应用程序.java

TransformationMessageEndPoint.java

pom.xml

添加模块

添加流

部署流

0 投票
0 回答
307 浏览

spring-integration - Spring Cloud Stream Kafka 动态通道 - 指定常用生产者属性

使用 spring-cloud-stream kafka 的 BinderAwareChannelResolver ,我基于事件动态解析通道,因为我有很多生产者通道并且所有通道输出 kafka 消息都将被非弹簧应用程序消耗,我需要设置 headerMode在 application.properties 中将每个通道作为原始通道,这使我的配置变得混乱,有没有办法为我使用的所有生产者指定 headerMode = raw common,而不是单独为每个通道指定。

0 投票
2 回答
4083 浏览

spring - Spring Cloud Stream 不创建队列

我正在尝试使用 RabbitMQ 配置一个简单的 Spring Cloud Stream 应用程序。我使用的代码主要取自spring-cloud-stream-samples。我有一个入口点:

以及示例中的简单消息生产者:

此外,这里是 application.yml 配置:

当我运行该示例时,它会连接到 Rabbit 并创建一个名为 test 的交换。但我的问题是,它不会自动创建队列和绑定。我可以看到 Rabbit 中的流量,但我所有的消息都消失了。虽然我需要他们留在某个队列中,除非他们被消费者阅读。

也许我误解了一些东西,但从我阅读的所有主题来看,Spring Cloud Stream 似乎应该自动创建一个队列和一个绑定。如果没有,我该如何配置它以便我的消息被持久化?

我正在使用 Spring Cloud Brixton.SR5 和 Spring Boot 1.4.0.RELEASE。