我正在研究一个用例,我应该轮询 S3 -> 读取内容流 -> 进行一些处理并将其上传到另一个存储桶,而不是在我的服务器中写入文件。
我知道我可以在 Spring aws 集成中使用 S3StreamingMessageSource 来实现它,但我面临的问题是我不知道如何处理通过轮询接收到的消息流
public class S3PollerConfigurationUsingStreaming {
@Value("${amazonProperties.bucketName}")
private String bucketName;
@Value("${amazonProperties.newBucket}")
private String newBucket;
@Autowired
private AmazonClientService amazonClient;
@Bean
@InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100"))
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(bucketName);
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
return messageSource;
}
@Bean
@Transformer(inputChannel = "s3Channel", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer();
}
@Bean
public S3RemoteFileTemplate template() {
return new S3RemoteFileTemplate(new S3SessionFactory(amazonClient.getS3Client()));
}
@Bean
public PollableChannel s3Channel() {
return new QueueChannel();
}
@Bean
IntegrationFlow fileStreamingFlow() {
return IntegrationFlows
.from(s3InboundStreamingMessageSource(),
e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
.handle(streamFile())
.get();
}
}
有人可以帮我处理流的代码吗?