14

我有创建火花上下文的主要内容:

    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

然后创建数据框并对数据框进行过滤和验证。

    val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00")

    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0))
    // record length cannot be < 2 
    .na.drop(3)
    // round to hours
    .withColumn("time",convertToHourly($"time"))

这很好用。

但是当我尝试通过将数据帧发送到

function ValidateAndTransform(df: DataFrame) : DataFrame = {...}

获取数据框并进行验证和转换:似乎我需要

 import sqlContext.implicits._

为避免错误:“value $ is not a member of StringContext”在线发生:.withColumn("time",convertToHourly( $ "time"))

但是要使用 import sqlContext.implicits._ 我还需要sqlContext在新文件中定义的任何一个,如下所示:

val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

或发送到

function ValidateAndTransform(df: DataFrame) : DataFrame = {...}
function

我觉得我试图对 2 个文件(主要和验证)进行的分离没有正确完成......

关于如何设计这个的任何想法?或者只是将 sqlContext 发送到函数?

谢谢!

4

1 回答 1

14

您可以使用 SQLContext 的单例实例。您可以在spark 存储库中查看此示例

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
...
//And wherever you want you can do
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._  
于 2015-09-08T15:30:05.440 回答