“问题”是“获取所有推文中不同单词的计数”和作为流处理器的 Strom 之间的不匹配。您要回答的查询只能在有限的推文集上计算。但是,在流处理中,您会处理潜在的无限输入数据流。
如果您有一组有限的推文,您可能想要使用批处理框架,例如 Flink、Spark 或 MapReduce。如果你确实有无数条推文,你必须重新表述你的问题......
正如您已经提到的,您实际上想要“遍历所有推文”。正如你这样流处理,没有这样的概念。您有无限数量的输入元组,Storm 应用于execute()
每个元组(即,您可以将其视为 Storm 自动“循环输入”——即使“循环”不是正确的术语)。由于您的计算是“所有推文”,您需要在 Bolt 代码中维护一个状态,以便您可以为每个推文更新此状态。Storm 中状态的简单形式将是 Bolt 类中的成员变量。
public class MyBolt implements ??? {
// this is your "state" variable
private final Set<String> allWords = new HashSet<String>();
public void execute(TridentTuple tuple, TridentCollector collector) {
Tweet tweet = (Tweet)tuple.getValue(0);
String tweetBody = tweet.getBody();
String words[] = tweetBody.toLowerCase().split(regex);
for(String w : words) {
// as allWords is a set, you cannot add the same word twice
// the second "add" call on the same word will just be ignored
// thus, allWords will contain each word exactly once
this.allWords.add(w);
}
}
}
现在,这段代码没有发出任何东西,因为不清楚你真正想要发出什么?就像在流处理中一样,没有尽头,你不能说“发出最终的单词数,包含在allWords
”中。您可以做什么,它会在每次更新后发出当前计数...为此,collector.emit(new Values(this.allWords.size()));
请在execute()
.
此外,我想补充一点,所提出的解决方案只有在没有应用并行性的情况下才能正常工作MyBolt
——否则,实例上的不同集合可能包含相同的单词。为了解决这个问题,需要在无状态的 Bolt 中将每条推文标记为其单词,并将这些单词流放入MyBolt
使用内部Set
作为状态的收养中。的输入数据MyBolt
还必须通过接收数据,fieldsGrouping
以确保每个实例上的单词集不同。