0

我已经阅读了有关 Apache 风暴的信息并做了一些基本教程。我有以下拓扑结构,我想用storm来实现,但不确定如何处理数据分布。业务需求是实时评估客户组合。简而言之,它包括: 1) 接受实时市场价格(货币、商品等) 2) 对于每个价格变动,计算每个头寸的当前利润并将其转换为客户账户货币 3) 分析总盈亏和每个客户的所有头寸的数量,并在需要时生成信号 4) 在客户级别的计算必须是顺序的和原子的/序列化的。即,所有仓位必须按照其进入系统的顺序的每个分时进行评估,并且即使客户有 100 个仓位,也必须根据相同的价格计算总数。

所有订单都执行并存储在 rdbms 中。我的主要问题是如何在每个节点处理它自己的部分的不同节点上的 Storm bolts 上分配成千上万个位置。使用 Modulo 足以对客户进行分区,但是我如何为每个 bolt 实例提供 id,以便每个实例只处理它自己的相同部分客户?Storm 中是否有开箱即用的功能可以做到这一点?另一个问题是如何有效地进行上述聚合?

4

1 回答 1

0

你可以使用fieldsGrouping它。您可以声明一个用于对元组进行分组的字段(在您的情况下为id)。

我只是假设您的输入流是带有 id 和 body 字段的 JSON 对象

{"id":"1234","body":"some body"}

还假设您的拓扑有一个喷口,两个螺栓,即 BoltA 和 BoltB。

在 BoltB 中,重写 declareOutputFields 方法并填写详细信息。

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("id","log"));
}

你可以像下面这样声明拓扑

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("boltA", new BoltA(), 1)
       .shuffleGrouping("spout");
builder.setBolt("counterBolt", new BoltB(), 1).fieldsGrouping("boltB", new Fields("id"));

在这种情况下,具有相同 id from 的元组boltA将被传递到相同的实例boltB

于 2015-05-14T01:19:46.993 回答