7

我在玩 Storm,我想知道 Storm 在哪里指定(如果可能的话)聚合时的(翻滚/滑动)窗口大小。例如,如果我们想在 Twitter 上查找前一小时的热门话题。我们如何指定一个螺栓应该每小时返回一次结果?这是在每个螺栓内以编程方式完成的吗?还是以某种方式指定“窗口”?

4

2 回答 2

17

免责声明:我写了gakhov 在上面的回答中引用的 Storming Topics 文章。

我想说最好的做法是在 Storm 0.8+中使用所谓的刻度元组。有了这些,您可以配置自己的 spouts/bolts 以在特定时间间隔(例如,每十秒或每分钟)收到通知。

这是一个简单的示例,它将相关组件配置为每十秒接收一次刻度元组:

// in your spout/bolt
@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    int tickFrequencyInSeconds = 10;
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
    return conf;
}

然后,您可以在 spout/bolt 的execute()方法中使用条件开关来区分“正常”传入元组和特殊刻度元组。例如:

// in your spout/bolt
@Override
public void execute(Tuple tuple) {
    if (isTickTuple(tuple)) {
        // now you can trigger e.g. a periodic activity
    }
    else {
        // do something with the normal tuple
    }
}

private static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

同样,正如 gakhov 指出的那样,我几天前在 Storm 中写了一篇非常详细的博客文章(无耻的插件!)。

于 2013-02-06T18:38:28.930 回答
1

Add a new spout with parallelism degree of 1, and have it emit an empty signal and then Utils.sleep until next time (all done in nextTuple). Then, link all relevant bolts to that spout using all-grouping, so all of their instances will receive that same signal.

于 2012-11-02T23:43:03.897 回答