0

我是 WebFlux 和 MongoDB 的新手。我正在尝试在带有可尾光标的上限集合中使用聚合,但我没有成功。我想执行这个 mongoDB 查询:

db.structures.aggregate(
   [
     {
        $match: {
            id: { $in: [8244, 8052]}
        }    
     },  
     { $sort: { id: 1, lastUpdate: 1} },
     {
       $group:
         {
           _id: {id: "$id"},
           lastUpdate: { $last: "$lastUpdate" }
         }
     }
   ]
)

ReactiveMongoOperations 让我可以选择“tail”或“aggregation”。

我能够执行聚合:

    MatchOperation match = new MatchOperation(Criteria.where("id").in(8244, 8052));
    GroupOperation group = Aggregation.group("id", "$id").last("$lastUpdate").as("lastUpdate");
    Aggregation aggregate = Aggregation.newAggregation(match, group);

    Flux<Structure> result = mongoOperation.aggregate(aggregate,
            "structures", Structure.class);

或尾光标

    Query query = new Query();
    query.addCriteria(Criteria.where("id").in(8244, 8052));
    Flux<Structure> result = mongoOperation.tail(query, Structure.class);

可能吗?尾部和聚合在一起?

使用聚合是我发现只为每个 id 获取最后插入的文档的方式。

没有聚合我得到:

无聚合查询

聚合:

聚合查询

提前谢谢

4

1 回答 1

0

tailable 游标查询创建一个Flux永不完成(从不发出onComplete事件)并Flux在将记录插入数据库时​​发出记录。由于这个事实,我认为数据库引擎不允许使用可尾游标进行聚合。

所以聚合在某种程度上没有意义,因为在每条新插入的记录上都需要重新计算聚合。从技术上讲,您可以进行运行聚合,对于每个返回的记录,您计算所需的聚合记录并将其发送到下游。

一种可能的解决方案是以编程方式对返回的 "infinite" 进行聚合Flux

mongoOperation.tail(query, Structure.class)
  .groupBy(Structure::id) // create independent Fluxes based on id
  .flatMap(groupedFlux -> 
    groupedFlux.scan((result, nextStructure) -> { // scan is like reduce but emits intermediate results
    log.info("intermediate result is: {}", result);
    if (result.getLastUpdate() > nextStructure.getLastUpdate()) {
        return result;
    } else {
        result.setLastUpdate(nextStructure.getLastUpdate());
        return result;
    }
}));

另一方面,您可能应该重新审视您的用例以及您需要在这里完成的工作,看看是否应该使用上限集合以外的东西,或者聚合部分可能是多余的(即,如果新插入的记录总是具有lastUpdate比前一个更大的属性记录)。

于 2020-04-28T11:59:24.217 回答