问题标签 [spring-cloud-dataflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1379 浏览

java - 如何使用 Rabbit 绑定创建具有事务和 DLQ 的处理器?

我刚刚开始学习 Spring Cloud Streams 和 Dataflow,我想知道对我来说重要的用例之一。我创建了示例处理器乘法器,它接收消息并将其重新发送 5 次以输出。

您可以看到,在第 5 次发送此处理器之前崩溃。为什么?因为它可以(程序抛出异常)。在这种情况下,我想在 Spring Cloud Stream 上练习故障预防。

我想要实现的是在 DLQ 中支持输入消息和之前发送的 4 条消息被还原并且不被下一个操作数使用(就像在正常的 JMS 事务中一样)。我已经尝试在我的处理器项目中定义以下属性,但没有成功。

你能告诉我是否可能,还有我做错了什么?我会非常感谢一些例子。

0 投票
1 回答
342 浏览

spring-cloud - spring-cloud-deployer - 指定 Maven 远程存储库 URL

Spring Cloud 数据流 - Cloudfoundry 服务器 (v1.0.0.M4)

在尝试在属性文件中外部化配置信息,然后在安装时使用 Spring Cloud Config Server 提供这些环境设置时,我有一个关于我通常会在 YML 中以某种方式放置的一些值的问题显现。

首先,在 YML 清单中,我可以这样定义它们:

那么我如何将这些放入属性文件中呢?这是我的猜测:

0 投票
1 回答
1248 浏览

spring-cloud-config - 外化 Spring Cloud Data Flow Config - Spring Cloud Config Server

仍在努力解决这个任务。我有以下东西:

  • Pivotal Cloud Foundry - 弹性运行时 1.7.5
  • Spring Cloud 数据流服务器 Cloudfoundry - 1.0.0.M4
  • Spring Cloud Config Server (SCCS) 服务(服务名称“scdf-sccs”)

SCCS 实例已正确连接到我的 BitBucket 存储库,并且它是在线的。我正在尝试使用以下清单推送 spring-cloud-dataflow-server-cloudfoundry-1.0.0.M4.jar:

在我的 BitBucket 存储库中,我有以下包含环境变量的属性文件:scdf-dev-abc.properties

以下是该属性文件的内容:

当我使用清单和指定的 SCCS 绑定推送应用程序时,它没有获取属性文件中指定的属性,并且我在解决问题时遇到了困难。我的配置或我提供应用程序名称和弹簧配置文件的方式是否缺少某些内容?

在弄清楚为什么 Spring Cloud Config Server 没有读取配置方面,我还应该寻找什么?例如,如果 Spring Cloud Data Flow 应用程序作为某个进程或用户运行,而该进程或用户无权访问 BitBucket 存储库,这有关系吗?

0 投票
2 回答
312 浏览

spring-cloud-stream - 在部署的流应用程序中设置 env var 值

使用以下内容:

  • Spring Cloud 数据流服务器 Cloudfoundry 1.0.0.RC1
  • Spring Cloud 配置服务器服务

我使用附加依赖项重建了 spring-cloud-dataflow-server-cloudfoundry,以按照说明提供它与 Spring Cloud Config 服务器的绑定。它似乎按预期工作,所以这很好。

现在出现了问题,当我尝试使用我开发的自定义模块定义流时,模块的环境变量(特别是 ENCRYPT_KEY)位于我的 git 存储库中的清单 YML 文件中。

清单文件的名称是customapp-dev.yml. 清单如下所示:

属性文件的名称是customapp-dev.properties. 属性文件如下所示:

该应用程序在启动 jar 中有一个名为application.properties. 它看起来像这样:

最后,当我部署流时,我在命令行中提供了一个附加属性,如下所示:

跟踪应用程序部署的日志,我可以看到正在读取配置服务器实例,并且正在正确解析应用程序名称和配置文件。配置客户端正在映射 YML 清单和我的 git 存储库中的属性文件。

但是,错误表明占位符无法解密customapp.password

如果我为已部署的(崩溃的)应用程序显式设置 ENCRYPT_KEY 环境变量,并重新存储它,它会正常启动并像魅力一样工作。

我是否有另一种方法可以在部署时为流应用程序指定环境变量?

0 投票
1 回答
926 浏览

spring - Spring Cloud DataFlow 本地服务器未导入 Tailing 文件的新自定义源

我正在尝试为“Tailing”编写自定义源代码,并使用 spring 示例中的片段来编写该代码。这是实际的代码,

属性文件是,

现在,当我使用此代码运行 junit 测试时,使用下面的 junit 测试用例它工作得很好。

运行 junit 测试时,我确实在控制台中看到了默认文件内容。

现在,当我尝试将其捆绑在一个 jar 中并将其部署到具有版本的本地 spring 云数据流服务器时,我面临两个问题:

  • spring-cloud-dataflow-server-local-1.0.1.BUILD-SNAPSHOT.jar
  • spring-cloud-dataflow-server-local-1.0.0.RELEASE.jar

我使用的 shell jar 版本是,

  • spring-cloud-dataflow-shell-1.0.1.BUILD-SNAPSHOT.jar

我面临的问题是:

  1. 源类型已成功部署并显示在应用列表中,但当我运行应用信息 source:tail时,应用属性信息未显示,请查看下面的屏幕截图。

外壳截图

jar 结构与 File 源引导 jar 完全相同,是使用 Spring 的 Maven 包装器构建的。带有元数据的 json 属性文件也是根据预期生成的,并且存在于捆绑的 spring 源 jar 中的 META_INF 中。

  1. 源本身不起作用。当我尝试将这个新的尾源与日志接收器一起使用时,它永远不会起作用。

最后,我确实知道它可能无法在云服务器上运行,但计划是在本地服务器-本地数据流服务器上运行它。我也知道我可以尝试使用 spring 集成框架本身,但它是通过 spring cloud Dataflow 完成此任务的外部请求。

非常感谢任何帮助,我的请求发给Artem ( https://stackoverflow.com/users/2756547/artem-bilan)、Gary(https://stackoverflow.com/users/1240763/gary-russell )和来自所有社区的其他人。提前感谢您的评论和反馈!

0 投票
0 回答
133 浏览

spring-cloud - Spring Cloud Stream 中的 HTTP 智能代理

有什么方法可以在 Spring Cloud Streams 下实现智能代理模式?有一个 HTTP 源可以获取连接到其他微服务(或处理器)的请求,并最终通过请求中建立的相同连接返回响应?

使用 Spring Cloud Data Flow 可能会很困难,但使用 Spring Cloud Stream 应该是可能的,不是吗?

谢谢。

0 投票
1 回答
504 浏览

json - 通过 JSON 标头进行 Spring Cloud Dataflow 路由

我一直在尝试使用 Spring Cloud Dataflow 创建一个流,但运气不佳(主要是由于缺乏文档)。

问题 1:访问 dockerized Spring Cloud Dataflow 的 Web GUI 我有一个在基本 Ubuntu 容器上运行 Kafka 的 dockerized Spring Cloud 服务器。由于某种原因,我无法访问 Windows 中的 Web GUI(位于 < docker-machine ip >:9393/dashboard)。但是,我有一个单独的 Docker Ubuntu 容器运行 Nginx 反向代理,当我转到 < docker-machine ip >/index.html 等时会显示它。我认为这不是端口问题,我有 Spring使用 -p 9393:9393 设置云容器,否则该端口未使用。

问题 2:按 JSON 标头路由 我的最终目标是从 Nginx 加载一个文件并根据其 JSON 标头(有两个不同的 JSON 标头)进行路由,然后将查询提取到 Cassandra。除了按 JSON 标头排序之外,我可以完成所有这些操作。您会推荐我使用哪个应用程序?

0 投票
1 回答
631 浏览

spring-integration - Sink 组件在 Spring Cloud 数据流中无法使用 kafka 获取正确的数据

我不是以英语为母语的人,但我尽量清楚地表达我的问题。我遇到了这个困扰我两天的问题,但我仍然找不到解决方案。

我已经构建了一个流,它将在 Hadoop YARN 中的 Spring Can 数据流中运行。

流由 Http 源、处理器和文件接收器组成。

1.Http Source
HTTP Source 组件有两个输出通道绑定两个不同的目的地,即在 application.properties 中定义的 dest1 和 dest2。

spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2

以下是 HTTP 源代码片段供您参考。

2.处理器
处理器有两个多输入通道和两个输出通道绑定不同的目的地。目标绑定在处理器组件项目的 application.properties 中定义。

下面是处理器的代码片段。

3.文件接收器组件。

我使用 Spring 的官方 fil sink 组件。maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT

我只是在其 applicaiton.properties 文件中添加目标绑定。spring.cloud.stream.bindings.input.destination=fileSink

4.发现:

我期望的数据流应该是这样的:

Source.handleRequest() -->Processor.handleRequest()

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

应该只将字符串“由 transform2 处理”保存到文件中。

但是经过我的测试,数据流实际上是这样的:

Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

“由transform1处理”和“由transform2处理”字符串都保存到文件中。

5.问题:

虽然 Processor.handleRequest() 中输出通道的目的地绑定到 hdfsSink 而不是 fileSink,但数据仍然流向文件 Sink。我无法理解这一点,这不是我想要的。我只希望来自 Processor.handleRequest2() 的数据流到文件接收器而不是两者。如果我做得不对,谁能告诉我该怎么做以及解决方案是什么?这让我困惑了2天。

感谢您的热心帮助。

亚历克斯

0 投票
1 回答
64 浏览

spring - 如何使用 Spring Cloud Dataflow 知道文件何时在 HDFS 上沉没

我正在下载文件源并逐行创建和流式处理以最终沉入 HDFS。为此,我使用 Spring Cloud Dataflow + Kafka。

问题:有什么方法可以知道何时将完整的文件放入 HDFS 以触发事件?

0 投票
2 回答
865 浏览

spring-cloud - Spring Cloud Data Flow DLQ 配置不起作用

我正在尝试在 Spring Cloud Data Flow 中配置 DLQ。这是流定义以及我如何部署它

在自定义转换 - 处理器代码中,我已经提到

这意味着如果消息包含 ERROR 则 RunTimeException 并且我想在 DLQ 中捕获这些消息。但似乎当我运行代码时,我没有得到任何名称为 test-tran 的 Kafka DL 队列。

我是否需要设置更多属性以启用 DLQ,或者我需要更改代码中的某些内容以正确使用 DLQ。

自定义转换代码

转换服务应用程序.java

TransformationMessageEndPoint.java

pom.xml

添加模块

添加流

部署流