问题标签 [spring-integration-aws]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-integration - 将 SqsConsumer 从 spring-coud-aws-messaging 迁移到 spring-integration-aws
我想使用 spring-integration-aws 向 AWS SNS 发送消息并从 AWS SQS 接收消息。我在理解如何将我的应用程序从 spring-coud-aws-messaging 迁移到使用它时遇到了一些麻烦。我的代码基本上是一个 SQS 配置类:
使用 customDestinationResolver (基本上是因为队列名称只能由于不同的部署而动态)定义为:
仅此而已:基本上使用此配置,我只需要使用注释@SqsListener("foo-queue")
或@SqsListener("bar-queue")
适当的使用者方法
我一直在尝试遵循https://github.com/spring-projects/spring-integration-aws#spring-integrations-extensions-to-aws中的文档,而我在 SQS 部分中难以理解的内容是“入站通道适配器”一章我应该将哪些队列作为参数提供给 SqsMessageDrivenChannelAdapter() 的构造函数,因为我使用的是自定义destinationResolver,以及我究竟如何使用这些消息,或者它是否应该与 Spring 的 @SqsListener 注释一起使用-cloud-aws-messaging 和以前一样。
非常感谢您的帮助,如果这不是问的正确地方,或者这是一个非常愚蠢的问题,我很抱歉,我只是第一次尝试这个:)
java - Spring Integration - SQS 消息的异步服务激活器
我正在尝试使用 void 异步服务激活器为 sqs 队列实现集成流,但从未触发处理逻辑。消息在流程中收到,由我的自定义转换器成功转换,但异步处理从未完成。
这是我的配置类:
这是我的处理程序:
doHandle()
永远无法达到方法中的逻辑。
与将返回 void 的同步处理程序的相同集成流程完美地工作:
我错过了什么吗?或者我可以通过使用 Mono 来实现它吗?我对弹簧集成和异步处理都没有太多经验。
amazon-dynamodb - 当 Spring kinesis binder 尝试释放锁定时,应用程序停止期间 DynamoDB 解锁错误
当应用程序停止时,kinesis binder 尝试解锁 dynamoDB 并抛出 unlock failed 异常。
我按照这个原始帖子处理了类似的问题,并将 spring-integration-aws 版本更新为 v2.3.1.RELEASE。但是在应用程序关闭时仍然看到相同的错误。
使用的依赖项:
spring-integration - spring-integration-aws S3StreamingMessageSource 如何删除远程文件?
我目前使用来自 spring 集成 aws 的 S3StreamingMessageSource。我将流传递给集成流。
如何从 S3 中删除检索到的远程文件?
在 S3InboundFileSynchronizer 中有一个方法。
像这样:
谁能帮助我或告诉我一个好的解决方法?
java - 使用 Spring 集成重新排序 AWS 标准 SQS 消息
我目前正在使用 SNS 和 SQS 来集成不同的远程系统。生产者向订阅了 SQS 的 AWS SNS 发送消息。消费者是启用了 spring 集成的 Spring Boot 应用程序,它使用 @SqsListener 轮询 SQS(默认配置,无需调整)。这一切都很好。
要求是以正确的顺序处理这些消息,这主要是从生产者的角度来看按时间顺序的创建时间驱动的。由于它们中的一些可能是依赖的,因此我必须考虑到原始订单来一一处理它们。
问题是我知道当侦听器轮询 SQS 时,SQS 不能保证这些消息按顺序到达。我已经通过以正确的顺序以编程方式向 SNS 发送几条消息来调查这一点,我希望它们被处理并在 SqsListener 中以稍微不同的顺序接收这些消息。
为了尝试处理这种不需要的影响,我在 SqsListener 之后放置了一个优先通道来缓冲这些消息并让这个通道重新排序消息。
这是按顺序处理标准 SQS 消息的正确方法吗?我是否应该调整侦听器配置,例如将其更改为长轮询?
spring-integration - Spring Integration AWS Local SQS
I want to implement spring-integration-aws to send and receive messages with SQS. I am looking at localstack and would like to know the recommendation of the spring team.
Which tool/api should I use for local setup of spring integration flows for SQS inbound and outbound adapters?
Also, will there be examples of AWS in spring-integration-samples in future? I am looking for an example with xml config that reads the aws config from credentials and send and receive messages via outbound adapters.
spring-integration - AWS 异步响应路由到 sqs-outbound-channel-adapter 的成功通道
我已经注册了一个AsyncHandler
并且还添加了一个success-channel
到 SQS 出站流。有success-channel
一个int:logging-channel-adapter
端点。但是,我无法看到来自此适配器的任何日志。AsyncHandler
能够接收回叫,但success-channel
. 在SqsMessageHandler
我看到我们在方法中设置了一个输出通道obtainAsyncHandler
,但我没有在success-channel
任何地方看到设置。我错过了什么吗?我更喜欢使用成功和失败通道而不是 AsyncHandler 回调 Impl 以避免在我的类中包含 AWS 特定代码。
另外 my <int-aws:sqs-outbound-channel-adapter>
位于<int:chain>
没有输出通道的 a 内,因为发送消息时流程结束。
编辑 - 添加配置 这是我可以让它记录回调的唯一方法。
在这里,我可以在链(出站通道)和 sqs-outbound(成功通道)中使用相同的通道
无法让它像下面这样工作:
spring-integration - Spring Integration:自上次轮询以来仅摄取新文件
我正在使用 Spring Integration 从 S3 存储桶中轮询文件。处理后我无法删除文件,所以我添加了S3PersistentAcceptOnceFileListFilter
. 我有很多文件并且经常轮询,这给FILE_SYNC_METADATA_STORE
桌子造成了不必要的压力。
我想知道是否有一种FileListFilter
实现可以过滤自上次轮询以来新/更新的文件,而不必为每个文件查询数据库。
spring-integration - 使用 Spring Integration SQS 在多个队列上聚合未来/响应 SQS Acks/Error
我正在使用一个Gateway
和在网关内部,我正在循环通过将消息发送到 2 个队列sqs-outbound adapter
。
我想实现这样的目标:
网关和 SQS 适配器配置
我正在查看apply-sequence
and dataChannel
,aggregator
但aggregator
必须能够同时处理ack
and error
。
问题:如何将 2 个队列的聚合响应(确认 + 错误)返回到网关?
java - ThreadPoolTaskExecutor,池中只有一个线程,不处理来自 AWS 队列的消息
我为应用程序上注册的每个队列创建了一个按需 ChannelAdapter、AsyncTaskExecutor 和一个 Channel。我注意到当maxPoolSize
AsyncTaskExecutor 的数量等于 1 时,消息没有被处理。这就是创建 AsyncTaskExecutor bean 的方式。
我注意到的另一件事是,当调用此方法时,java.util.concurrent.ThreadPoolExecutor#execute
此条件workerCountOf(c) < corePoolSize
始终为 false。完整的项目链接在这里https://github.com/LeoFuso/spring-integration-aws-demo