I am trying to read data from MySQL, but it throws a NullPointerException and I can't tell why. The code is in main.scala:
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object main extends App {
  val dt = args.lift(0)
  if (dt.isEmpty || !PairingbatchUtil.validatePartitionDate(dt.get)) {
    throw new Exception("Partition date is mandatory and must be in the format 'yyyy-MM-dd'")
  }
  var mailProperties: Properties = new Properties
  var templateMappingData: Map[String, Object] = Map(
    "job" -> "Load merchant count Data from hdfs to mongo",
    "jobProcessedDate" -> dt.get,
    "batch" -> "Pairing Batch")
  val startTime = System.currentTimeMillis()
  try {
    val conf = new SparkConf().setAppName("read_from_mysql") //.setMaster("local")
    conf.set("spark.sql.warehouse.dir", "/user/local/warehouse/")
    conf.set("hive.exec.dynamic.partition", "true")
    conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1/db.table_name")
    conf.set("spark.mongodb.output.uri", "mongodb://127.0.0.1/db.table_name")
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    val schemaName = "/user/local/warehouse/"
    val aid = "1000"
    val resultPath = "/usr/local/proad" + "/" + dt.get
    val dbDataPartitionsMap = Map("aid" -> aid, "dt" -> dt.get)
    spark.sql("set aid=" + aid)
    spark.sql("set dt=" + dt.get)
    val configs = spark.sparkContext.getConf.getAll
    configs.foreach(i => println(i))
    val registerBaseTablesMap = Map(
      "DAILY_COUNT" -> ("SELECT * FROM " + schemaName + ".table_name WHERE aid = '${aid}' and dt ='${dt}'"),
      "DAILY_COUNT_FINAL" -> ("SELECT * FROM " + schemaName + ".second_table_name WHERE aid = '${aid}' and dt ='${dt}'"))
    val parentDF = PairingbatchUtil.readDataFromHive(registerBaseTablesMap.get("DAILY_COUNT").get, spark)
    val finalMerchantAffiliateDailyCountDF = Processor.process(parentDF, dbDataPartitionsMap, spark)
  }
  // ... (catch/finally and the rest of main are omitted here)
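PairingbatchUtil.readDataFromHive is not included above; for context, a minimal sketch of such a helper (an assumption for this post, not the exact project code) would just wrap spark.sql:

def readDataFromHive(query: String, spark: SparkSession): DataFrame = {
  // assumed sketch: run the registered query against Hive and return the result
  spark.sql(query)
}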
The code in Processor.scala:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object Processor {

  case class MerchantDailyCount(_id: String, date: Date, totalClicks: String, totalLinks: String, shopUrl: String, shopUUID: String, shopName: String, publisherId: String)

  def process(parentDF: DataFrame, partitionsMap: Map[String, String], spark: SparkSession): DataFrame = {
    val schemaString = "_id date total_clicks total_links shop_url shop_uuid shop_name publisher_id"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    var finalDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    parentDF.foreach(row => {
      if (parentDF == null || row.getAs("publisher_id") == null || StringUtils.isBlank(row.getAs("shop_uuid"))) {
        // skip rows without a publisher_id or shop_uuid
      } else {
        val shopUUID = row.getAs("shop_uuid").toString
        val currentDate = row.getAs("cur_date").toString
        val date = PairingbatchUtil.parseDate(currentDate, Constants.DATE_FORMAT_YYYY_MM_DD, Constants.TAIWAN_TIMEZONE)
        val publisherId = row.getAs("publisher_id").toString
        val totalClicks = row.getAs("total_clicks").toString
        val totalLinks = row.getAs("total_links").toString
        val (shopUrl, shopName) = PairingbatchUtil.setShopUrlInfo(shopUUID, "com.mysql.jdbc.Driver", "user_mame", "password", s"""select shop_url, shop_name from db.table_name where shop_uuid ='$shopUUID'""", "shopUrl", spark)
        val id = PairingbatchUtil.isNeedToSet(spark, shopUrl, publisherId, date)
        val merchantDailyCount = MerchantDailyCount(id, date, totalClicks, totalLinks, shopUrl, shopUUID, shopName, publisherId)
        import spark.implicits._
        val merchantCountDF = Seq(merchantDailyCount).toDF()
        finalDF = finalDF.union(merchantCountDF)
      }
    })
    finalDF
  }
}
The relevant code in PairingbatchUtil.scala:
def setShopUrlInfo(shopUUID: String, driverClass: String, user: String, pass: String, query: String, url: String, sparkSession: SparkSession) = {
  val merchantDetailsDF = sparkSession.read // line no 139 in the stack trace below
    .format("jdbc")
    .option("url", url)
    .option("driver", driverClass)
    .option("dbtable", s"( $query ) t")
    .option("user", user)
    .option("password", pass)
    .load()
  if (merchantDetailsDF.count() == 0) {
    ("INVALID SHOP URL", "INVALID SHOP NAME")
  } else {
    (merchantDetailsDF.select("shop_url").first().getAs("shop_url"), merchantDetailsDF.select("shop_name").first().getAs("shop_name"))
  }
}
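For readers mapping the call in Processor.process onto this 7-argument signature, the positional arguments bind as follows (same values as in the call above; comments only annotate the parameter names):

// how the call in Processor.process lines up with the parameters above
val (shopUrl, shopName) = PairingbatchUtil.setShopUrlInfo(
  shopUUID,                  // shopUUID
  "com.mysql.jdbc.Driver",   // driverClass
  "user_mame",               // user
  "password",                // pass
  s"select shop_url, shop_name from db.table_name where shop_uuid ='$shopUUID'", // query
  "shopUrl",                 // url -> passed straight through to .option("url", ...)
  spark)                     // sparkSession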
The output I expect from the query is:
+--------------+---------+
| shop_url|shop_name|
+--------------+---------+
| parimal | roy |
+--------------+---------+
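That is, I expect setShopUrlInfo to behave like a plain JDBC read issued from the driver, along the lines of this sketch (the JDBC URL, credentials, and shop_uuid here are placeholders, not the real values):

// sketch only: the same lookup issued directly from the driver, with placeholder connection details
val lookupDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1:3306/db")   // placeholder JDBC URL
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "( select shop_url, shop_name from db.table_name where shop_uuid = 'some-shop-uuid' ) t")
  .option("user", "user_mame")
  .option("password", "password")
  .load()
lookupDF.show()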
But what I actually get is:
19/07/04 14:48:50 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:117)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:115)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:613)
at com.rakuten.affiliate.order.pairing.batch.util.PairingbatchUtil$.setShopUrlInfo(PairingbatchUtil.scala:139)
at com.rakuten.affiliate.order.pairing.batch.Processors.MechantAffDailyCountProcessor$$anonfun$process$1.apply(MechantAffDailyCountProcessor.scala:40)
at com.rakuten.affiliate.order.pairing.batch.Processors.MechantAffDailyCountProcessor$$anonfun$process$1.apply(MechantAffDailyCountProcessor.scala:30)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)