5

我正在尝试向本地 Storm 集群提交一个简单的单词计数(word count)拓扑。首先,我尝试使用 Maven 运行,然后使用 Storm 命令行客户端提交。JAR 文件是我用 Eclipse 创建的。但是,它抛出了找不到主类的异常。谁能告诉我可能是什么问题?我在下面附上代码和异常信息。

package com.test.newpackage;

import com.test.newpackage.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.test.newpackage.WordCounter;
import com.test.newpackage.WordNormalizer;

/**
 * Entry point that wires the word-count topology together and runs it on an
 * in-process {@link LocalCluster}.
 */
public class TopologyMain {
    /**
     * Builds the topology (reader spout, normalizer bolt, counter bolt),
     * submits it to a local cluster, waits one second and shuts the cluster
     * down again.
     *
     * @param args {@code args[0]} is the path of the words file; it is passed
     *             to the spout through the {@code "wordsFile"} config entry
     * @throws IllegalArgumentException if no words file path is supplied
     * @throws InterruptedException if the thread is interrupted while waiting
     *             for the topology to run
     */
    public static void main(String[] args) throws InterruptedException {
        // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException(
                    "Usage: TopologyMain <path-to-words-file>");
        }

        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
        // Two counter tasks, grouped on the "word" field.
        builder.setBolt("word-counter", new WordCounter(), 2).fieldsGrouping(
                "word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Topology run
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("Getting-Started-Toplogie", conf,
                    builder.createTopology());
            // NOTE(review): one second may be too short for the topology to
            // start and process the file - confirm and raise if needed.
            Thread.sleep(1000);
        } finally {
            // Always release the in-process cluster, even when submission
            // fails or the sleep is interrupted.
            cluster.shutdown();
        }
    }
}

package com.test.newpackage;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Bolt that keeps an in-memory tally of how many times each received word has
 * been seen and prints that tally when the bolt is cleaned up.
 */
public class WordCounter implements IRichBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    private OutputCollector collector;

    /**
     * Starts with an empty tally and remembers the collector together with
     * this task's component id and task id.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    /**
     * Increments the count of the word carried in the first field of the
     * tuple, then acknowledges the tuple.
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer seenSoFar = counters.get(word);
        // A word not seen before starts at 1; otherwise bump the stored count.
        int updated = (seenSoFar == null) ? 1 : seenSoFar + 1;
        counters.put(word, updated);
        collector.ack(input);
    }

    /**
     * Prints a header identifying this task followed by one
     * {@code word: count} line per counted word.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (String word : counters.keySet()) {
            System.out.println(word + ": " + counters.get(word));
        }
    }

    /** This bolt emits nothing, so no output fields are declared. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** Returns {@code null}: no component-specific configuration is supplied. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

package com.test.newpackage;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that splits each incoming line on single spaces and emits every
 * non-empty token, trimmed and lower-cased, as a one-field tuple.
 */
public class WordNormalizer implements IRichBolt {
    private OutputCollector collector;

    /** Stores the collector used later to emit and acknowledge tuples. */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Reads the line from the first field of the tuple, emits one tuple per
     * normalized word anchored to the input tuple, then acknowledges the
     * input.
     */
    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        for (String token : line.split(" ")) {
            String trimmed = token.trim();
            if (trimmed.isEmpty()) {
                // Consecutive spaces produce empty tokens; skip them.
                continue;
            }
            String normalized = trimmed.toLowerCase();
            // Anchor the emitted word to the tuple it was derived from.
            List<Tuple> anchors = new ArrayList<Tuple>();
            anchors.add(input);
            collector.emit(anchors, new Values(normalized));
        }
        collector.ack(input);
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }

    /** Declares the single output field {@code "word"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /** Returns {@code null}: no component-specific configuration is supplied. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

package com.test.newpackage;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout that reads the file named by the {@code "wordsFile"} config entry and
 * emits each of its lines once, using the line itself as the message id.
 */
