I'm trying to use Apache Spark to prepare a DataFrame for storage in HBase in HFile format. I'm using Spark 2.1.0, Scala 2.11 and HBase 1.1.2.

Here is my code:

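import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.phoenix.schema.types.PLong

// createDataframeFromRow is a helper not shown here
val df = createDataframeFromRow(Row("mlk", "kpo", "opi"), "a b c")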

val cols = df.columns.sorted
val colsorteddf = df.select(cols.map(x => col(x)): _*)
val valcols = cols.filterNot(x => x.equals("U_ID"))

So far, so good. I'm only sorting the DataFrame's columns.
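
The next step takes the row key from row(0), so the column order matters. Scala's sorted compares strings by UTF-16 code unit, so an uppercase name such as U_ID lands before lowercase names, but in general a key column is only first if its name happens to sort first. A minimal pure-Scala sketch, using a hypothetical helper keyFirst (not part of Spark), that places an assumed key column first and sorts the remaining columns:

```scala
object ColumnOrder {
  // Hypothetical helper: key column first, remaining columns in sorted order.
  // Unlike a plain `sorted`, this does not depend on the key's name sorting first.
  def keyFirst(columns: Seq[String], keyCol: String): Seq[String] = {
    require(columns.contains(keyCol), s"missing key column: $keyCol")
    keyCol +: columns.filterNot(_ == keyCol).sorted
  }

  def main(args: Array[String]): Unit = {
    // 'U' (85) sorts before 'a' (97), so "U_ID" happens to come first here...
    println(Seq("b", "U_ID", "a").sorted)          // List(U_ID, a, b)
    // ...but a key named "id" ends up in the middle under a plain sort.
    println(Seq("zz", "id", "aa").sorted)          // List(aa, id, zz)
    println(keyFirst(Seq("zz", "id", "aa"), "id")) // List(id, aa, zz)
  }
}
```

For a DataFrame that actually has a U_ID column (assumed to be the key, as the filterNot above suggests), the shell equivalent would be something like df.select(ColumnOrder.keyFirst(df.columns, "U_ID").map(col): _*).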

// map each row to (key, (value1, value2)), taking the first column as the key
val pdd = colsorteddf.map(row => {
  (row(0).toString, (row(1).toString, row(2).toString))
})

val tdd = pdd.flatMap(x => {
  val rowKey = PLong.INSTANCE.toBytes(x._1)
  for(i <- 0 until valcols.length - 1) yield {
    val colname = valcols(i).toString
    val colvalue = x._2.productElement(i).toString
    val colfam = "data"

    (rowKey, (colfam, colname, colvalue))
  }
})

After that, I turn each row into entries of the form (rowKey, (colfam, colname, colvalue)).
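
The per-row explosion can also be written without index arithmetic. With the sample data above, valcols is a, b, c (there is no U_ID column), so the loop labels the value of column b with the name a. A minimal pure-Scala sketch, assuming the first column is the row key and every other column becomes one cell in the data family; Cell and explode are hypothetical names, and the row key stays a String instead of being encoded with PLong. Zipping names with values pairs each column with its own value, and require fails fast if the counts differ:

```scala
object RowToCells {
  // Hypothetical stand-in for (rowKey, (colfam, colname, colvalue)).
  final case class Cell(rowKey: String, family: String, qualifier: String, value: String)

  // Pair each value column with its value by position; sort by qualifier so the
  // cells of one row come out in lexicographic column order.
  def explode(rowKey: String, valueCols: Seq[String], values: Seq[String],
              family: String = "data"): Seq[Cell] = {
    require(valueCols.length == values.length,
      s"${valueCols.length} column names but ${values.length} values")
    valueCols.zip(values).sortBy(_._1).map { case (q, v) => Cell(rowKey, family, q, v) }
  }

  def main(args: Array[String]): Unit = {
    // Row("mlk", "kpo", "opi") with columns a, b, c: a is the key, b and c become cells.
    explode("mlk", Seq("b", "c"), Seq("kpo", "opi")).foreach(println)
    // Cell(mlk,data,b,kpo)
    // Cell(mlk,data,c,opi)
  }
}
```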

Now, this is where the problem occurs. I try to map each row of tdd to a pair (ImmutableBytesWritable, KeyValue):

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

val output = tdd.map(x => {
  val rowKey: Array[Byte] = x._1
  val immutableRowKey = new ImmutableBytesWritable(rowKey)
  val colfam = x._2._1
  val colname = x._2._2
  val colvalue = x._2._3

  val kv = new KeyValue(
    rowKey,
    Bytes.toBytes(colfam),  // UTF-8, unlike String.getBytes() which uses the platform charset
    Bytes.toBytes(colname),
    Bytes.toBytes(colvalue)
  )

  (immutableRowKey, kv)
})

It produces this stack trace:

java.lang.AssertionError: assertion failed: no symbol could be loaded from interface org.apache.hadoop.hbase.classification.InterfaceAudience$Public in object InterfaceAudience with name Public and classloader scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@3269cbb7
  at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1021)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.<init>(JavaMirrors.scala:163)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:140)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:133)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:168)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.typeParams(SynchronizedSymbols.scala:132)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.typeParams(SynchronizedSymbols.scala:168)
  at scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:1926)
  at scala.reflect.internal.Types$NoArgsTypeRef.isHigherKinded(Types.scala:1925)
  at scala.reflect.internal.transform.UnCurry$class.scala$reflect$internal$transform$UnCurry$$expandAlias(UnCurry.scala:22)
  at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:26)
  at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:24)
  at scala.collection.immutable.List.loop$1(List.scala:173)
  at scala.collection.immutable.List.mapConserve(List.scala:189)
  at scala.reflect.internal.tpe.TypeMaps$TypeMap.mapOver(TypeMaps.scala:115)
  at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:46)
  at scala.reflect.internal.transform.Transforms$class.transformedType(Transforms.scala:43)
  at scala.reflect.internal.SymbolTable.transformedType(SymbolTable.scala:16)
  at scala.reflect.internal.Types$TypeApiImpl.erasure(Types.scala:225)
  at scala.

This looks like a Scala problem. Has anyone run into the same issue? If so, how did you get around it?

PS: I'm running this code in spark-shell.
