2

我需要创建一个类模式来支持 29 个字段。由于案例类有 22 个字段的限制,我尝试使用 Product 接口扩展我的类“sdp_d”,如下所示:

class sdp_d( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID :Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int],  DESC_TEXT :Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String], GPS_LAT :Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK :Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String], IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO :Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String], ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE :Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String], BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String], LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String], INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME :Option[java.sql.Timestamp]) extends Product{

@throws(classOf[IndexOutOfBoundsException])
override def productElement(n: Int) = n match 
{
    case 0 => WID
    case 1 => BATCH_ID
    case 2 => SRC_ID
    case 3 => ORG_ID
    case 4 => CLASS_WID
    case 5 => DESC_TEXT
    case 6 => PREMISE_WID
    case 7 => FEED_LOC
    case 8 => GPS_LAT
    case 9 => GPS_LONG
    case 10 => PULSE_OUTPUT_BLOCK
    case 11 => UDC_ID
    case 12 => UNIVERSAL_ID
    case 13 => IS_VIRTUAL_FLG
    case 14 => SEAL_INFO
    case 15 => ACCESS_INFO
    case 16 => ALT_ACCESS_INFO
    case 17 => LOC_INFO
    case 18 => ALT_LOC_INFO
    case 19 => TYPE
    case 20 => SUB_TYPE
    case 21 => TIMEZONE_ID
    case 22 => GIS_ID
    case 23 => BILLED_UPTO_TIME
    case 24 => POWER_STATUS
    case 25 => LOAD_STATUS
    case 26 => BILLING_HOLD_STATUS
    case 27 => INSERT_TIME
    case 28 => LAST_UPD_TIME
    case _ => throw new IndexOutOfBoundsException(n.toString())
}

override def productArity: Int = 29
override def canEqual(that: Any): Boolean = that.isInstanceOf[sdp_d]

}

这定义了类“sdp_d”。但是,当我尝试使用此预定义模式加载 csv 数据并将其注册为表时,我收到错误消息:

> scala> import java.text.SimpleDateFormat; val sdf = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss.S"); import java.util.Calendar; import java.util.Date; val calendar = Calendar.getInstance()
import java.text.SimpleDateFormat
sdf: java.text.SimpleDateFormat = java.text.SimpleDateFormat@cce61785
import java.util.Calendar
import java.util.Date
calendar: java.util.Calendar = java.util.GregorianCalendar[time=1424687963209,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Asia/Kolkata",offset=19800000,dstSavings=0,useDaylight=false,transitions=6,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2015,MONTH=1,WEEK_OF_YEAR=9,WEEK_OF_MONTH=4,DAY_OF_MONTH=23,DAY_OF_YEAR=54,DAY_OF_WEEK=2,DAY_OF_WEEK_IN_MONTH=4,AM_PM=1,HOUR=4,HOUR_OF_DAY=16,MINUTE=9,SECOND=23,MILLISECOND=209,ZONE_OFFSET=19800000,DST_OFFSET=0]

    > scala> sc.textFile("hdfs://CDH-Master-1.cdhcluster/user/spark/Sdp_d.csv").map(_.split(",")).map { r =>
         | val upto_time = sdf.parse(r(23).trim);
         | calendar.setTime(upto_time); 
         | val r23 = new java.sql.Timestamp(upto_time.getTime); 
         | 
         | val insert_time = sdf.parse(r(26).trim); 
         | calendar.setTime(insert_time); 
         | val r26 = new java.sql.Timestamp(insert_time.getTime); 
         | 
         | val last_upd_time = sdf.parse(r(27).trim);
         | calendar.setTime(last_upd_time); 
         | val r27 = new java.sql.Timestamp(last_upd_time.getTime); 
         | 
         | sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim)
         | }.registerAsTable("sdp")
    <console>:36: error: not found: value sdp_d
                  sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim)
                  ^`

我在火花壳工作。Spark 版本 1.1.0 和 Scala 版本 2.10.4。

我不明白为什么错误:未找到:值 sdp_d。

当我创建自己的扩展产品接口的类时,我应该如何注册?

请帮助解决错误。

4

3 回答 3

1

您是否碰巧看过https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema

于 2015-02-24T11:06:07.183 回答
0

您应该使用以下方法实例化该类new

new sdp_d(r(0).trim.toInt, r(1).trim.toInt, ...
于 2015-02-24T10:40:18.613 回答
0

您可以:

  1. 使用新关键字实例化new sdp_d(...)
  2. 您将字段声明为 Option[T],例如 Option[Int],因此我们需要将 Option[T] 作为参数(Some 或 None)传递。 new sdp_d(Try(r(0).trim.toInt).toOption, Try(r(1).trim.toInt).toOption, l(2).trim.toOption, ...)

这对我有用:

//AirTraffic.scala
        class AirTraffic(Year:Option[Int], Month:Option[Int], DayOfMonth:Option[Int], DayOfWeek:Option[Int],
                     DepTime:Option[Int], CRSDepTime:Option[Int], ArrTime:Option[Int], CRSArrTime:Option[Int],
                     UniqueCarrier:String, FlightNum:Option[Int], TailNum:String, ActualElapsedTime:Option[Int],
                     CRSElapsedTime:Option[Int], AirTime:Option[Int], ArrDelay:Option[Int], DepDelay:Option[Int],
                     Origin:String, Dest:String, Distance:Option[Int], TaxiIn:Option[Int], TaxiOut:Option[Int],
                     Cancelled:Option[Boolean], CancellationCode:String, Diverted:Option[Boolean], CarrierDelay:Option[Int],
                     WeatherDelay:Option[Int], NASDelay:Option[Int], SecurityDelay:Option[Int], LateAircraftDelay:Option[Int]) extends Product {

      // We declare field with Option[T] type to make that field null-able.

      override def productElement(n: Int): Any =
        n match {
          case 0 => Year
          case 1 => Month
          case 2 => DayOfMonth
          case 3 => DayOfWeek
          case 4 => DepTime
          case 5 => CRSDepTime
          case 6 => ArrTime
          case 7 => CRSArrTime
          case 8 => UniqueCarrier
          case 9 => FlightNum
          case 10 => TailNum
          case 11 => ActualElapsedTime
          case 12 => CRSElapsedTime
          case 13 => AirTime
          case 14 => ArrDelay
          case 15 => DepDelay
          case 16 => Origin
          case 17 => Dest
          case 18 => Distance
          case 19 => TaxiIn
          case 20 => TaxiOut
          case 21 => Cancelled
          case 22 => CancellationCode
          case 23 => Diverted
          case 24 => CarrierDelay
          case 25 => WeatherDelay
          case 26 => NASDelay
          case 27 => SecurityDelay
          case 28 => LateAircraftDelay
          case _ => throw new IndexOutOfBoundsException(n.toString)
        }

      override def productArity: Int = 29

      override def canEqual(that: Any): Boolean = that.isInstanceOf[AirTraffic]
    }

//main.scala    
    val data = sparkContext.textFile("local-input/AIRLINE/2008.csv").map(_.split(","))
          .map(l => new AirTraffic(Try(l(0).trim.toInt).toOption, Try(l(1).trim.toInt).toOption, Try(l(2).trim.toInt).toOption, Try(l(3).trim.toInt).toOption,
          Try(l(4).trim.toInt).toOption, Try(l(5).trim.toInt).toOption, Try(l(6).trim.toInt).toOption, Try(l(7).trim.toInt).toOption,
          l(8).trim, Try(l(9).trim.toInt).toOption, l(10).trim, Try(l(11).trim.toInt).toOption,
          Try(l(12).trim.toInt).toOption, Try(l(13).trim.toInt).toOption, Try(l(14).trim.toInt).toOption, Try(l(15).trim.toInt).toOption,
          l(16).trim, l(17).trim, Try(l(18).trim.toInt).toOption, Try(l(19).trim.toInt).toOption, Try(l(20).trim.toInt).toOption,
          Try(l(21).trim.toBoolean).toOption, l(22).trim, Try(l(23).trim.toBoolean).toOption, Try(l(24).trim.toInt).toOption,
          Try(l(25).trim.toInt).toOption, Try(l(26).trim.toInt).toOption, Try(l(27).trim.toInt).toOption, Try(l(28).trim.toInt).toOption)).toDF()

        // register table with SQLContext
        data.registerTempTable("AirTraffic")

    val count = sqlContext.sql("SELECT COUNT(*) FROM AirTraffic").collect()
        count.foreach(print)

如果您认为它仍然丑陋,我们可以通过以下方式做更多事情:

implicit class StringConverter(val s: String) extends AnyVal {
    def tryGetInt = Try(s.trim.toInt).toOption

    def tryGetString = {
      val res = s.trim
      if (res.isEmpty) None else res
    }

    def tryGetBoolean = Try(s.trim.toBoolean).toOption
  }

然后

val data = sparkContext.textFile("local-input/AIRLINE/2008.csv").map(_.split(","))
      .map(l => new AirTraffic(l(0).tryGetInt, l(1).tryGetInt, l(2).tryGetInt, l(3).tryGetInt,
      l(4).tryGetInt, l(5).tryGetInt, l(6).tryGetInt, l(7).tryGetInt,
      l(8).trim, l(9).tryGetInt, l(10).trim, l(11).tryGetInt,
      l(12).tryGetInt, l(13).tryGetInt, l(14).tryGetInt, l(15).tryGetInt,
      l(16).trim, l(17).trim, l(18).tryGetInt, l(19).tryGetInt, l(20).tryGetInt,
      l(21).tryGetBoolean, l(22).trim, l(23).tryGetBoolean, l(24).tryGetInt,
      l(25).tryGetInt, l(26).tryGetInt, l(27).tryGetInt, l(28).tryGetInt)).toDF()
于 2015-04-28T07:55:23.373 回答