
In the Spark documentation it is quite clear how to create a Parquet file from an RDD of your own case classes (from the docs):

val people: RDD[Person] = ??? // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

But it's not clear how to convert back. What we really want is a readParquetFile method where we can do:

val people: RDD[Person] = sc.readParquetFile[Person](path)

where the values defined in the case class are the ones read by the method.
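
For illustration, such a method could be sketched as an implicit extension of SparkContext. Note that readParquetFile, ParquetReader, and the convert parameter are all hypothetical here; Spark provides no such method:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

// Hypothetical sketch only: Spark does not provide readParquetFile.
// Assumes Spark 1.x and a user-supplied Row => T converter.
implicit class ParquetReader(sc: SparkContext) {
  def readParquetFile[T: ClassTag](path: String)(convert: Row => T): RDD[T] =
    new SQLContext(sc).parquetFile(path).map(convert)
}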


4 Answers

7

A simple way is to provide your own converter (Row) => CaseClass. This is a bit manual, but if you know what you are reading it should be quite straightforward.

Here is an example:

import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.Row

case class User(data: String, name: String, id: Long)

// Pattern-match the Row's fields; rows that don't fit the expected shape yield None.
def sparkSqlToUser(r: Row): Option[User] = {
  r match {
    case Row(time: String, name: String, id: Long) => Some(User(time, name, id))
    case _ => None
  }
}

val parquetData: SchemaRDD = sqlContext.parquetFile("hdfs://localhost/user/data.parquet")

val caseClassRdd: org.apache.spark.rdd.RDD[User] = parquetData.flatMap(sparkSqlToUser)
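
Usage is then straightforward. A small sketch (not part of the original answer); note that flatMap silently drops any rows for which sparkSqlToUser returned None:

// Rows that failed the pattern match were silently dropped by flatMap.
val dropped = parquetData.count() - caseClassRdd.count()

// Work with the typed RDD as usual.
caseClassRdd.map(_.name).take(10).foreach(println)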
Answered 2015-06-25T22:20:26.240
6

The best solution I've come up with that requires the least amount of copying and pasting for new classes is as follows (though I'd still like to see another solution).

First, you have to define your case class and a (partially) reusable factory method:

import org.apache.spark.sql.catalyst.expressions

case class MyClass(fooBar: Long, fred: Long)

// Here you want to auto gen these functions using macros or something
object Factories extends java.io.Serializable {
  def longLong[T](fac: (Long, Long) => T)(row: expressions.Row): T = 
    fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])
}
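
For case classes with other field shapes you would add more factories alongside longLong; for example, a hypothetical String/Long variant (not in the original answer):

// Hypothetical extra factory for case classes of shape (String, Long),
// written in the same style as longLong above.
def stringLong[T](fac: (String, Long) => T)(row: expressions.Row): T =
  fac(row(0).asInstanceOf[String], row(1).asInstanceOf[Long])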

Some boilerplate which will already be available:

import scala.reflect.runtime.universe._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

The magic:

import scala.reflect.ClassTag
import org.apache.spark.sql.SchemaRDD

def camelToUnderscores(name: String) = 
  "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())

def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect {
  case m: MethodSymbol if m.isCaseAccessor => m
}.toList.map(_.toString)

def caseClassToSQLCols[T: TypeTag]: List[String] = 
  getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores)
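// e.g. for the MyClass defined above, this should yield List("foo_bar", "fred")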

def schemaRDDToRDD[T: TypeTag: ClassTag](schemaRDD: SchemaRDD, fac: expressions.Row => T) = {
  val tmpName = "tmpTableName" // Maybe should use a random string
  schemaRDD.registerAsTable(tmpName)
  sqlContext.sql("SELECT " + caseClassToSQLCols[T].mkString(", ") + " FROM " + tmpName)
  .map(fac)
}

Example use:

val parquetFile = sqlContext.parquetFile(path)

val normalRDD: RDD[MyClass] = 
  schemaRDDToRDD[MyClass](parquetFile, Factories.longLong[MyClass](MyClass.apply))

See also:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html

Though I couldn't find any examples or documentation by following the JIRA link.

Answered 2014-10-03T14:54:27.967
0

In Spark 1.2.1 there is a simple way to convert a SchemaRDD to an RDD using pyspark:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()          ## create SparkContext
sqlContext = SQLContext(sc)  ## create SQLContext
srdd = sqlContext.sql(sql)   ## run a query, yielding a SchemaRDD
c = srdd.collect()           ## collect the rows to the driver as a list
rdd = sc.parallelize(c)      ## re-parallelize as a plain RDD

There must be a similar way with Scala.
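
A rough Scala equivalent might look like this (a sketch; collect() pulls every row to the driver, so this only suits small results):

// Collect the SchemaRDD's rows to the driver, then re-parallelize them
// as a plain RDD[Row].
val srdd = sqlContext.sql(sql)
val rows = srdd.collect()
val rdd = sc.parallelize(rows)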

Answered 2015-03-25T09:46:56.023
-1

A very crude attempt. I'm not at all convinced this will have decent performance. Surely a macro-based alternative must exist...

import scala.reflect.runtime.universe.{typeOf, MethodSymbol, NullaryMethodType, TypeRef, Type, NoType, termNames, runtimeMirror}

schemaRdd.map(row => RowToCaseClass.rowToCaseClass(row.toSeq, typeOf[X], 0))

object RowToCaseClass {
  // http://dcsobral.blogspot.com/2012/08/json-serialization-with-reflection-in.html
  // Recursively builds an instance of the case class described by `t`
  // from a flattened Row (passed as a Seq).
  def rowToCaseClass(record: Seq[_], t: Type, depth: Int): Any = {
    // Case accessors, in declaration order, mirror the constructor parameters.
    val fields = t.decls.sorted.collect {
      case m: MethodSymbol if m.isCaseAccessor => m
    }
    val values = fields.zipWithIndex.map {
      case (field, i) =>
        field.typeSignature match {
          case NullaryMethodType(sig) if sig =:= typeOf[String] => record(i).asInstanceOf[String]
          case NullaryMethodType(sig) if sig =:= typeOf[Int] => record(i).asInstanceOf[Int]
          case NullaryMethodType(sig) =>
            // Field is a Seq: convert each nested row recursively.
            if (sig.baseType(typeOf[Seq[_]].typeSymbol) != NoType) {
              sig match {
                case TypeRef(_, _, args) =>
                  record(i).asInstanceOf[Seq[Seq[_]]].map {
                    r => rowToCaseClass(r, args(0), depth + 1)
                  }.toSeq
              }
            } else {
              sig match {
                case TypeRef(_, u, _) =>
                  rowToCaseClass(record(i).asInstanceOf[Seq[_]], sig, depth + 1)
              }
            }
        }
    }.asInstanceOf[Seq[Object]]
    // Reflectively look up and invoke the case class's primary constructor
    // with the converted field values.
    val mirror = runtimeMirror(t.getClass.getClassLoader)
    val ctor = t.member(termNames.CONSTRUCTOR).asMethod
    val klass = t.typeSymbol.asClass
    val method = mirror.reflectClass(klass).reflectConstructor(ctor)
    method.apply(values: _*)
  }
}
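
Note that rowToCaseClass returns Any, so the mapped RDD still needs a cast to become typed; a sketch, with X standing for your case class as in the snippet above:

// Cast each reflectively built instance back to the target case class X.
val typed: org.apache.spark.rdd.RDD[X] =
  schemaRdd.map(row => RowToCaseClass.rowToCaseClass(row.toSeq, typeOf[X], 0).asInstanceOf[X])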
Answered 2015-03-30T04:41:29.500