0

嗨,我们正在尝试通过利用 apache camel 或 spring 集成来流式处理金融市场数据以计算交易信号。我们的一个用例是根据价格时间戳将连续价格聚合在一起,如下所示:

  • 输入

输入消息以时间序列中的 (timestamp,price) 对形式出现。假设传入的值是,每对 (TX,PX) 是一条消息,而 T 代表时间戳,P 代表价格值

(T0,P1),(T1,P1),(T2,P2),(T3,P3),(T4,P4)... 
  • 聚合

假设我们需要将每 3 条连续消息聚合在一起以进行进一步计算,给定输入消息我们需要生成以下组,每 3 对组是一个聚合消息:

[(T0,P1),(T1,P1),(T2,P2)],
[(T1,P1),(T2,P2),(T3,P3)],
[(T2,P2),(T3,P3),(T4,P4)],
....

如您所见,大多数消息将聚合到多个组中。有人可以建议是否有一种方法可以通过使用当前聚合器而不编写聚合器来做到这一点。

似乎spring集成聚合分组也是基于相关键的,所以消息需要映射到一组相关键。但是,目前的 api 似乎只允许我们生成一个关联键,这意味着每条消息只能聚合到一组。是否有任何解决方法。

附言

在阅读了骆驼的源代码后,骆驼似乎无法支持我们的要求。试试我的春天运气。手指交叉 的骆驼问题

4

1 回答 1

1

我们没有任何开箱即用的东西,但我可以通过对SimpleMessageStore. 我已将全文发布RollingMessageStore 在 gist中。

底线是修改removeGroup为仅删除第一条消息,而不是整个组。另外,做completeGroup一个无操作。

设置expreGroupOnCompletion为强制聚合器“删除”组(通过调用修改后的removeGroup()方法。

这是SimpleMessageGroupRollingMessageGroup...之间的区别

182,184c190,194
< 
<               groupUpperBound.release(groupIdToMessageGroup.get(groupId).size());
<               groupIdToMessageGroup.remove(groupId);
---
>               Message<?> message = this.groupIdToMessageGroup.get(groupId).getOne();
>               if (message != null) {
>                   this.groupUpperBound.release(1);
>                   this.removeMessageFromGroup(groupId, message);
>               }

(加上删除所有的代码completeGroup()

和一个测试用例...

@Test
public void testRolling() {
    AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore());
    aggregator.setExpireGroupsUponCompletion(true);
    aggregator.setReleaseStrategy(new ReleaseStrategy() {

        @Override
        public boolean canRelease(MessageGroup group) {
            return group.size() == 3;
        }
    });
    QueueChannel replyChannel = new QueueChannel();
    Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
    Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
    Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null);
    Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null);
    Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null);

    aggregator.handleMessage(message1);
    aggregator.handleMessage(message2);
    aggregator.handleMessage(message3);
    aggregator.handleMessage(message4);
    aggregator.handleMessage(message5);

    Message<?> reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 105);
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 315);
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 693);
}

请继续打开JIRA 新功能问题,我们将考虑将此(或更通用的解决方案)添加到即将发布的 3.0 版本中。

使用correlation-strategy-expression="'foo'"

release-strategy-expression=size()==3.

于 2013-11-14T15:34:46.243 回答