
I am new to Spark SQL. There is no Concat function in Spark SQL queries, so we registered a SQL function for it, and inside that function I need to access another table. For that we wrote a Spark SQL query on the SQLContext object. When I invoke this query I get a NullPointerException. Could you please help me with this issue?

Thanks in advance

// Here is my code

class SalesHistory_2(sqlContext:SQLContext,sparkContext:SparkContext) extends Serializable   {

import sqlContext._
import sqlContext.createSchemaRDD
try{


sqlContext.registerFunction("MaterialTransformation", Material_Transformation _)

  def Material_Transformation(Material_ID: String): String = 
{     

   var material:String =null;
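      // Querying another table from inside the UDF; this is where the reported NullPointerException shows up (see the answers below).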
      var dd = sqlContext.sql("select * from product_master")      

material
}     



  /* Product master*/
    val productRDD = this.sparkContext.textFile("D:\\Realease 8.0\\files\\BHI\\BHI_SOP_PRODUCT_MASTER.txt")        
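  // The first line of the file is the pipe-delimited header; it is used to build an all-String schema, then dropped from the data.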

  val product_schemaString = productRDD.first

  val product_withoutHeaders = dropHeader(productRDD)

  val product_schema = StructType(product_schemaString.split("\\|").map(fieldName => StructField(fieldName, StringType, true)))

  val productdata = product_withoutHeaders.map{_.replace("|", "| ")}.map(x=> x.split("\\|"))
  val product_rowRDD = productdata.map(line => {
    Row.fromSeq(line.map { _.trim() })
  })

    val product_srctableRDD = sqlContext.applySchema(product_rowRDD, product_schema)

    product_srctableRDD.registerTempTable("product_master")
    cacheTable("product_master")

    /* Customer master*/


    /* Sales History*/



  val srcRDD = this.sparkContext.textFile("D:\\Realease 8.0\\files\\BHI\\BHI_SOP_TRADE_SALES_HISTORY_DS_4_20150119.txt")
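  // Sales history file: same pattern as product_master, the header line becomes the schema and the remaining lines become Rows.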

  val schemaString= srcRDD.first

  val withoutHeaders = dropHeader(srcRDD)    

  val schema = StructType(schemaString.split("\\|").map(fieldName => StructField(fieldName, StringType, true)))

  val lines = withoutHeaders.map {_.replace("|", "| ")}.map(x=> x.split("\\|"))
  val rowRDD = lines.map(line => {
    Row.fromSeq(line.map { _.trim() })
  })

val srctableRDD = sqlContext.applySchema(rowRDD, schema)

srctableRDD.registerTempTable("SALES_HISTORY")  



 val srcResults = sqlContext.sql("SELECT Delivery_Number,Delivery_Line_Item,MaterialTransformation(Material_ID),Customer_Group_Node,Ops_ID,DC_ID,Mfg_ID,PGI_Date,Delivery_Qty,Customer_Group_Node,Line_Total_COGS,Line_Net_Rev,Material_Description,Sold_To_Partner_Name,Plant_Description,Originating_Doc,Orig_Doc_Line_item,Revenue_Type,Material_Doc_Ref,Mater_Doc_Ref_Item,Req_Delivery_Date FROM  SALES_HISTORY")
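 // Overwrite the output directory: delete it first (saveAsTextFile fails if the path already exists), then write a single pipe-delimited part file.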

 val path: Path = Path ("D:/Realease 8.0/files/output/")
try {
  path.deleteRecursively(continueOnFailure = false) 
} catch {
  case e: IOException => // some file could not be deleted
}

  val successRDDToFile = srcResults.map { x => x.mkString("|")} 
  successRDDToFile.coalesce(1).saveAsTextFile("D:/Realease 8.0/files/output/")      

  }
  catch {
    case ex: Exception => println(ex) // TODO: handle error
  }   

this.sparkContext.stop()    

def dropHeader(data: RDD[String]): RDD[String] = {
  // Drop the header line from the first partition only.
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    } else {
      lines
    }
  })
}

}


2 Answers


The answer here is rather short and probably disappointing: you simply cannot do this sort of thing.

The general rule in Spark is that you cannot trigger actions or transformations from inside other actions or transformations, or, more precisely, the Spark context is no longer accessible/defined outside the driver.
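Not part of the original answer, but as a minimal sketch of the usual workaround, assuming product_master is small enough to collect on the driver (the column names Material_ID and Material_Description below are assumptions taken from the question's schema): build the lookup once on the driver, broadcast it, and let the UDF do a plain map lookup instead of issuing SQL per row.

// Sketch only: assumes product_master fits in driver memory and that
// Material_ID / Material_Description are the columns you need.
val productPairs = sqlContext.sql("SELECT Material_ID, Material_Description FROM product_master")
  .map(row => (row.getString(0), row.getString(1)))
  .collect()
  .toMap

// Ship the small map to every executor once.
val productLookup = sparkContext.broadcast(productPairs)

// The UDF touches only the broadcast value, never the SQLContext.
sqlContext.registerFunction("MaterialTransformation",
  (materialId: String) => productLookup.value.getOrElse(materialId, materialId))

This keeps all SQLContext usage on the driver, which is exactly the constraint described above.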

Answered 2015-07-30T18:35:07.657

Calling Spark SQL for every single row of the Sales History RDD looks like a really bad idea:

val srcResults = sqlContext.sql("SELECT Delivery_Number,Delivery_Line_Item,MaterialTransformation(Material_ID),Customer_Group_Node,Ops_ID,DC_ID,Mfg_ID,PGI_Date,Delivery_Qty,Customer_Group_Node,Line_Total_COGS,Line_Net_Rev,Material_Description,Sold_To_Partner_Name,Plant_Description,Originating_Doc,Orig_Doc_Line_item,Revenue_Type,Material_Doc_Ref,Mater_Doc_Ref_Item,Req_Delivery_Date FROM  SALES_HISTORY")

You are better off using a join between your RDDs and forgetting about your custom function:

val srcResults = sqlContext.sql("SELECT s.*, p.* FROM  SALES_HISTORY s join product_master p on s.Material_ID=p.ID")
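With the join, the lookup against product_master happens once as part of a single query planned on the driver, instead of one SQL call per row inside a UDF, and whatever concatenation was originally wanted can then be done on the joined columns (for example in the map that builds the output lines).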
Answered 2015-02-12T10:38:58.587