0

我们正在 pyspark 中构建数据摄取框架并尝试处理时间戳异常。基本上,希望在不确认架构的单独列中有拒绝记录。

df = spark.createDataFrame(
    [
        ("1988-06-15 11:55:12.1","1"),
        ("1988-06-14 11:55:12", "3"),
        ("1988-06-13 11:55:12","1"),
        ("1988-06-12 11:55:12", "2")
    ],
    ['timestampColm','intColm']
)

在名为 badRecords 的数据框中创建一个新列,以捕获此数据框中可能存在的所有错误,并使用“yyyy-MM-dd HH:mm:dd”格式验证时间戳列。

尝试使用以下代码验证时间戳

样品 1

df1 = df.withColumn("badRecords",
                f.when(
                        to_timestamp(f.col("timestampColm"), "yyyy-MM-dd HH:mm:ss").cast("Timestamp").isNull() & f.col("timestampColm").isNotNull(),f.lit("Not a valid Timestamp")
                       ).otherwise(f.lit(None))
              )

所以它应该将第一条记录“1988-06-15 11:55:12.1”标记为无效,因为它不支持“HH:mm:ss”格式,但它仍然验证记录而不拒绝它。

+--------------------+-----------+----------+
|       timestampColm|    intColm|badRecords|
+--------------------+-----------+----------+
|1988-06-15 11:55:...|          1|      null|
| 1988-06-14 11:55:12|       null|      null|
| 1988-06-13 11:55:12|          1|      null|
| 1988-06-12 11:55:12|          2|      null|
+--------------------+-----------+----------+

经过几次分析发现我们可以使用unix_timestamp但没有运气

样品 2

df1 = df.withColumn("badRecords",
                      f.when(
                            f.from_unixtime(
                                  f.unix_timestamp(
                                         f.col("timestampColm"),"yyyy-MM-dd HH:mm:ss")
                            ).cast("timestamp").isNull() & f.col("timestampColm").isNotNull(),
                            f.lit("Not a valid Timestamp")
                    ).otherwise(f.lit(None))
                )

帮助我了解我缺少什么,因为它仍在验证而不是拒绝记录?

4

2 回答 2

0

我可以通过创建自定义 UDF 来解决这个问题,并且工作正常。

validate_timestamp_udf = udf(lambda val: validate_timestamp(val))
df6 = df2.withColumn("badRecords",validate_timestamp_udf(f.col(ColName)))

validate_timestamp()函数中,我在正则表达式的帮助下进行格式验证。

于 2020-08-26T11:38:01.283 回答
0

在您的情况下,您已经编写了 & 但它应该是“和”以便进行逻辑运算。'&' 是位运算符。可能在 pyspark 中被视为逻辑运算符。考虑试试这个 -: df1 = df.withColumn("badRecords", f.when( (to_timestamp(f.col("timestampColm"), "yyyy-MM-dd HH:mm:ss").cast("Timestamp ").isNull()) & (f.col("timestampColm").isNotNull()),f.lit("不是有效的时间戳")).otherwise(f.lit(None)))

我的意思是考虑添加括号并将每个条件括起来,例如 (condition1) & (condition2)。希望这可以帮助。

于 2020-05-13T15:50:36.990 回答