我可以使用标准的 spout、bolt 组合来进行流式聚合,并且在快乐的情况下工作得很好,当使用刻度元组以某个时间间隔保存数据以使用批处理时。现在我自己在做一些故障管理(跟踪未保存的元组等)。(即不是来自storm的ootb)
但我读过三叉戟给你更高的抽象和更好的故障管理。我不明白的是三叉戟中是否有刻度元组支持。基本上我想在当前分钟左右在内存中批处理,并使用 trident 保留前几分钟的任何聚合数据。
此处的任何指示或设计建议都会有所帮助。
谢谢
我可以使用标准的 spout、bolt 组合来进行流式聚合,并且在快乐的情况下工作得很好,当使用刻度元组以某个时间间隔保存数据以使用批处理时。现在我自己在做一些故障管理(跟踪未保存的元组等)。(即不是来自storm的ootb)
但我读过三叉戟给你更高的抽象和更好的故障管理。我不明白的是三叉戟中是否有刻度元组支持。基本上我想在当前分钟左右在内存中批处理,并使用 trident 保留前几分钟的任何聚合数据。
此处的任何指示或设计建议都会有所帮助。
谢谢
实际上,微批处理是 Trident 的内置功能。您不需要任何刻度元组。当你的代码中有这样的东西时:
topology
.newStream("myStream", spout)
.partitionPersist(
ElasticSearchEventState.getFactoryFor(connectionProvider),
new Fields("field1", "field2"),
new ElasticSearchEventUpdater()
)
(我在这里使用我的自定义 ElasticSearch 状态/更新器,您可能会使用其他东西)
所以当你有这样的东西时,在引擎盖下 Trident 将你的流分组并执行 partitionPersist 操作,而不是对单个元组,而是对这些批次。
如果您出于任何原因仍然需要刻度元组,只需创建刻度喷口,这样的东西对我有用:
public class TickSpout implements IBatchSpout {
public static final String TIMESTAMP_FIELD = "timestamp";
private final long delay;
public TickSpout(long delay) {
this.delay = delay;
}
@Override
public void open(Map conf, TopologyContext context) {
}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
Utils.sleep(delay);
collector.emit(new Values(System.currentTimeMillis()));
}
@Override
public void ack(long batchId) {
}
@Override
public void close() {
}
@Override
public Map getComponentConfiguration() {
return null;
}
@Override
public Fields getOutputFields() {
return new Fields(TIMESTAMP_FIELD);
}
}