我需要创建一个类模式来支持 29 个字段。由于案例类有 22 个字段的限制,我尝试使用 Product 接口扩展我的类“sdp_d”,如下所示:
class sdp_d( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID :Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int], DESC_TEXT :Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String], GPS_LAT :Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK :Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String], IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO :Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String], ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE :Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String], BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String], LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String], INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME :Option[java.sql.Timestamp]) extends Product{
@throws(classOf[IndexOutOfBoundsException])
override def productElement(n: Int) = n match
{
case 0 => WID
case 1 => BATCH_ID
case 2 => SRC_ID
case 3 => ORG_ID
case 4 => CLASS_WID
case 5 => DESC_TEXT
case 6 => PREMISE_WID
case 7 => FEED_LOC
case 8 => GPS_LAT
case 9 => GPS_LONG
case 10 => PULSE_OUTPUT_BLOCK
case 11 => UDC_ID
case 12 => UNIVERSAL_ID
case 13 => IS_VIRTUAL_FLG
case 14 => SEAL_INFO
case 15 => ACCESS_INFO
case 16 => ALT_ACCESS_INFO
case 17 => LOC_INFO
case 18 => ALT_LOC_INFO
case 19 => TYPE
case 20 => SUB_TYPE
case 21 => TIMEZONE_ID
case 22 => GIS_ID
case 23 => BILLED_UPTO_TIME
case 24 => POWER_STATUS
case 25 => LOAD_STATUS
case 26 => BILLING_HOLD_STATUS
case 27 => INSERT_TIME
case 28 => LAST_UPD_TIME
case _ => throw new IndexOutOfBoundsException(n.toString())
}
override def productArity: Int = 29
override def canEqual(that: Any): Boolean = that.isInstanceOf[sdp_d]
}
这定义了类“sdp_d”。但是,当我尝试使用此预定义模式加载 csv 数据并将其注册为表时,我收到错误消息:
> scala> import java.text.SimpleDateFormat; val sdf = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss.S"); import java.util.Calendar; import java.util.Date; val calendar = Calendar.getInstance()
import java.text.SimpleDateFormat
sdf: java.text.SimpleDateFormat = java.text.SimpleDateFormat@cce61785
import java.util.Calendar
import java.util.Date
calendar: java.util.Calendar = java.util.GregorianCalendar[time=1424687963209,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Asia/Kolkata",offset=19800000,dstSavings=0,useDaylight=false,transitions=6,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2015,MONTH=1,WEEK_OF_YEAR=9,WEEK_OF_MONTH=4,DAY_OF_MONTH=23,DAY_OF_YEAR=54,DAY_OF_WEEK=2,DAY_OF_WEEK_IN_MONTH=4,AM_PM=1,HOUR=4,HOUR_OF_DAY=16,MINUTE=9,SECOND=23,MILLISECOND=209,ZONE_OFFSET=19800000,DST_OFFSET=0]
> scala> sc.textFile("hdfs://CDH-Master-1.cdhcluster/user/spark/Sdp_d.csv").map(_.split(",")).map { r =>
| val upto_time = sdf.parse(r(23).trim);
| calendar.setTime(upto_time);
| val r23 = new java.sql.Timestamp(upto_time.getTime);
|
| val insert_time = sdf.parse(r(26).trim);
| calendar.setTime(insert_time);
| val r26 = new java.sql.Timestamp(insert_time.getTime);
|
| val last_upd_time = sdf.parse(r(27).trim);
| calendar.setTime(last_upd_time);
| val r27 = new java.sql.Timestamp(last_upd_time.getTime);
|
| sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim)
| }.registerAsTable("sdp")
<console>:36: error: not found: value sdp_d
sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim)
^`
我在火花壳工作。Spark 版本 1.1.0 和 Scala 版本 2.10.4。
我不明白为什么错误:未找到:值 sdp_d。
当我创建自己的扩展产品接口的类时,我应该如何注册?
请帮助解决错误。