我正在使用一个Gateway
和在网关内部,我正在循环通过将消息发送到 2 个队列sqs-outbound adapter
。
我想实现这样的目标:
jsonArray.forEach(data -> {
Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
futures.add(result);
});
futures.forEach(f -> {
System.out.println("Future Result -> "+ f.get())
});
网关和 SQS 适配器配置
<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
<int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>
<!-- Send to 2 Queues -->
<int:channel id="successChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-1"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-2"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
我正在查看apply-sequence
and dataChannel
,aggregator
但aggregator
必须能够同时处理ack
and error
。
问题:如何将 2 个队列的聚合响应(确认 + 错误)返回到网关?