2

******* 更新 ********

正如评论中所建议的,我消除了代码中不相关的部分:

我的要求:

  1. 将毫秒数统一为 3
  2. 将字符串转换为时间戳并将值保留为 UTC

创建数据框:

val df = Seq("2018-09-02T05:05:03.456Z","2018-09-02T04:08:32.1Z","2018-09-02T05:05:45.65Z").toDF("Timestamp")

这里使用 spark shell 的结果:

在此处输入图像描述

************ 结束更新 *********************************

尝试使用 scala 在 Spark 中处理时区和时间戳格式时,我很头疼。

这是我的脚本的简化以解释我的问题:

 import org.apache.spark.sql.functions._

 val jsonRDD  = sc.wholeTextFiles("file:///data/home2/phernandez/vpp/Test_Message.json")

 val jsonDF =  spark.read.json(jsonRDD.map(f => f._2))

这是生成的架构:

  root
 |-- MeasuredValues: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- MeasuredValue: double (nullable = true)
 |    |    |-- Status: long (nullable = true)
 |    |    |-- Timestamp: string (nullable = true)

然后我只选择 Timestamp 字段如下

jsonDF.select(explode($"MeasuredValues").as("Values")).select($"Values.Timestamp").show(5,false)

不同毫秒长度的时间戳

我要修复的第一件事是每个时间戳的毫秒数并将其统一为三个。

我应用 date_format 如下

jsonDF.select(explode($"MeasuredValues").as("Values")).select(date_format($"Values.Timestamp","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")).show(5,false)

毫秒统一但时区改变

毫秒格式是固定的,但时间戳从 UTC 转换为本地时间。

为了解决这个问题,我将 to_utc_timestamp 与我的本地时区一起应用。

jsonDF.select(explode($"MeasuredValues").as("Values")).select(to_utc_timestamp(date_format($"Values.Timestamp","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),"Europe/Berlin").as("Timestamp")).show(5,false)

to_utc_timestamp 输出

更糟糕的是,不会返回 UTC 值,并且会丢失毫秒格式。

任何想法如何处理这个?我会很感激

BR。保罗

4

1 回答 1

4

问题的原因是用于转换的时间格式字符串:

yyyy-MM-dd'T'HH:mm:ss.SSS'Z'

如您所见,Z在单引号内,这意味着它不被解释为区域偏移标记,而只是作为T中间的字符。

因此,格式字符串应更改为

yyyy-MM-dd'T'HH:mm:ss.SSSX

其中X是 Java 标准日期时间格式化程序模式(Z是 0 的偏移值)。

现在,源数据可以转换为 UTC 时间戳:

val srcDF = Seq(
  ("2018-04-10T13:30:34.45Z"),
  ("2018-04-10T13:45:55.4Z"),
  ("2018-04-10T14:00:00.234Z"),
  ("2018-04-10T14:15:04.34Z"),
  ("2018-04-10T14:30:23.45Z")
).toDF("Timestamp")

val convertedDF = srcDF.select(to_utc_timestamp(date_format($"Timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSX"), "Europe/Berlin").as("converted"))

convertedDF.printSchema()
convertedDF.show(false)

/**
root
|-- converted: timestamp (nullable = true)

+-----------------------+
|converted              |
+-----------------------+
|2018-04-10 13:30:34.45 |
|2018-04-10 13:45:55.4  |
|2018-04-10 14:00:00.234|
|2018-04-10 14:15:04.34 |
|2018-04-10 14:30:23.45 |
+-----------------------+
*/

如果您需要将时间戳转换回字符串并将值标准化为具有 3 个尾随零,则应该有另一个date_format调用,类似于您在问题中已经应用的调用。

于 2018-09-02T08:52:40.993 回答