0

我正在研究一个用例,我应该轮询 S3 -> 读取内容流 -> 进行一些处理并将其上传到另一个存储桶,而不是在我的服务器中写入文件。

我知道我可以在 Spring aws 集成中使用 S3StreamingMessageSource 来实现它,但我面临的问题是我不知道如何处理通过轮询接收到的消息流

public class S3PollerConfigurationUsingStreaming {
    @Value("${amazonProperties.bucketName}")
    private String bucketName;

    @Value("${amazonProperties.newBucket}")
    private String newBucket;

    @Autowired
    private AmazonClientService amazonClient;

    @Bean
    @InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100"))
    public MessageSource<InputStream> s3InboundStreamingMessageSource() {    
        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
        messageSource.setRemoteDirectory(bucketName);
        messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
                "streaming"));      
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "s3Channel", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer();
    }

    @Bean
    public S3RemoteFileTemplate template() {
        return new S3RemoteFileTemplate(new S3SessionFactory(amazonClient.getS3Client()));
    }

    @Bean
    public PollableChannel s3Channel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow fileStreamingFlow() {
        return IntegrationFlows
                .from(s3InboundStreamingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
                .handle(streamFile())
                .get();
    }

}

有人可以帮我处理流的代码吗?

4

1 回答 1

2

不知道你的问题是什么,但我看到你有各种各样的担忧。如果您使用消息注释(请参阅@InboundChannelAdapter您的配置),那么s3InboundStreamingMessageSourceIntegrationFlow定义中使用相同注释有什么意义?

无论如何,看起来您已经为自己探索了一个StreamTransformer. 这个具有charset将您InputStream从远程 S3 资源转换为String. 否则它返回一个byte[]. 其他一切都取决于您如何处理转换后的内容以及如何处理。

另外,我认为没有理由将 as3Channel作为 a QueueChannel,因为无论如何您的流程的开始都是可轮询的@InboundChannelAdapter

从高处我会说我们有更多的问题要问你,反之亦然......

更新

不清楚您对InputStream处理的想法是什么,但这确实是一个事实,即S3StreamingMessageSource您将InputStream在下一个处理程序中完全作为有效负载。

也不确定 your 是什么streamFile(),但它必须真正期望InputStream作为请求消息有效负载的输入。您也可以使用StreamTransformer那里提到的:

@Bean
IntegrationFlow fileStreamingFlow() {
    return IntegrationFlows
            .from(s3InboundStreamingMessageSource(),
                    e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
            .transform(Transformers.fromStream("UTF-8"))
            .get();
}

下一个.handle()将准备好String作为有效载荷。

于 2019-01-04T19:44:22.897 回答