public class WordReader implements IRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    private TopologyContext context;

    public boolean isDistributed() {
        return false;
    }

    /** Logs the id of a message that was acknowledged. */
    @Override
    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    /** Releases the underlying file handle if it is still open. */
    @Override
    public void close() {
        closeFileReader();
    }

    /** Logs the id of a message that failed. */
    @Override
    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * Emits every line of the file on the first call. On later calls it only
     * sleeps for one second and returns, because the file has already been
     * consumed.
     *
     * @throws RuntimeException if reading the file or emitting a line fails
     */
    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag so the calling thread can observe the
                // interruption instead of it being lost here.
                Thread.currentThread().interrupt();
            }
            return;
        }
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                // The line doubles as the message id reported to ack/fail.
                this.collector.emit(new Values(line), line);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            // The file is read at most once, whether or not it succeeded, so
            // the handle can be released right away.
            completed = true;
            closeFileReader();
        }
    }

    /**
     * Opens the words file and stores the collector and context.
     *
     * @param conf topology configuration; must contain {@code "wordsFile"}
     * @param context topology context, stored for later use
     * @param collector collector used to emit the file's lines
     * @throws IllegalArgumentException if {@code "wordsFile"} is not configured
     * @throws RuntimeException if the configured file cannot be opened
     */
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
        Object wordsFile = conf.get("wordsFile");
        if (wordsFile == null) {
            throw new IllegalArgumentException(
                    "Missing required config entry \"wordsFile\"");
        }
        try {
            this.fileReader = new FileReader(wordsFile.toString());
        } catch (FileNotFoundException e) {
            // Report the path that was actually used and keep the cause.
            throw new RuntimeException("Error reading file["
                    + wordsFile + "]", e);
        }
    }

    /** Declares the single output field {@code "line"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    /** No action is taken on activation. */
    @Override
    public void activate() {
    }

    /** No action is taken on deactivation. */
    @Override
    public void deactivate() {
    }

    /** Returns {@code null}: no component-specific configuration is supplied. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Closes the file reader once; safe to call when it is absent or already closed. */
    private void closeFileReader() {
        if (fileReader == null) {
            return;
        }
        try {
            fileReader.close();
        } catch (IOException ignored) {
            // Best effort: the file was only read, so a failed close loses no data.
        } finally {
            fileReader = null;
        }
    }
}

    mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="resources/
    words.txt"

    storm jar TopologyMain.jar TopologyMain wordcount

     Exception in thread "main" java.lang.NoClassDefFoundError: TopologyMain
        Caused by: java.lang.ClassNotFoundException: TopologyMain
            at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
            at java.security.AccessController.doPrivileged(Native Method)
            at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
        Could not find the main class: TopologyMain. Program will exit.

编辑:异常信息如下:

 backtype.storm.daemon.nimbus  - Activating word-count: word-count-1-1374756437
1724 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn  - Thread Thread[main,5,main] died
java.lang.NullPointerException
    at clojure.lang.Numbers.ops(Numbers.java:942)
    at clojure.lang.Numbers.isPos(Numbers.java:94)
    at clojure.core$take$fn__4112.invoke(core.clj:2500)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$concat$fn__3804.invoke(core.clj:662)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$map$fn__4091.invoke(core.clj:2437)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
    at clojure.core$reduce.invoke(core.clj:6030)
    at clojure.core$into.invoke(core.clj:6077)
    at backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245)
    at backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408)
    at backtype.storm.daemon.nimbus$compute_executor__GT_component.invoke(nimbus.clj:420)
    at backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315)
    at backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
    at clojure.core$reduce.invoke(core.clj:6030)
    at clojure.core$into.invoke(core.clj:6077)
    at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:625)
    at clojure.lang.RestFn.invoke(RestFn.java:410)
    at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto__$reify__3603.submitTopology(nimbus.clj:898)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
    at backtype.storm.testing$submit_local_topology.invoke(testing.clj:227)
    at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:19)
    at backtype.storm.LocalCluster.submitTopology(Unknown Source)
    at storm.starter.WordCountTopology.main(WordCountTopology.java:83)
11769 [Thread-2] ERROR backtype.storm.daemon.nimbus  - Error when processing event
java.lang.NullPointerException
    at clojure.lang.Numbers.ops(Numbers.java:942)
    at clojure.lang.Numbers.isPos(Numbers.java:94)
    at clojure.core$take$fn__4112.invoke(core.clj:2500)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$concat$fn__3804.invoke(core.clj:662)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$map$fn__4091.invoke(core.clj:2437)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
    at clojure.core$reduce.invoke(core.clj:6030)
    at clojure.core$into.invoke(core.clj:6077)
    at backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245)
    at backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408)
    at backtype.storm.daemon.nimbus$compute_executor__GT_component.invoke(nimbus.clj:420)
    at backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315)
    at backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
    at clojure.core$reduce.invoke(core.clj:6030)
    at clojure.core$into.invoke(core.clj:6077)
    at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:625)
    at clojure.lang.RestFn.invoke(RestFn.java:410)
    at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596$fn__3597.invoke(nimbus.clj:860)
    at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596.invoke(nimbus.clj:859)
    at backtype.storm.timer$schedule_recurring$this__1753.invoke(timer.clj:69)
    at backtype.storm.timer$mk_timer$fn__1736$fn__1737.invoke(timer.clj:33)
    at backtype.storm.timer$mk_timer$fn__1736.invoke(timer.clj:26)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:679)
11774 [Thread-2] INFO  backtype.storm.util  - Halting process: ("Error when processing an event")
4

1 回答 1

4

Naresh,根据我所看到的,您的问题的解决方案可能在于您使用的类名。这是您指定的命令行参数:

storm jar TopologyMain.jar TopologyMain wordcount

您引用主类时只写了类名“TopologyMain”,而没有使用完全限定名。下面是我对您的 Storm 命令修改后的版本:

storm jar TopologyMain.jar com.test.newpackage.TopologyMain

我使用完整的包名而不是单独的类。请注意,我还删除了对“wordcount”的引用,因为我不知道它在那里做什么(在您的代码示例中没有名为“wordcount”的类、方法或变量)。

这是 Google Groups 上的一篇优秀帖子,其中涵盖了 Storm 的早期设置问题: 早期设置问题

在我刚开始使用 Storm 的头几周,我发现自己一直在参考这篇帖子。

如果这能解决您的问题,请告诉我们。

于 2013-07-21T10:51:35.210 回答