1

我想使用 spark 执行更新和插入操作请找到现有表的图像参考

现有表

在这里,我正在更新 id :101 location 和 inserttime 并插入另外 2 条记录:

在此处输入图像描述

并使用模式覆盖写入目标

df.write.format("jdbc")
  .option("url",  "jdbc:mysql://localhost/test")
  .option("driver","com.mysql.jdbc.Driver")
  .option("dbtable","temptgtUpdate")
  .option("user", "root")
  .option("password", "root")
  .option("truncate","true")
  .mode("overwrite")
  .save()

执行上述命令后,我的数据已损坏,已插入 db 表

在此处输入图像描述

数据框中的数据

在此处输入图像描述

您能否让我知道您的观察和解决方案

4

5 回答 5

1

如果您需要在 pyspark 代码中执行 UPSERT/DELETE 操作,我建议您使用 pymysql 库,并执行您的 upsert/delete 操作。请查看此帖子以获取更多信息和代码示例以供参考:在使用重复键上的插入表时出错,使用 for 循环数组

请根据您的需要修改代码示例。

于 2020-05-11T05:12:10.223 回答
1

Spark JDBC writer 支持以下模式:

由于您使用的是“覆盖”模式,因此它会根据列长度重新创建表,如果您希望自己的表定义首先创建表并使用“附加”模式

于 2020-05-10T01:51:02.390 回答
1

我想使用 spark 执行更新和插入操作

Spark SQL中没有与 SQLUPDATE语句等效的功能。也没有与 Spark SQL 等效的 SQLDELETE WHERE语句。相反,您必须在 Spark 之外删除需要更新的行,然后使用模式将包含新记录和更新记录的 Spark 数据帧写入表append(以保留表中剩余的现有行)。

于 2020-05-10T08:47:52.840 回答
0

我不推荐 TRUNCATE,因为它实际上会删除表并创建新表。执行此操作时,表可能会丢失之前设置的列级属性......所以在使用 TRUNCATE 时要小心,并确保删除表/重新创建表是否可以。

于 2020-05-11T05:14:44.093 回答
-1

执行以下步骤时,Upsert 逻辑工作正常

  df = (spark.read.format("csv").
        load("file:///C:/Users/test/Desktop/temp1/temp1.csv", header=True,
             delimiter=','))

并这样做

  (df.write.format("jdbc").
    option("url", "jdbc:mysql://localhost/test").
    option("driver", "com.mysql.jdbc.Driver").
    option("dbtable", "temptgtUpdate").
    option("user", "root").
    option("password", "root").
    option("truncate", "true").
    mode("overwrite").save())

尽管如此,当我直接使用数据框编写时,我仍然无法理解为什么它失败的逻辑

于 2020-05-11T04:58:39.323 回答