3

我在 Scala 中创建了一个hiveContextinmain()函数,我需要将参数传递hiveContext给其他函数,这是结构:

object Project {
    def main(name: String): Int = {
      val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
      ... 
    } 
    def read (streamId: Int, hc:hiveContext): Array[Byte] = {
    ... 
    } 
    def close (): Unit = {
    ...
    }
 }

但它不起作用。函数read()在内部被调用main()

任何想法?

4

2 回答 2

2

我将 hiveContext 声明为隐式,这对我有用

implicit val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)

在 MyJob 中定义:

override def run(config: Config)(implicit sqlContext: SQLContext): Unit = ...

但如果你不希望它隐含,这应该是一样的

val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)(sqlContext)

override def run(config: Config)(sqlContext: SQLContext): Unit = ...

此外,您的函数 read 应该接收 HiveContext 作为参数 hc 的类型,而不是 hiveContext

def read (streamId: Int, hc:HiveContext): Array[Byte] = 
于 2016-06-01T10:36:57.770 回答
1

我尝试了几种选择,这最终对我有用..

object SomeName extends App {

val conf = new SparkConf()...
val sc = new SparkContext(conf)

implicit val sqlC = SQLContext.getOrCreate(sc)
getDF1(sqlC)

def getDF1(sqlCo: SQLContext): Unit = {
    val query1 =  SomeQuery here  
    val df1 = sqlCo.read.format("jdbc").options(Map("url" -> dbUrl,"dbtable" -> query1)).load.cache()

 //iterate through df1 and retrieve the 2nd DataFrame based on some values in the Row of the first DataFrame

  df1.foreach(x => {
    getDF2(x.getString(0), x.getDecimal(1).toString, x.getDecimal(3).doubleValue) (sqlCo)
  })     
}

def getDF2(a: String, b: String, c: Double)(implicit sqlCont: SQLContext) :  Unit = {
  val query2 = Somequery

  val sqlcc = SQLContext.getOrCreate(sc)
  //val sqlcc = sqlCont //Did not work for me. Also, omitting (implicit sqlCont: SQLContext) altogether did not work
  val df2 = sqlcc.read.format("jdbc").options(Map("url" -> dbURL, "dbtable" -> query2)).load().cache()
   .
   .
   .
 }
}

注意:在上面的代码中,如果我在 getDF2 方法签名中省略了 (implicit sqlCont: SQLContext) 参数,它将不起作用。我尝试了其他几种将 sqlContext 从一种方法传递到另一种方法的选项,它总是给我 NullPointerException 或 Task not serializable Excpetion。好消息是它最终以这种方式工作,我可以从 DataFrame1 的一行中检索参数,并在加载 DataFrame 2 时使用这些值。

于 2017-02-07T05:19:26.437 回答