1

我是 java 和 Trident 的新手,我导入了获取推文的项目,但我想得到一些东西,当我从tuple.getValue(0); 只意味着第一条推文的代码中获得时,这段代码如何获得不止一条推文?!

我的问题是在 hashset 或 hashmap 中获取所有推文以获取每条推文中独特词的总数

public void execute(TridentTuple tuple, TridentCollector collector) {

此方法用于在推文上执行方程式

public Values getValues(Tweet tweet, String[] words){
 }

这段代码得到了第一条推文,然后得到了它的主体,将它转换为字符串数组,我知道我需要解决什么但写得不好

我的想法:像循环一样

for (int i=0;i<10;i++)
{
 Tweet tweet = (Tweet) tuple.getValue(i);   
}
4

2 回答 2

0
  1. 对于每条推文:
    • 对于推文中的每个单词:
      • 尝试将每个单词添加到集合中。
        如果该词已存在于集合中,则将其从集合中移除。
    • 计算包含该推文单词的集合大小。
于 2016-02-17T00:26:39.447 回答
-1

“问题”是“获取所有推文中不同单词的计数”和作为流处理器的 Strom 之间的不匹配。您要回答的查询只能在有限的推文集上计算。但是,在流处理中,您会处理潜在的无限输入数据流。

如果您有一组有限的推文,您可能想要使用批处理框架,例如 Flink、Spark 或 MapReduce。如果你确实有无数条推文,你必须重新表述你的问题......

正如您已经提到的,您实际上想要“遍历所有推文”。正如你这样流处理,没有这样的概念。您有无限数量的输入元组,Storm 应用于execute()每个元组(即,您可以将其视为 Storm 自动“循环输入”——即使“循环”不是正确的术语)。由于您的计算是“所有推文”,您需要在 Bolt 代码中维护一个状态,以便您可以为每个推文更新此状态。Storm 中状态的简单形式将是 Bolt 类中的成员变量。

public class MyBolt implements ??? {
    // this is your "state" variable
    private final Set<String> allWords = new HashSet<String>();

    public void execute(TridentTuple tuple, TridentCollector collector) {
        Tweet tweet = (Tweet)tuple.getValue(0);        
        String tweetBody = tweet.getBody();
        String words[] = tweetBody.toLowerCase().split(regex);
        for(String w : words) {
            // as allWords is a set, you cannot add the same word twice
            // the second "add" call on the same word will just be ignored
           // thus, allWords will contain each word exactly once
            this.allWords.add(w);
        }
    }
}

现在,这段代码没有发出任何东西,因为不清楚你真正想要发出什么?就像在流处理中一样,没有尽头,你不能说“发出最终的单词数,包含在allWords”中。您可以做什么,它会在每次更新后发出当前计数...为此,collector.emit(new Values(this.allWords.size()));请在execute().

此外,我想补充一点,所提出的解决方案只有在没有应用并行性的情况下才能正常工作MyBolt——否则,实例上的不同集合可能包含相同的单词。为了解决这个问题,需要在无状态的 Bolt 中将每条推文标记为其单词,并将这些单词流放入MyBolt使用内部Set作为状态的收养中。的输入数据MyBolt还必须通过接收数据,fieldsGrouping以确保每个实例上的单词集不同。

于 2016-02-17T12:10:28.757 回答