
In my script, the PySpark write method takes a DataFrame and writes it to Redshift, but for some DataFrames with boolean columns it fails with an error saying that Redshift does not accept the bit data type.

My problem is that a column that should be boolean is apparently being created as bit; my suspicion (unconfirmed) is that Spark's generic JDBC dialect turns BooleanType into BIT(1) in the CREATE TABLE it generates, and Redshift has no bit type.

Code:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("data_quality")
    .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    .config("spark.jars", "redshift-jdbc42-2.1.0.3.jar")
    .config("spark.hadoop.fs.s3a.access.key", "key")
    .config("spark.hadoop.fs.s3a.secret.key", "secret_key")
    .config("spark.hadoop.fs.s3a.session.token", "token")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

df = spark.createDataFrame(
    [
        (1, False),  # create your data here; be consistent in the types.
        (2, True),
    ],
    ["id", "column_type_bool"],  # add your column names here
)
df.show()
print(df.dtypes)  # [('id', 'bigint'), ('column_type_bool', 'boolean')]
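
One fallback I can think of (a sketch only, and I would rather keep a real boolean column) is to cast the flag before writing, so the JDBC writer never sees a BooleanType:

from pyspark.sql import functions as F

# Hypothetical workaround: store the flag as smallint (0/1) instead of
# boolean, at the cost of losing the boolean type on the Redshift side.
df_cast = df.withColumn("column_type_bool", F.col("column_type_bool").cast("smallint"))

The write below is the one that actually fails, using the original df: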

df.write \
  .format("jdbc") \
  .option("url", f"jdbc:redshift://{url_db}:5439/{db_name}") \
  .option("driver", "com.amazon.redshift.jdbc42.Driver") \
  .option("dbtable", f"{schema}.{tab}") \
  .option("user", user_db) \
  .option("password", pw) \
  .option("tempdir", "s3a://path") \
  .mode("overwrite") \
  .save()
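
The other direction I can think of is Spark's createTableColumnTypes write option, which per the Spark JDBC documentation overrides the column types used in the generated CREATE TABLE. A sketch of what I mean, with the column name taken from the toy DataFrame above (I left out tempdir here, which as far as I can tell is only used by the spark-redshift connector, not by the plain jdbc format):

# Sketch, not tested: force BOOLEAN in the generated CREATE TABLE so
# Redshift never sees BIT. Column name matches the toy DataFrame above.
df.write \
  .format("jdbc") \
  .option("url", f"jdbc:redshift://{url_db}:5439/{db_name}") \
  .option("driver", "com.amazon.redshift.jdbc42.Driver") \
  .option("dbtable", f"{schema}.{tab}") \
  .option("user", user_db) \
  .option("password", pw) \
  .option("createTableColumnTypes", "column_type_bool BOOLEAN") \
  .mode("overwrite") \
  .save()

Is that the right way to get a real BOOLEAN column in Redshift, or is there a dialect setting I am missing?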

Table:

root
 |-- namecolumn: boolean (nullable = true)

Error:

Py4JJavaError: An error occurred while calling o113.save.
: com.amazon.redshift.util.RedshiftException: ERROR: Column "nametable.namecolumn" has unsupported type "bit".
    at com.amazon.redshift.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2601)
    at com.amazon.redshift.core.v3.QueryExecutorImpl.processResultsOnThread(QueryExecutorImpl.java:2269)
    at com.amazon.redshift.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1880)
    at com.amazon.redshift.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1872)
    at com.amazon.redshift.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
    at com.amazon.redshift.jdbc.RedshiftStatementImpl.executeInternal(RedshiftStatementImpl.java:514)
    at com.amazon.redshift.jdbc.RedshiftStatementImpl.execute(RedshiftStatementImpl.java:435)
    at com.amazon.redshift.jdbc.RedshiftStatementImpl.executeWithFlags(RedshiftStatementImpl.java:376)
    at com.amazon.redshift.jdbc.RedshiftStatementImpl.executeCachedSql(RedshiftStatementImpl.java:362)
    at com.amazon.redshift.jdbc.RedshiftStatementImpl.executeWithFlags(RedshiftStatementImpl.java:339)
    at com.amazon.redshift.jdbc.RedshiftStatementImpl.executeUpdate(RedshiftStatementImpl.java:297)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.executeStatement(JdbcUtils.scala:1026)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:912)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:80)
