
I want to store the data emitted by a Storm spout in HDFS. I added Hadoop FS API code to the bolt class, but it throws an error.

Here is the Storm bolt class:

package bolts;

import java.io.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class DataNormalizer extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] process = sentence.split(" "); // tokenised input (not used below)
        int n = 1;
        String rec = "";
        try {
            String source = "/root/data/top_output.csv";
            String dest = "hdfs://localhost:9000/user/root/nishu/top_output/top_output_1.csv";

            // With an empty Configuration this returns the filesystem named by
            // fs.default.name, which is file:/// unless a core-site.xml is on
            // the classpath.
            Configuration conf = new Configuration();
            FileSystem fileSystem = FileSystem.get(conf);
            System.out.println(fileSystem);
            Path srcPath = new Path(source);
            Path dstPath = new Path(dest);
            try {
                if (!fileSystem.exists(dstPath)) {
                    // Destination does not exist yet: stream the local file up.
                    FSDataOutputStream out = fileSystem.create(dstPath, true);
                    InputStream in = new BufferedInputStream(
                            new FileInputStream(new File(source)));
                    byte[] b = new byte[1024];
                    int numBytes;
                    while ((numBytes = in.read(b)) > 0) {
                        out.write(b, 0, numBytes);
                    }
                    in.close();
                    out.close();
                } else {
                    // Destination already exists: copy the local file over it.
                    fileSystem.copyFromLocalFile(srcPath, dstPath);
                }
            } catch (Exception e) {
                System.err.println("Exception caught! :" + e);
                System.exit(1);
            } finally {
                fileSystem.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Required by BaseBasicBolt; this bolt emits no tuples downstream.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

}
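A side note on the FileSystem.get(conf) call above: with an empty Configuration, the default filesystem is file:///, so operations on an hdfs:// path only work if a core-site.xml that sets fs.default.name is on the classpath; otherwise they fail with a "Wrong FS" error. A minimal sketch of the explicit form, using the NameNode address from the code above (the class name HdfsConnect is just for illustration):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HdfsConnect {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Resolve the filesystem from the target URI itself, instead of
        // relying on fs.default.name being read from a core-site.xml.
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        System.out.println("Connected to: " + fs.getUri());
        fs.close();
    }
}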

I have also added the Hadoop jars to the CLASSPATH. Here is the classpath value:

$STORM_HOME/storm-0.8.1.jar:$JAVA_HOME/lib/:$HADOOP_HOME/hadoop-core-1.0.4.jar:$HADOOP_HOME/lib/:$STORM_HOME/lib/

I have also copied the Hadoop libraries hadoop-core-1.0.4.jar, commons-collections-3.2.1.jar and commons-cli-1.2.jar into the Storm/lib directory.

When I build and run this project, it throws the following error:

3006 [Thread-16] ERROR backtype.storm.daemon.executor  -
java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration
        at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<init>(DefaultMetricsSystem.java:37)
        at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<clinit>(DefaultMetricsSystem.java:34)
        at org.apache.hadoop.security.UgiInstrumentation.create(UgiInstrumentation.java:51)
        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:216)
        at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:184)
        at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:236)
        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:466)
        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:452)
        at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:1494)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1395)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:123)
        at bolts.DataNormalizer.execute(DataNormalizer.java:67)
        at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:32)
       ......................

1 Answer


The error message is telling you that Apache Commons Configuration is missing. You have to add it to the classpath.
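If you want to confirm that the jar is actually visible, here is a quick diagnostic sketch (the ClasspathCheck class is hypothetical, just for illustration). Run it with the same classpath as the topology: it prints which jar the class is loaded from, or dies with ClassNotFoundException if the class is still missing:

public class ClasspathCheck {
    public static void main(String[] args) throws Exception {
        // The NoClassDefFoundError in the stack trace names exactly this class.
        Class<?> c = Class.forName("org.apache.commons.configuration.Configuration");
        // Print the jar the class was actually loaded from.
        System.out.println(c.getName() + " loaded from "
                + c.getProtectionDomain().getCodeSource().getLocation());
    }
}

Note also that a plain directory entry such as $HADOOP_HOME/lib/ on the classpath only picks up .class files, not the jars inside it; you need to list each jar individually or use the $HADOOP_HOME/lib/* wildcard (Java 6 and later). That is presumably why copying jars into Storm/lib worked for the ones you copied.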

More generally, you should add all of the Hadoop dependencies to your classpath. You can find them using a dependency manager (Maven, Ivy, Gradle, etc.), or by looking at /usr/lib/hadoop/lib on a machine where Hadoop is installed.

answered 2013-03-21T07:21:03.340