问题标签 [spring-integration-aws]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
541 浏览

spring-cloud-stream - 无法在 Kinesis Binder 中以批处理模式使用消息

我正在尝试将来自 Kinesis 流的消息作为批处理

我在用

应用程序.yml

代码

根据文档,当 listenerMode 为批处理时,预计将列表作为有效负载

我已发送消息为 {"ab":"aa"}

我收到消息错误为

0 投票
1 回答
236 浏览

spring-integration - Spring Integration:MessageSource 不支持 errorChannel 标头

我有以下流程:

我发现如果s3StreamingMessageSource.receive抛出异常,错误最终会出现在为管道中的前一个流配置的错误通道中,而不是S3_ERROR_CHANNEL为此流配置的那个。不确定它是否与这个问题有关。

0 投票
1 回答
739 浏览

spring-boot - 弹簧集成 - 如何将分割线聚合成一批 x?

我们需要将事件发送到 kinesis,并且由于 aws 定价,我们计划将记录分批放入 kinesis。

我们读入一个 csv 文件,然后使用文件拆分器吐出行并将每一行转换为 json。

那么在转换为 json 之后,我们如何将这些行批处理为每批 25 行,以便我们的 kinesis serviceActivator 可以发送批处理?

任何示例将不胜感激。

编辑:我按照“Artem Bilan”的建议添加了它并且有效

但我得到错误:

  1. 我正在使用markers="true",以便我们知道它是文件的结尾,因此我们可以将其重命名为“.done”。

  2. 在拆分器和转换器之间添加了一个路由器,当 FileMarker 为 END 时,它仅路由到“nullChannel”或“fileProcessedChannel”,否则,拆分线进入 default-output-channel="lineOutputChannel"

    /li>

路由器代码看起来像这样

但我收到此错误:

有任何想法吗?

0 投票
1 回答
50 浏览

spring-integration - 拆分器在异常期间中止,不处理后续消息

我需要拆分消息并一一处理。如果任何消息失败,我想将其报告给错误通道并继续处理下一个可用消息

我正在使用带有 1.0.0-SNAPSHOT 的 spring cloud aws 流启动器

我使用拆分器编写了一个示例程序

当我发送消息为Hello时,期望'h','e','o'应该被处理,但是'l'应该被报告为错误。

但是这里在'l'之后,处理没有恢复。

有什么办法可以做到这一点。

0 投票
1 回答
399 浏览

spring - 来自多个 s3 存储桶的 Spring 集成 AWS s3-inbound-streaming-channel-adapter 流

我正在使用基于 XML 的 spring 集成并用于s3-inbound-streaming-channel-adapter单个s3 存储桶流式传输。

我们现在需要从两个s3 存储桶进行流式传输。

那么是否可以s3-inbound-streaming-channel-adapter多个存储桶中流式传输?

或者我需要s3-inbound-streaming-channel-adapter每个s3 存储桶创建一个单独的存储桶吗?

这是我当前为单个 s3 存储桶设置的,它确实有效。

提前致谢。

更新:

我最终得到了两个 s3-inbound-streaming-channel-adapter,如下面的Artem Bilan所述。

但是,对于每个入站适配器,我必须分别声明acceptOnceFilter 和 metadataStore 的实例。

这是因为如果我只有一个 acceptOnceFilter 和 metadataStore 实例,并且它们被两个入站适配器共享,那么就会开始发生一些奇怪的循环。

例如,当file_1.csv 到达bucket-1并得到处理时,如果您将相同的file_1.csv 放在bucket-2上,则开始发生奇怪的循环。不知道为什么!所以我最终为每个入站适配器创建了 acceptOnceFilter 和 metadataStore。

`

`

0 投票
1 回答
1229 浏览

spring - spring 集成错误处理不同类型

我正在使用 spring 集成 XML,并且想知道处理错误处理的最佳方法是什么。

我正在使用 s3 连接到 s3 s3-inbound-streaming-channel-adapter,然后从 s3 存储桶转换 csv 文件。

可能发生的潜在错误是:

  1. 如果文件中的一行无效,则可能会发生转换异常,因此可能会引发自定义错误:

LineTransformationException

  1. 如果在 s3 存储桶上错误地放置了图像文件并再次出现转换异常怎么办

  2. s3 可能会关闭并且可能会得到

原因:com.amazonaws.SdkClientException:无法执行 HTTP 请求:连接到 localhost:4572 [localhost/127.0.0.1] 失败:连接被拒绝

许多已知和未知错误的列表可以继续......

  1. 那么处理所有这些错误的最佳方法是什么?通过自定义ErrorHandlerexception-type-router

  2. 如果做到这一点,ErrorHandler那么如何满足这么多的例外。

  3. 是否有一个包罗万象的异常处理程序?

    公共类错误处理程序{

    公共无效句柄失败(消息错误消息){

    }}

或者

0 投票
1 回答
352 浏览

spring - 使用 Spring-Integration-Kinesis 消息驱动适配器时的内部故障

我已经设置了这个 KinesisMessageDrivenChannelAdapter:

大多数时候,它工作正常。但有时,我会收到这样的信息:

之后,适配器完全停止工作,而不会挂起应用程序。我指定了要使用的错误通道,如果需要,我很乐意重新启动应用程序以使适配器重新联机。但这似乎不是一个选择。

如何在其中构建错误处理?

0 投票
1 回答
647 浏览

spring-boot - 根据配置创建 Kinesis 消费者

我想开发一个消费者库,它将从弹簧属性文件中读取运动流名称并创建/激活关联的消费者。

在我见过的所有示例中(例如https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kinesis-samples/kinesis-produce-consume/src/main/java/demo /stream/OrderStreamConfiguration.java),消费者是基于注释的,应该手动定义。

我想知道我可以使用 spring-integration/spring-aws-kinesis 库中的哪个类来一般地创建消费者。

谢谢

0 投票
1 回答
285 浏览

spring-integration - Kinesis Streams 和 Spring 集成通道

我正在开发一个消费者,它使用来自多个 Kinesis 流的事件。我有一些问题要了解最佳实践。

  1. 我应该为每个流创建一个频道吗?在“每个流的通道”或“所有流的一个通道”之间做出决定时应考虑哪些因素?

  2. 哪个渠道更适合我的案例表现?有不同的频道类型,如 PollableChannel、SubscribaleChannel 和 DirectChannel。

谢谢

0 投票
1 回答
265 浏览

spring-integration - Spring Cloud 中的反序列化

我有一个具有以下内容类型配置的生产者

在消费者方面,我使用弹簧集成(KinesisMessageDrivenChannelAdapter)将事件路由到不同的通道。当我在监听器类上收到消息时,如下所示:

编组到 MyEvent 失败。在堆栈错误中,我可以看到内容类型是有效负载的一部分,有效负载仍未从 json 反序列化为 POJO。

我想知道如何在进行任何其他转换之前反序列化消息。我没有找到任何可以将 MessageConverter 设置为适配器的方法。

我感谢您的帮助。

谢谢