4

我可以使用标准的 spout、bolt 组合来进行流式聚合,并且在快乐的情况下工作得很好,当使用刻度元组以某个时间间隔保存数据以使用批处理时。现在我自己在做一些故障管理(跟踪未保存的元组等)。(即不是来自storm的ootb)

但我读过三叉戟给你更高的抽象和更好的故障管理。我不明白的是三叉戟中是否有刻度元组支持。基本上我想在当前分钟左右在内存中批处理,并使用 trident 保留前几分钟的任何聚合数据。

此处的任何指示或设计建议都会有所帮助。

谢谢

4

1 回答 1

0

实际上,微批处理是 Trident 的内置功能。您不需要任何刻度元组。当你的代码中有这样的东西时:

topology
    .newStream("myStream", spout)
    .partitionPersist(
        ElasticSearchEventState.getFactoryFor(connectionProvider),
        new Fields("field1", "field2"),
        new ElasticSearchEventUpdater()
    )

(我在这里使用我的自定义 ElasticSearch 状态/更新器,您可能会使用其他东西)

所以当你有这样的东西时,在引擎盖下 Trident 将你的流分组并执行 partitionPersist 操作,而不是对单个元组,而是对这些批次。

如果您出于任何原因仍然需要刻度元组,只需创建刻度喷口,这样的东西对我有用:

public class TickSpout implements IBatchSpout {

    public static final String TIMESTAMP_FIELD = "timestamp";
    private final long delay;

    public TickSpout(long delay) {
        this.delay = delay;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        Utils.sleep(delay);
        collector.emit(new Values(System.currentTimeMillis()));
    }

    @Override
    public void ack(long batchId) {
    }

    @Override
    public void close() {
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(TIMESTAMP_FIELD);
    }
}
于 2015-12-15T22:32:23.050 回答