
I have a single-node cluster setup of Apache Hadoop 2.5.0 on Ubuntu 14.04, and I am using Flume to store tweets in my HDFS. I then created a table in Hive using the following Hive command, which stores all the tweets in tabular format:

CREATE EXTERNAL TABLE tweets (
   id BIGINT,
   created_at STRING,
   source STRING,
   favorited BOOLEAN,
   retweet_count INT,
   retweeted_status STRUCT<
      text:STRING,
      user:STRUCT<screen_name:STRING,name:STRING>>,
   entities STRUCT<
      urls:ARRAY<STRUCT<expanded_url:STRING>>,
      user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
      hashtags:ARRAY<STRUCT<text:STRING>>>,
   text STRING,
   user STRUCT<
      screen_name:STRING,
      name:STRING,
      friends_count:INT,
      followers_count:INT,
      statuses_count:INT,
      verified:BOOLEAN,
      utc_offset:INT,
      time_zone:STRING>,
   in_reply_to_screen_name STRING
)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets';

I have verified that the data is present in the table "tweets" by querying it with HiveQL from the Hive command-line interface. I also created an output table using the following command:

CREATE TABLE outputtable (
    a STRING,
    b INT );
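
For reference, the verification mentioned above amounts to a query along these lines from the Hive CLI (a sketch; the exact statement is not shown in the original post):

SELECT id, text FROM tweets LIMIT 10;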

I am using Apache Hive 0.13.1, which already includes HCatalog. After all this, I am trying to write a MapReduce job in Java in Eclipse. I have added the following libraries to my project as external jars:

  1. All the libraries present in path-of-installation-of-hadoop/share/hadoop/common
  2. All the libraries present in path-of-installation-of-hadoop/share/hadoop/mapreduce
  3. All the libraries present in the lib folder of Hive
  4. All the libraries present in path-of-installation-of-Hive/hcatalog/share/hcatalog

My MapReduce code tries to import the text of the tweets from the table "tweets" and then process it. My MapReduce code is:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hcatalog.common.*;
import org.apache.hcatalog.mapreduce.*;
import org.apache.hcatalog.data.*;
import org.apache.hcatalog.data.schema.*;

public class UseHCat extends Configured implements Tool {

    public static class Map extends Mapper<WritableComparable, HCatRecord, Text, IntWritable> {
        String tweetText;

        @Override
        protected void map(WritableComparable key,
                           HCatRecord value,
                           org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord,
                               Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Column 7 (0-based) of the "tweets" table is the tweet text.
            tweetText = (String) value.get(7);
            int i = 1;
            context.write(new Text(tweetText), new IntWritable(i));
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable,
                                               WritableComparable, HCatRecord> {

        @Override
        protected void reduce(Text key,
                              java.lang.Iterable<IntWritable> values,
                              org.apache.hadoop.mapreduce.Reducer<Text, IntWritable,
                                  WritableComparable, HCatRecord>.Context context)
                throws IOException, InterruptedException {
            // Take the first value for each distinct tweet text and write it
            // out as a two-column HCatRecord (text, value).
            Iterator<IntWritable> iter = values.iterator();
            IntWritable iw = iter.next();
            int id = iw.get();
            HCatRecord record = new DefaultHCatRecord(2);
            record.set(0, key.toString());
            record.set(1, id);
            context.write(null, record);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        String inputTableName = "tweets";
        String outputTableName = "outputtable";
        String dbName = null; // null selects the default database

        Job job = new Job(conf, "UseHCat");
        HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
        job.setJarByClass(UseHCat.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // An HCatalog record as input
        job.setInputFormatClass(HCatInputFormat.class);

        // Mapper emits a string as key and an integer as value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Ignore the key for the reducer output; emitting an HCatalog record as value
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setOutputFormatClass(HCatOutputFormat.class);

        HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job);
        System.err.println("INFO: output schema explicitly set for writing:" + s);
        HCatOutputFormat.setSchema(job, s);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new UseHCat(), args);
        System.exit(exitCode);
    }
}
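
A side note on the code above: on Hadoop 2.x the Job(Configuration, String) constructor is deprecated, and it is one likely source of the warnings described below. A minimal sketch of the non-deprecated replacement for that single line, behavior otherwise unchanged:

// instead of: Job job = new Job(conf, "UseHCat");
Job job = Job.getInstance(conf, "UseHCat");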

The first problem we face is that we get many warnings stating that some types and constructors are deprecated. We ignored the warnings and created a jar file of the project with "UseHCat" as the main class. Then, using the terminal provided in Ubuntu, we browsed to the location where the jar file was created and ran the following command:

hadoop jar MyProject.jar

We got the following error:

14/11/16 17:17:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/metadata/HiveStorageHandler
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at org.apache.hcatalog.mapreduce.InitializeInput.getInputJobInfo(InitializeInput.java:146)
    at org.apache.hcatalog.mapreduce.InitializeInput.setInput(InitializeInput.java:86)
    at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:86)
    at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:55)
    at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:47)
    at UseHCat.run(UseHCat.java:64)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    at UseHCat.main(UseHCat.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.metadata.HiveStorageHandler
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 26 more

1 Answer


Hive was developed to minimize the need to write MapReduce programs. You can do this processing with Hive queries; internally, Hive converts them into MapReduce jobs.
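
To illustrate (a hedged sketch built from the tables in the question): the job above ends up pairing each distinct tweet text with the value 1, so roughly the same result can be produced by a single HiveQL statement, with no Java at all:

INSERT OVERWRITE TABLE outputtable
SELECT text, 1 FROM tweets GROUP BY text;

If the real intent is a per-text count, replace 1 with COUNT(*).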

However, if you want to access the Hive data directly, you can. Hive is not a database: all the data is stored in readable form under the warehouse directory (or, for an external table like "tweets" here, under its declared LOCATION, /user/flume/tweets). You can give the full path of that data as the input to a plain MapReduce program.
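
A minimal sketch of that approach, assuming the Flume output under /user/flume/tweets is plain text with one JSON tweet per line; the class name, mapper, and output path below are made up for illustration:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReadTweetsDirectly {

    public static class JsonLineMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // value is one raw JSON tweet as written by Flume;
            // parse and process it here instead of going through HCatalog.
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "ReadTweetsDirectly");
        job.setJarByClass(ReadTweetsDirectly.class);
        job.setMapperClass(JsonLineMapper.class);
        job.setNumReduceTasks(0); // map-only sketch
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // The external table's LOCATION from the question:
        FileInputFormat.addInputPath(job, new Path("/user/flume/tweets"));
        // Hypothetical output path, used only for this sketch:
        FileOutputFormat.setOutputPath(job, new Path("/user/flume/tweets-out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Reading the files this way bypasses HCatalog entirely, so none of the Hive jars are needed on the runtime classpath.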

Have you tried a sample MapReduce program in Eclipse? You can either build the Hadoop plugin for Eclipse yourself or use an existing plugin to run MapReduce from Eclipse.

Answered 2014-11-17T06:55:20.400