0

我在 Azure 中使用带有 spark 1.6 的 hortonworks 沙箱。我有一个填充了 TPC-DS 示例数据的 Hive 数据库。我想从外部文件中读取一些 SQL 查询并在 spark 的 hive 数据集上运行它们。我按照这个主题在 spark 中使用 hive 数据库,它只是在我的数据集中使用一个表,并且它再次在 spark 中编写 SQL 查询,但我需要将整个数据集定义为我的查询源,我认为我应该使用数据帧但我不确定也不知道怎么做!我也想从外部 .sql 文件导入 SQL 查询,不要再写下查询!请你指导我该怎么做?非常感谢,最好的!

4

1 回答 1

0

Spark 可以直接从 Hive 表中读取数据。您可以使用 Spark 创建、删除 Hive 表,甚至可以通过 Spark 执行所有 Hive hql 相关操作。为此,您需要使用 SparkHiveContext

来自 Spark 文档:

Spark HiveContext 提供了基本 SQLContext 提供的功能的超集。其他功能包括使用更完整的 HiveQL 解析器编写查询的能力、对 Hive UDF 的访问以及从 Hive 表中读取数据的能力。要使用 HiveContext,您不需要现有的 Hive 设置。

有关更多信息,您可以访问Spark 文档

为了避免在代码中编写 sql,您可以使用属性文件来放置所有 Hive 查询,然后您可以在代码中使用该键。

请看下面 Spark HiveContext 的实现和 Spark Scala 中属性文件的使用。

package com.spark.hive.poc

import org.apache.spark._
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql._
import org.apache.spark._
import org.apache.spark.sql.DataFrame;
import org.apache.spark.rdd.RDD;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.hive.HiveContext;

//Import Row.
import org.apache.spark.sql.Row;
//Import Spark SQL data types
import org.apache.spark.sql.types.{ StructType, StructField, StringType };

object ReadPropertyFiles extends Serializable {

  val conf = new SparkConf().setAppName("read local file");

  conf.set("spark.executor.memory", "100M");
  conf.setMaster("local");

  val sc = new SparkContext(conf)
  val sqlContext = new HiveContext(sc)

  def main(args: Array[String]): Unit = {

    var hadoopConf = new org.apache.hadoop.conf.Configuration();
    var fileSystem = FileSystem.get(hadoopConf);
    var Path = new Path(args(0));
    val inputStream = fileSystem.open(Path);
    var Properties = new java.util.Properties;
    Properties.load(inputStream);

    //Create an RDD
    val people = sc.textFile("/user/User1/spark_hive_poc/input/");
    //The schema is encoded in a string
    val schemaString = "name address";

    //Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)));

    //Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim));
    //Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
    peopleDataFrame.printSchema();

    peopleDataFrame.registerTempTable("tbl_temp")

    val data = sqlContext.sql(Properties.getProperty("temp_table"));

    //Drop Hive table
    sqlContext.sql(Properties.getProperty("drop_hive_table"));
    //Create Hive table
    sqlContext.sql(Properties.getProperty("create_hive_tavle"));
    //Insert data into Hive table
    sqlContext.sql(Properties.getProperty("insert_into_hive_table"));
    //Select Data into Hive table
    sqlContext.sql(Properties.getProperty("select_from_hive")).show();

    sc.stop

  }
}

属性文件中的条目:

temp_table=select * from tbl_temp
drop_hive_table=DROP TABLE IF EXISTS default.test_hive_tbl
create_hive_tavle=CREATE TABLE IF NOT EXISTS default.test_hive_tbl(name string, city string) STORED AS ORC
insert_into_hive_table=insert overwrite table default.test_hive_tbl select * from tbl_temp
select_from_hive=select * from default.test_hive_tbl

Spark submit 命令运行此作业:

[User1@hadoopdev ~]$ spark-submit --num-executors 1 \
--executor-memory 100M --total-executor-cores 2 --master local \
--class com.spark.hive.poc.ReadPropertyFiles Hive-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
/user/User1/spark_hive_poc/properties/sql.properties

注意:属性文件位置应为 HDFS 位置。

于 2016-12-27T11:24:48.777 回答