******* 更新 ********
正如评论中所建议的,我消除了代码中不相关的部分:
我的要求:
- 将毫秒数统一为 3
- 将字符串转换为时间戳并将值保留为 UTC
创建数据框:
val df = Seq("2018-09-02T05:05:03.456Z","2018-09-02T04:08:32.1Z","2018-09-02T05:05:45.65Z").toDF("Timestamp")
这里使用 spark shell 的结果:
************ 结束更新 *********************************
尝试使用 scala 在 Spark 中处理时区和时间戳格式时,我很头疼。
这是我的脚本的简化以解释我的问题:
import org.apache.spark.sql.functions._
val jsonRDD = sc.wholeTextFiles("file:///data/home2/phernandez/vpp/Test_Message.json")
val jsonDF = spark.read.json(jsonRDD.map(f => f._2))
这是生成的架构:
root
|-- MeasuredValues: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- MeasuredValue: double (nullable = true)
| | |-- Status: long (nullable = true)
| | |-- Timestamp: string (nullable = true)
然后我只选择 Timestamp 字段如下
jsonDF.select(explode($"MeasuredValues").as("Values")).select($"Values.Timestamp").show(5,false)
我要修复的第一件事是每个时间戳的毫秒数并将其统一为三个。
我应用 date_format 如下
jsonDF.select(explode($"MeasuredValues").as("Values")).select(date_format($"Values.Timestamp","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")).show(5,false)
毫秒格式是固定的,但时间戳从 UTC 转换为本地时间。
为了解决这个问题,我将 to_utc_timestamp 与我的本地时区一起应用。
jsonDF.select(explode($"MeasuredValues").as("Values")).select(to_utc_timestamp(date_format($"Values.Timestamp","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),"Europe/Berlin").as("Timestamp")).show(5,false)
更糟糕的是,不会返回 UTC 值,并且会丢失毫秒格式。
任何想法如何处理这个?我会很感激
BR。保罗