We don't have anything out of the box, but I was able to get this working with a small modification to SimpleMessageStore. I have posted the full RollingMessageStore in a gist.
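The bottom line is to modify removeGroup() so that it removes only the first message rather than the whole group. Also, make completeGroup() a no-op.
Set expireGroupsUponCompletion to true to force the aggregator to "remove" the group (by calling the modified removeGroup() method).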
Here is the diff between SimpleMessageStore and RollingMessageStore...
182,184c190,194
<
< groupUpperBound.release(groupIdToMessageGroup.get(groupId).size());
< groupIdToMessageGroup.remove(groupId);
---
> Message<?> message = this.groupIdToMessageGroup.get(groupId).getOne();
> if (message != null) {
>     this.groupUpperBound.release(1);
>     this.removeMessageFromGroup(groupId, message);
> }
(plus removing all of the code in completeGroup()).
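As a rough illustration of what the modified removeGroup() achieves, here is a minimal plain-Java sketch of the rolling behaviour. It does not use Spring Integration at all: RollingWindowSketch, handle(), removeOldest() and run() are hypothetical names invented for this example, and multiplying the payloads is just a stand-in for whatever the aggregator's processor does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java illustration only; none of these names come from Spring Integration.
final class RollingWindowSketch {

    // Hypothetical stand-in for the single message group held by the store.
    private final Deque<Integer> group = new ArrayDeque<>();
    private final int windowSize;

    RollingWindowSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    // Mirrors the modified removeGroup(): drop only the oldest entry,
    // not the whole group.
    private void removeOldest() {
        group.pollFirst();
    }

    // Adds a payload; returns the product of the window once it is full,
    // or null while the release condition is not yet met.
    Integer handle(int payload) {
        group.addLast(payload);
        if (group.size() < windowSize) {
            return null;
        }
        int product = 1;
        for (int value : group) {
            product *= value;
        }
        removeOldest(); // "expire" the group by sliding the window forward by one
        return product;
    }

    // Feeds all payloads through a fresh window and collects the released results.
    static List<Integer> run(int[] payloads, int windowSize) {
        RollingWindowSketch sketch = new RollingWindowSketch(windowSize);
        List<Integer> released = new ArrayList<>();
        for (int payload : payloads) {
            Integer result = sketch.handle(payload);
            if (result != null) {
                released.add(result);
            }
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println(run(new int[] {3, 5, 7, 9, 11}, 3)); // prints [105, 315, 693]
    }
}
```

With a window of 3, the first release happens on the third payload, and every later payload produces another release because only the oldest entry was dropped in between.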
And a test case...
// createMessage(...) and MultiplyingProcessor are assumed to be helpers
// defined in the surrounding aggregator test class.
@Test
public void testRolling() {
    AggregatingMessageHandler aggregator =
            new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore());
    // Makes the aggregator call the (modified) removeGroup() after each release.
    aggregator.setExpireGroupsUponCompletion(true);
    aggregator.setReleaseStrategy(new ReleaseStrategy() {
        @Override
        public boolean canRelease(MessageGroup group) {
            return group.size() == 3;
        }
    });
    QueueChannel replyChannel = new QueueChannel();
    Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
    Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
    Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null);
    Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null);
    Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null);
    aggregator.handleMessage(message1);
    aggregator.handleMessage(message2);
    aggregator.handleMessage(message3);
    aggregator.handleMessage(message4);
    aggregator.handleMessage(message5);
    // 3 * 5 * 7
    Message<?> reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(105, reply.getPayload());
    // 5 * 7 * 9
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(315, reply.getPayload());
    // 7 * 9 * 11
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(693, reply.getPayload());
}
Please go ahead and open a JIRA New Feature issue, and we will consider adding this (or a more general solution) to the upcoming 3.0 release.
Use correlation-strategy-expression="'foo'" and release-strategy-expression="size()==3".