3

抱歉,如果问题已解决,但我试图找到它,但我没有成功。有一些类似的,但我没有在我见过的地方找到帮助。我有下一个问题:

603  [main] WARN  b.s.StormSubmitter - Topology submission exception: 
    Component: [escribirFichero] subscribes from non-existent stream: 
               [default] of component [buscamosEnKlout]
Exception in thread "main" java.lang.RuntimeException: 
    InvalidTopologyException(msg:Component: 
               [escribirFichero] subscribes from non-existent stream: 
                   [default] of component [buscamosEnKlout])

我不明白为什么我有这个例外。我在使用“escribirFichero”之前声明了螺栓“buscamosEnKlout”。在我的拓扑结构旁边,我将放置螺栓的基本线。我知道喷口没问题,因为这是一种反复试验的方法。

我的拓扑代码是:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.stats.RollingWindow;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import bolt.*;
import spout.TwitterSpout;
import twitter4j.FilterQuery;

public class TwitterTopologia {
    private static String consumerKey = "xxx1";
    private static String consumerSecret = "xxx2";
    private static String accessToken = "yyy1";
    private static String accessTokenSecret="yyy2";

    public static void main(String[] args) throws Exception {
        /**************** SETUP ****************/
        String remoteClusterTopologyName = null;
        if (args!=null) { ... } 

        TopologyBuilder builder = new TopologyBuilder();
        FilterQuery tweetFilterQuery = new FilterQuery();
        tweetFilterQuery.track(new String[]{"Vacaciones","Holy Week", "Semana Santa","Holidays","Vacation"});
        tweetFilterQuery.language(new String[]{"en","es"});


        TwitterSpout spout = new TwitterSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, tweetFilterQuery);

        KloutBuscador buscamosEnKlout = new KloutBuscador();
        FileWriterBolt fileWriterBolt = new FileWriterBolt("idUsuarios.txt");

        builder.setSpout("spoutLeerTwitter",spout,1);
        builder.setBolt("buscamosEnKlout",buscamosEnKlout,1).shuffleGrouping("spoutLeerTwitter");
        builder.setBolt("escribirFichero",fileWriterBolt,1).shuffleGrouping("buscamosEnKlout");


        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("twitter-fun", conf, builder.createTopology());
            Thread.sleep(460000);
            cluster.shutdown();
        }
    }
}

螺栓“KloutBuscador”,别名“buscamosEnKlout”,是下一个代码:

String text = tuple.getStringByField("id");

String cadenaUrl;

cadenaUrl = "http://api.klout.com/v2/identity.json/twitter?screenName=";
cadenaUrl += text.replaceAll("\\[", "").replaceAll("\\]","");
cadenaUrl += "&key=" + kloutKey;
URL url = new URL(cadenaUrl);
HttpURLConnection c = (HttpURLConnection) url.openConnection();
        ...........c.setRequestMethod("GET");c.setRequestProperty("Content-length", "0");c.setUseCaches(false);c.setAllowUserInteraction(false);c.connect();
int status = c.getResponseCode();
StringBuilder sb = new StringBuilder();
switch (status) {
    case 200:
    case 201:
       BufferedReader br = new BufferedReader(new InputStreamReader(c.getInputStream()));
       String line;
       while ((line = br.readLine()) != null) sb.append(line + "\n");
           br.close();
       }

JSONObject jsonResponse = new JSONObject(sb.toString());
//getJSONArray("id");
String results = jsonResponse.toString();
_collector.emit(new Values(text,results));

第二个螺栓,fileWriterBolt,别名“escribirFichero”,是下一个:

public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    _collector = outputCollector;
    try {
        writer = new PrintWriter(filename, "UTF-8");...}...}

    public void execute(Tuple tuple) {
        writer.println((count++)+":::"+tuple.getValues());

 //+"+++"+tweet.getUser().getId()+"__FINAL__"+tweet.getUser().getName()
        writer.flush();
        // Confirm that this tuple has been treated.
        //_collector.ack(tuple);

    }

如果我越过 Klous 的螺栓,只写出 spout 的结果,它就可以工作。我不明白为什么 Klous 的螺栓会导致此故障

4

1 回答 1

3

您的 buscamosEnKlout bolt 需要声明它将发出的元组的格式,以及它将发出的流。您很可能没有在该螺栓中正确实现 declareOutputFields 。它应该包含类似declarer.declare(new Fields("your-text-field", "your-results-field"))

于 2018-03-25T09:44:06.387 回答