0

我正在尝试将我的 Spark Scala 脚本(spark-shellspark-submit. 我使用 Spark SQL 进行了很多调用,这些调用执行了大量关于时区的时间戳计算。我必须明确设置以下配置(因为每个分布式节点可能配置了不同的默认时区),以确保我的时区始终为 UTC,以便通过该方法中的任何 Spark SQL 函数调用(代码块)进行任何后续 Spark SQL 时间戳操作。

spark.conf.set("spark.sql.session.timeZone", "UTC")

如果该方法签名包含 (spark: org.apache.spark.sql.SparkSession) 作为参数,那么我总是可以从将时区设置为 UTC 的显式代码语句开始,SparkSession而不用冒险(所有分布式 Spark 节点可能有也可能没有完全相同的时区配置)?

我遇到的下一个问题是,我如何确定由 设置的“spark”变量spark-shell是 aval还是var?在寻找这个问题的答案时,我找到了这个代码片段,希望找出这个 Scala 变量是immutable还是mutable. 但它没有告诉我 Scala 变量spark是 avar还是 a val。设置为后是否需要返回spark到方法调用者spark.sql.session.timeZoneUTC因为我在我的方法中对其进行了修改?目前我的方法签名需要两个输入参数(org.apache.spark.sql.SparkSession, org.apache.spark.sql.DataFrame),输出是一个 tuple (org.apache.spark.sql.SparkSession, org.apache.spark.sql.DataFrame)

scala> def manOf[T: Manifest](t: T): Manifest[T] = manifest[T]
manOf: [T](t: T)(implicit evidence$1: Manifest[T])Manifest[T]

scala> manOf(List(1))
res3: Manifest[List[Int]] = scala.collection.immutable.List[Int]

scala> manOf(spark)
res2: Manifest[org.apache.spark.sql.SparkSession] = org.apache.spark.sql.SparkSession

额外上下文:作为启动spark-shell的一部分,变量spark初始化如下:

Spark context available as 'sc' (master = yarn, app id = application_1234567890_111111).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_REDACTED)
Type in expressions to have them evaluated.
Type :help for more information.
4

1 回答 1

1

感谢@Luis Miguel Mejía Suárez为我提供答案和提示,作为对我问题的评论。我实现了以下实验,它spark是一个可变对象,我只是将spark其用作对方法外部和方法内部的同一对象的相同引用。虽然这种不良副作用不是纯粹的函数式实现,但它确实省去了我将spark对象返回给调用者以进行其他后续处理的麻烦。如果其他人有更好的解决方案,请分享。

def x(spark: SparkSession, inputDF: DataFrame) = {
  import spark.implicits._
  spark.conf.set("spark.sql.session.timeZone", "UTC") // mutation of the object inside method

  //...spark.sql.functions...
  finalDF
}

启动spark-shell并执行了以下操作:

Spark context available as 'sc' (master = yarn, app id = application_1234567890_222222).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_REDACTED)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.conf.get("spark.sql.session.timeZone")
res1: String = America/New_York

scala> :load x.scala
x: (spark: org.apache.spark.sql.SparkSession, inputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

scala> val timeConvertedDF = x(spark, inputDF)
timeConvertedDF: org.apache.spark.sql.DataFrame = [att1: timestamp, att2: string ... 25 more fields]

scala> spark.conf.get("spark.sql.session.timeZone")
res4: String = UTC
于 2020-02-25T02:37:47.257 回答