1

考虑这个最小的 Spark 作业,它将 CSV 读取到 DataFrame 并将其写为 Parquet:

val df = spark.read.format("csv").option("inferSchema", true).load(filename)
df.write.parquet("parquet_folder/")

对于输入文件中的任何时间戳列,Parquet 输出将包含具有即时语义的时间戳,解释当前 Spark 会话/JVM 时区中源数据中的时间字符串。因此,如果我的 Spark 作业在 EST/EDT 中运行,“2020-01-01 00:00”将变为“2020-01-01 00:00-0500”。

这意味着,除非所有 Spark 作业都在一个一致的时区中运行,否则我可能会有差异。

还有一个理论上的问题是 Parquet 实际上并不代表我的数据。我不知道文件中的午夜是否真的是 EST、PST、UTC 等的午夜,我真的不在乎。

Parquet 格式确实支持具有本地时间语义的时间戳概念,类似于java.util.LocalDateTime- 日期/时间的抽象概念,而不是特定时刻,无论 Spark 会话或 JVM 的时区如何,都将一致地解释它。

我想要的是 Spark 将时间戳从 CSV 读取到本地时间,然后相应地写入 Parquet。理想情况下,我也想从日期和“没有时区的时间戳”列中将其应用于 Spark JDBC 提取。

这甚至可能吗?

(注:Parquet 格式文档解释了即时语义和本地时间语义之间的区别。)

4

1 回答 1

0

我有同样的问题。Spark 时间戳转换可能会令人困惑。

要使 spark 作业对任何主机的默认本地时区设置具有鲁棒性,请添加一个额外的层来临时明确地设置 spark 时区:

from contextlib import contextmanager
from pyspark.sql import SparkSession

@contextmanager
def spark_timezone(timezone: str):
    """Context manager to temporarily set spark timezone during context manager
    life time while preserving original timezone. This is especially
    meaningful in conjunction with casting timestamps when automatic timezone
    conversions are applied by spark.

    Please be aware that the timezone property should be adjusted during DAG
    creation and execution (including both spark transformations and actions).
    Changing the timezone while adding filter/map tasks might not be
    sufficient. Be sure to change the timezone when actually executing a spark
    action like collect/save etc.

    Parameters
    ----------
    timezone: str
        Name of the timezone (e.g. 'UTC' or 'Europe/Berlin').

    Examples
    --------
    >>> with spark_timezone("Europe/Berlin"):
    >>>     df.select(df["ts_col"].cast("timestamp")).show()

    """

    spark = get_active_spark_context()
    current = spark.conf.get("spark.sql.session.timeZone")
    spark.conf.set("spark.sql.session.timeZone", timezone)

    try:
        yield None
    finally:
        spark.conf.set("spark.sql.session.timeZone", current)


def get_active_spark_context() -> SparkSession:
    """Helper function to return the currently active spark context.

    """

    return SparkSession.builder.getOrCreate()


现在,您可以spark.read.csv通过上下文管理器使用明确的 UTC 时区包装您的内容,以防止任何转换:

with spark_timezone("UTC"):
    df = spark.read.csv("path_to_file")

更新 2022-01-31

在读取包含时间戳列的 CSV 文件时再次遇到此问题。无论您是否明确设置 spark 的时区,时间戳列都将在本地时区中解释。

Spark 的时区不会影响在 JVM 中将字符串转换为时间戳。相反,您需要修改 JVM 时区设置,如这篇 SO文章中所述。

于 2021-01-13T15:55:01.377 回答