我试图创建一个函数,该函数将从关系数据库中获取数据并将它们插入 Hive 表中。由于我使用的是 Spark 1.6,所以我需要注册一个临时表,因为直接将数据帧写入 Hive 表与 Hive 不兼容:
spark_conf = SparkConf()
sc = SparkContext(conf=spark_conf)
sqlContext = HiveContext(sc)
query = "(select * from employees where emp_no < 10008) as emp_alias"
df = sqlContext.read.format("jdbc").option("url", url) \
.option("dbtable", query) \
.option("user", user) \
.option("password", pswd).load()
df.registerTempTable('tempEmp')
sqlContext.sql('insert into table employment_db.hive_employees select * from tempEmp')
RDB 中的employees
表包含几千条记录。运行我的程序后,我可以看到创建了两个镶木地板文件:
- 一个文件,在我的代码完成后创建
- 两个小时后创建的文件
因此,当我在作业完成后尝试从 Hive 表中进行选择时,会丢失记录。
我有多个想法,这可能会导致问题:
- 可能是由于懒惰评估造成的
registerTempTable
吗?Spark 是否认为我不使用这些记录?我熟悉生成器中的惰性求值,但我无法想象惰性求值在registerTempTable
函数中究竟是如何工作的。 - 它是否将临时表保存在
tmp
文件夹中?会不会是空间不够造成的?我应该使用该dropTempTable
功能吗? - 使用起来更安全
createOrReplaceTempView
(尽管registerTempTable
在 Spark 2 中已弃用)。
更多信息
- 在 Yarn 上使用 Spark 1.6 (Hadoop 2.6.0-cdh5.8.0)
- 使用不同的 Hive 上下文运行多个作业,但我没有跨上下文访问临时表