我实际上想要一个刷新,或者一个completionSize,但是对于聚合器中的所有聚合。就像一个全局的completionSize。
基本上,我想确保批处理中的每条消息都被聚合,然后在读取最后一条消息时立即完成该聚合器中的所有聚合。
e.g. 1000 messages arrive (the length is not known beforehand)
aggregate on correlation id into bins
A 300
B 400
C 300 (size of the bins is not known before hand)
I want the aggregator not to complete until the 1000th exchange is aggregated
thereupon I want all of the aggregations in the aggregator to complete at once
CompleteSize 适用于每个聚合,而不是聚合器作为一个整体。因此,如果我设置 CompleteSize( 1000 ) 它永远不会完成,因为每个聚合必须超过 1000 才能“完成”
我可以通过构建单个 Map 对象来解决它,但这有点回避 aggregator2 中的相关性,我更喜欢理想地使用它
所以是的,无论是全局完整大小还是刷新,有没有办法智能地做到这一点?