我正在尝试按照此处的示例(在流式入站通道适配器下)使用 S3StreamMessageSource 轮询 S3 对象存储:https ://github.com/spring-projects/spring-integration-aws/tree/v1.1.0.RELEASE
@Bean
@InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "15000", maxMessagesPerPoll = "1"))
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(s3RemoteFileTemplate());
messageSource.setRemoteDirectory(s3Bucket+"/INBOX/");
messageSource.setFilter(new S3SimplePatternFileListFilter("test*"));
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3Stream"));
return messageSource;
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "s3Channel", outputChannel = "nullChannel")
public Transformer transformer() {
return new StreamTransformer();
}
@Bean
public S3RemoteFileTemplate s3RemoteFileTemplate() {
return new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
}
但是每次我的应用程序启动时,都会抛出一个 NPE:
2018-01-24 23:25:42.243 ERROR 29808 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: nested exception is java.lang.NullPointerException
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:396)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:53)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:373)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.springframework.integration.json.SimpleJsonSerializer.toElement(SimpleJsonSerializer.java:92)
at org.springframework.integration.json.SimpleJsonSerializer.toJson(SimpleJsonSerializer.java:74)
at org.springframework.integration.file.remote.AbstractFileInfo.toJson(AbstractFileInfo.java:60)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:164)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:141)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:230)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:250)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:379)
... 13 more
当我逐步检查抛出异常的位置时,它位于S3FileInfo.getPermissions中的代码部分 ,我看到UnsupportedOperationException “使用 [AmazonS3.getObjectAcl()] 获取权限。”
我需要做什么来解决这个问题?我正在使用 Spring Boot 1.5.9.RELEASE、spring-integration-aws 1.1.0.RELEASE、spring-integration-core 5.0.0.RELEASE