1

我正在使用 spark JDBC 将行保存在数据库中。数据的保存工作正常。

问题:如果遇到任何错误记录,Spark 会中止保存(例如,当表期望非空值时,一列具有空值)

我想要什么:我希望 Spark 忽略坏行并继续保存下一行。这怎么可能实现?我在文档中看不到太多。使用StructType不是一种选择。

任何指针?

我的代码看起来像这样。

class DatabaseWriter {

  def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession): Unit = {
    val dbProperties = getSQLProperties(sparkSession, configurationProp)

    dataFrameTobeWritten.write.mode(SaveMode.Overwrite)
        .option("driver", dbProperties.driverName)
        .option("truncate", "true")
        .option("batchsize", configurationProp.WriterBatchSize())
        .jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
  }
}
4

1 回答 1

0

在方法中添加非空列的列表,并使用它们创建过滤条件以过滤掉坏行

  class DatabaseWriter {

  def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession, notNullColumns : List[String]): Unit = {
    val dbProperties = getSQLProperties(sparkSession, configurationProp)
    val filterCondition = notNullColumns.map(c -> s"$c IS NOT NULL").mkString(" AND ")
    dataFrameTobeWritten.filter(filterCondition).write.mode(SaveMode.Overwrite)
    .option("driver", dbProperties.driverName)
    .option("truncate", "true")
    .option("batchsize", configurationProp.WriterBatchSize())
    .jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
  }
}
于 2020-12-03T14:07:06.803 回答