12

好吧,我正在使用 PySpark,并且我有一个 Spark 数据框,我使用它将数据插入到 mysql 表中。

url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"

df.write.jdbc(url=url, table="myTable", mode="append")

我想通过列值和特定数字的总和来更新列值(不在主键中)。

我尝试过使用不同的模式(追加、覆盖)DataFrameWriter.jdbc() 函数。

我的问题是我们如何像ON DUPLICATE KEY UPDATE在 mysql 中那样更新列值,同时将 pyspark 数据帧数据插入表中。

4

2 回答 2

1

一种解决方法是将数据插入临时表,然后使用驱动程序执行的 SQL 语句将其迁移到最终表中。您可以使用与您的数据库提供程序相关的任何有效 SQL 语法。

于 2016-11-09T12:13:05.117 回答
0

这在香草(或 Scala Spark,就此而言)是不可能的pyspark,因为您只有 4 种写入模式(来源https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html #pyspark.sql.DataFrameWriter.jdbc):

append:将此 DataFrame 的内容附加到现有数据。

覆盖:覆盖现有数据。

忽略:如果数据已经存在,则静默忽略此操作。

error 或 errorifexists(默认情况):如果数据已经存在,则抛出异常。

但是,有一些 hacky 解决方法:

  1. 有一个jython包允许您jdbc直接编写查询,因此您可以将代码结构化为INSERT ... ON DUPLICATE KEY UPDATE .... 这是链接:https ://pypi.org/project/JayDeBeApi/

  2. 如果您精通 Scala,则可以编写新模式或覆盖org.apache.spark.sql.execution.datasources.jdbcJdbcUtils.scala INSERT INTOto INSERT ... ON DUPLICATE KEY UPDATE ...。或者更好的是,使用MERGE如下语句:

MERGE INTO table-name
USING table-ref
AS name
ON cond
WHEN NOT MATCHED THEN INSERT 
WHEN MATCHED THEN UPDATE

取决于您的 SQL 风格。

  1. 使用您覆盖的暂存表,然后mysql在此暂存环境上编写一个简单的触发器,使其运行INSERT INTO target_table ON DUPLICATE KEY UPDATE

  2. 将您的 Spark DataFrame 移动到DataFrame 并使用原始查询在那里pandas编写您的查询。upsertsqlalchemy

  3. 使用由 Apache Kafka 支持的Spark Streaming创建管道,然后使用具有 jdbc upsert 功能的工具(例如Kafka Connectupsert直接进入您的目标表。或者使用 Kafka Connectupserting从临时表到目标表。这是一些阅读https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html#idempotent-writes

于 2019-11-20T13:28:59.480 回答