0

我试图创建一个函数,该函数将从关系数据库中获取数据并将它们插入 Hive 表中。由于我使用的是 Spark 1.6,所以我需要注册一个临时表,因为直接将数据帧写入 Hive 表与 Hive 不兼容

spark_conf = SparkConf()
sc = SparkContext(conf=spark_conf)
sqlContext = HiveContext(sc)

query = "(select * from employees where emp_no < 10008) as emp_alias"
df = sqlContext.read.format("jdbc").option("url", url) \
        .option("dbtable", query) \
        .option("user", user) \
        .option("password", pswd).load()

df.registerTempTable('tempEmp')

sqlContext.sql('insert into table employment_db.hive_employees select * from tempEmp')

RDB 中的employees表包含几千条记录。运行我的程序后,我可以看到创建了两个镶木地板文件:

  • 一个文件,在我的代码完成后创建
  • 两个小时后创建的文件

因此,当我在作业完成后尝试从 Hive 表中进行选择时,会丢失记录。

我有多个想法,这可能会导致问题:

  1. 可能是由于懒惰评估造成的registerTempTable吗?Spark 是否认为我不使用这些记录?我熟悉生成器中的惰性求值,但我无法想象惰性求值在 registerTempTable函数中究竟是如何工作的。
  2. 它是否将临时表保存在tmp文件夹中?会不会是空间不够造成的?我应该使用该dropTempTable功能吗?
  3. 使用起来更安全createOrReplaceTempView(尽管registerTempTable在 Spark 2 中已弃用)。

更多信息

  • 在 Yarn 上使用 Spark 1.6 (Hadoop 2.6.0-cdh5.8.0)
  • 使用不同的 Hive 上下文运行多个作业,但我没有跨上下文访问临时表
4

1 回答 1

0

你能检查一下 df.saveAsTable("db.tempEmp")

1 创建一个新文件employee.txt 及以下内容。

[root@quickstart spark]# vi employee.txt 

Name, Age
Vinayak, 35
Nilesh, 37
Raju, 30
Karthik, 28
Shreshta,1
Siddhish, 2

2 在 spark-shell 上执行以下命令

val employee = sc.textFile("file:///home/cloudera/workspace/spark/employee.txt")
 val employeefirst = employee.first
 val employeeMap = employee.
 filter(e=>e!=employeefirst).
 map(e=>{
   val splitted = e.split(",")
   val name = splitted(0).trim
   val age = scala.util.Try(splitted(1).trim.toInt) getOrElse(0)
   (name, age)
 })

 val employeeDF = employeeMap.toDF("Name", "age")

 employeeDF.show()

scala>  employeeDF.show()
+--------+---+
|    Name|age|
+--------+---+
| Vinayak| 35|
|  Nilesh| 37|
|    Raju| 30|
| Karthik| 28|
|Shreshta|  1|
|Siddhish|  2|
+--------+---+

3 创建一个新数据库。

hive> create database employeetest (optional);
OK
Time taken: 0.325 seconds
hive> use employeetest;
OK
Time taken: 0.153 seconds

4 在employeetest 数据库中创建表。

scala> employeeDF.saveAsTable("employeetest.Employee")

hive> show tables;
OK
employee
Time taken: 0.171 seconds, Fetched: 1 row(s)
hive> select * from employee;
OK
Vinayak 35
Nilesh  37
Raju    30
Karthik 28
Shreshta    1
Siddhish    2
Time taken: 0.462 seconds, Fetched: 6 row(s)

否则您可以使用以下方法从 spark-shell 在配置单元中创建表

scala> employeeDF.registerTempTable("employeetohive")

scala> employeeDF.sqlContext.sql("select * from employeetohive").show
+--------+---+
|    Name|age|
+--------+---+
| Vinayak| 35|
|  Nilesh| 37|
|    Raju| 30|
| Karthik| 28|
|Shreshta|  1|
|Siddhish|  2|
+--------+---+


scala> employeeDF.sqlContext.sql("create table employeetest.employeefromdf select * from employeetohive").show

hive> show tables;
OK
employee
employeefromdf
Time taken: 0.101 seconds, Fetched: 2 row(s)
hive> select * from employeefromdf;
OK
Vinayak 35
Nilesh  37
Raju    30
Karthik 28
Shreshta    1
Siddhish    2
Time taken: 0.246 seconds, Fetched: 6 row(s)
于 2018-03-06T00:36:06.740 回答