0

我试图实现:

  1. 从 s3 目录中读取所有文件。
  2. 将所有文件复制到 s3 上的备份目录。
  3. 将所有文件内容聚合到一个文件中,并将其复制到 s3 上的另一个目录。

但我坚持在一次民意调查中阅读所有文件的第一点。

my from router : aws-s3://${camel.bucket.name}?amazonS3Client=#s3Client&prefix=some_path_on_s3&deleteAfterRead=true&delay=100s

for example if, some_path_on_s3 -> has 2 files say first.txt and 
second.txt

according to camel documentation, it has to read both the files in a 
single poll, but is reading 1 file per poll.

I also tried with parameter,  maxMessagesPerPoll=2 but no luck. It 
still reads one file per poll.

有没有办法在一次轮询中从 s3 目录中获取所有文件?

4

2 回答 2

0
  • 我在这里工作,

    from("file://<some_path_to_dir>")
    .routeId("some_route_id")
    .to("backup_dir")
    .to("direct:aggregate")
    .end();
    
    
    
    from("direct:aggregate")
    .routeId("aggregate_router")
    .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
            .completionPredicate(exchange -> {
                List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
                Exchange latestExchange = list.get(list.size() - 1);
                return (boolean) latestExchange.getProperty(Exchange.BATCH_COMPLETE);
            })
    .to("direct:merge");
    
    
    from("direct:merge")
            .routeId("merge_router")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
                    StringBuilder builder = new StringBuilder();
                    for(Exchange ex : list){
                        builder.append(ex.getIn().getBody(String.class));
                    }
    
                    exchange.getIn().setBody(builder.toString());
                    // set any other necessary header if required here
                    // example, if aws s3 is the endpoint, set the S3Constants.KEY header here
                }
            })
    .to("some_final_endpoint");
    
于 2019-07-12T00:27:58.267 回答
0

事实是它一次向路由发送一个文件,但每次轮询它都会确认整个批次。

maxMessagesPerPoll 仅对每批读取的文件数创建限制。我认为您正在寻找的信息在每次交易所的骆驼批次标题上:

CamelBatchComplete:一个布尔值,指示批次中的最后一个交换。仅适用于最后一个条目。

CamelBatchIndex:批次的当前索引。从 0 开始。

CamelBatchSize:该批次中轮询的交易所总数。

使用此信息,您可以多播消息,然后实现一个聚合器以加入一条路线上的文件,一旦 CamelBatchComplete=true,然后将文件备份到另一条路线上。

在此处查找更多信息:

批量消费

组播

于 2019-07-10T12:14:26.117 回答