0

我需要根据消息内容从 S3 下载文件。换句话说,要下载的文件以前是未知的,我必须在运行时搜索并找到它。S3StreamingMessageSource似乎不太合适,因为:

  1. 它依赖于轮询,因为我需要等待消息。
  2. 我找不到S3StreamingMessageSource在流程中间动态创建的任何方法。gateway(IntegrationFlow)看起来很有趣,但我需要的是一个gateway(Function<Message<?>, IntegrationFlow>)不存在的。

另一个候选者是S3MessageHandler但它不支持列出查找所需文件所需的文件。

我可以直接使用 AWS API 实现我自己的消息处理程序,只是想知道我是否遗漏了什么,因为这似乎不是一个不寻常的要求。毕竟,并非每个应用程序都只是坐在那里并不断轮询 S3 以获取新文件。

4

2 回答 2

1

有可以在 中使用S3RemoteFileTemplatelist()功能handle()。然后split()结果并调用S3MessageHandler每个远程文件下载。

尽管最后一个具有下载整个远程目录的功能。

于 2017-12-29T03:13:09.520 回答
0

对于遇到这个问题的任何人,这就是我所做的。诀窍是:

  1. 稍后设置过滤器,而不是在构建时。注意没有addFiltersorgetFilters方法,所以过滤器只能设置一次,以后不能再添加。@artem-bilan,这很不方便。
  2. S3StreamingMessageSource.receive手动调用。

    .handle(String.class, (fileName, h) -> {
    if (messageSource instanceof S3StreamingMessageSource) {
        S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;
    
        ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
        chainFileListFilter.addFilters(
                new S3SimplePatternFileListFilter("**/*/*.json.gz"),
                new S3PersistentAcceptOnceFileListFilter(metadataStore, ""),
                new S3FileListFilter(fileName)
        );
        s3StreamingMessageSource.setFilter(chainFileListFilter);
    
        return s3StreamingMessageSource.receive();
    }
    log.warn("Expected: {} but got: {}.",
            S3StreamingMessageSource.class.getName(), messageSource.getClass().getName());
    return messageSource.receive();
    }, spec -> spec
        .requiresReply(false) // in case all messages got filtered out
    )
    
于 2017-12-31T08:20:27.303 回答