1

抱歉,如果我在这里真的很基础,但我需要一点 Pyspark 帮助来尝试动态覆盖配置单元表中的分区。表格已大大简化,但我正在努力解决的问题(我希望)很清楚。我对 PySpark 很陌生,并且已经通过 StackOverflow 搜索了足够多的时间来最终创建一个帐户并询问......!提前致谢!!

我有一个从数据帧(trx)构建的大型分区配置单元表(HIVETABLE_TRX)。我将更多数据提取为数据帧 (trxup),并希望适当地附加或覆盖 HIVETABLE_TRX 中的相关分区。

Dataframe (trx)

+---------------+----------+------+
|PRODUCT_LN_NAME|LOCAL_DATE|   TRX|
+---------------+----------+------+
|          HOTEL|2019-01-01|14298 |
|          HOTEL|2019-01-02|19020 |
|          HOTEL|2019-01-03|18927 |
+---------------+----------+------+

trx.write \
    .partitionBy("PRODUCT_LN_NAME","LOCAL_DATE") \
    .saveAsTable("HIVETABLE_TRX",mode='overwrite')

#Have a look at the partitioned hive table
trxchk = spark.sql("""select * from HIVETABLE_TRX""")
trxchk.show()

+------+---------------+----------+
|   TRX|PRODUCT_LN_NAME|LOCAL_DATE|
+------+---------------+----------+
|14298 |          HOTEL|2019-01-01|
|19020 |          HOTEL|2019-01-02|
|18927 |          HOTEL|2019-01-03|
+------+---------------+----------+

要添加到 Hive 表的数据帧 (trxup) 有一个我要覆盖的重叠行 ('HOTEL'、'2019-01-03') 和 3 个要追加的增量行。

#Have a look at second dataframe (trxup)
+---------------+----------+------+
|PRODUCT_LN_NAME|LOCAL_DATE|   TRX|
+---------------+----------+------+
|         FLIGHT|2019-01-03|14410 |
|          HOTEL|2019-01-03|18927 |
|         FLIGHT|2019-01-04|15430 |
|          HOTEL|2019-01-04|19198 |
+---------------+----------+------+

我尝试将 trxup 插入 HIVETABLE_TRX,如下所示:

trxup.write \
    .insertInto("HIVETABLE_TRX",overwrite=True)

我的理解是,这将覆盖 trxup 和 HIVETABLE_TRX 之间共有的一行并附加剩余的行。

#Have a look at HIVETABLE_TRX after the basic insertInto
trxchk2 = spark.sql("""select * from HIVETABLE_TRX""")
trxchk2.show()

+----+---------------+----------+
| TRX|PRODUCT_LN_NAME|LOCAL_DATE|
+----+---------------+----------+
|null|     2019-01-03|    14410 |
|null|     2019-01-03|    18927 |
|null|     2019-01-04|    15430 |
|null|     2019-01-04|    19198 |
+----+---------------+----------+

如您所见,它无法按名称对齐列并覆盖 HIVETABLE_TRX 中的所有现有分区。

所以: 1. 如何确保 insertInto 的列对齐?- 这是我能想到的最好的方法,虽然成功,但感觉不应该是这样的......?

colList = spark.sql("""select * from HIVETABLE_TRX""").columns
trxup.selectExpr(colList) \
    .write \
    .insertInto("HIVETABLE_TRX")
  1. 我可以将第二个 df (trxup) 插入分区的配置单元表 (HIVETABLE_TRX) 中,只需添加/覆盖适当的分区吗?

在经过大量 Google、Stackoverflow 和灵魂搜索之后,我尝试过的其他事情:

为解释器添加了选项

hive.exec.dynamic.partition = true
hive.exec.dynamic.partition.mode = nonstrict
spark.sql.sources.partitionOverwriteMode = dynamic

试图通过 insertInto 上的 trxup 进行分区

trxup.write \
    .partitionBy("PRODUCT_LN_NAME","LOCAL_DATE") \
    .insertInto("PROJECT_MERCH.AM_PARTITION_TEST_TRX",overwrite=True)

AnalysisException: u"insertInto() can't be used together with partitionBy(). Partition columns have already be defined for the table. It is not necessary to use partitionBy().;"

从 insertInto 中删除了 overwrite=True,如果不是我想要的,它实际上做了我在这一点上所期望的。

+------+---------------+----------+
|   TRX|PRODUCT_LN_NAME|LOCAL_DATE|
+------+---------------+----------+
|14298 |          HOTEL|2019-01-01|
|19020 |          HOTEL|2019-01-02|
|18927 |          HOTEL|2019-01-03|
|  null|     2019-01-03|    14410 |
|  null|     2019-01-03|    18927 |
|  null|     2019-01-04|    15430 |
|  null|     2019-01-04|    19198 |
+------+---------------+----------+

我意识到我可以将 trxup 转换为已分区的配置单元表 (HIVETABLE_TRXUP),然后将它们合并在一起,但这感觉不是一种最佳方式 - 有点违背了分区表的目的,不是吗?

trxjoined = spark.sql("""select * from HIVETABLE_TRX t full outer join HIVETABLE_TRXUP tu on t.SITE_NAME=tu.SITE_NAME and t.LOCAL_DATE=tu.LOCAL_DATE""")
spark.sql("""drop table if exists HIVETABLE_TRX""")
spark.sql("""drop table if exists HIVETABLE_TRXUP""")
trxjoined.write \
    .partitionBy("SITE_NAME","LOCAL_DATE") \
    .saveAsTable("HIVETABLE_TRX",mode='overwrite')
4

0 回答 0