11

我正在尝试编写一个执行以下操作的拓扑:

  1. 订阅 twitter 提要的 spout(基于关键字)
  2. 一个聚合螺栓,它聚合一个集合中的许多推文(比如 N)并将它们发送到打印机螺栓
  3. 一个简单的螺栓,可以立即将集合打印到控制台。

实际上,我想对集合进行更多处理。

我在本地对其进行了测试,看起来它正在工作。但是,我不确定我是否正确设置了螺栓上的分组,以及在实际风暴集群上部署时是否可以正常工作。如果有人可以帮助查看此拓扑并提出任何错误、更改或改进建议,我将不胜感激。

谢谢。

这就是我的拓扑结构。

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
   builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
                .shuffleGrouping("spout");
   builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");

聚合螺栓

public class SampleAggregatorBolt implements IRichBolt {

    protected OutputCollector collector;
    protected Tuple currentTuple;
    protected Logger log;
    /**
     * Holds the messages in the bolt till you are ready to send them out
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;

        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        currentTuple = tuple;

        Status currentStatus = null;
        try {
            currentStatus = (Status) tuple.getValue(0);
        } catch (ClassCastException e) {
        }
        if (currentStatus != null) {

            //add it to the status cache
            statusCache.add(currentStatus);
            collector.ack(tuple);


            //check the size of the status cache and pass it to the next stage if you have enough messages to emit
            if (statusCache.size() > 10) {
                collector.emit(new Values(statusCache));
            }

        }
    }

    @Override
    public void cleanup() {


    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;  //To change body of implemented methods use File | Settings | File Templates.
    }


    protected void setupNonSerializableAttributes() {

    }

}

打印机螺栓

public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple.size() + " "  + tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

}
4

2 回答 2

4

从我能看到它看起来不错。不过,魔鬼在细节中。我不确定您的聚合器螺栓做了什么,但如果它对传递给它的值做出任何假设,那么您应该考虑适当的字段分组。当您使用默认的并行提示 1 时,这可能不会有那么大的区别,但是如果您决定使用多个聚合螺栓实例进行扩展,您所做的隐含逻辑假设可能需要非随机分组。

于 2013-06-04T19:07:36.953 回答
0

嗨,一旦您尝试订阅多个关键字,您就会遇到问题。我建议您的 spout 也发出用于过滤的原始关键字。

然后我不会做 shuffleGrouping 我会做一个 fieldsGrouping

builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
            .shuffleGrouping("spout", new Fields("keyword"));

这样,您可以确保单个关键字的结果每次都出现在同一个螺栓上。这样您就可以正确计算聚合。如果省略 fieldsGrouping Storm 可以实例化任意数量的聚合 Bolt 并将任何消息从 spout 发送到聚合螺栓的任何实例,这最终会导致错误的结果。

于 2014-10-11T16:45:40.100 回答