我需要使用 Spark API 从 HBase 中提取数据,并像 SparkSQL 一样在数据顶部进行查询。
我做的事情如下:
- 创建 Spark conf 对象
- 创建 HBase 对象
- 编写 JAVPairRDD 来获取记录。
我的主类代码如下:
import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.sql.JavaSparkSQL;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import com.lsr.LSRTar;
import com.lsr.utils.*;
import com.spark.hbase.TestData;
public class SparkSQLHBaseMain extends SparkJob implements Serializable{
public static final String APPNAME = "Spark-SQL HBase Application";
public static Logger logger = LoggerFactory.getLogger(SparkSQLHBaseMain.class);
public static void main(String[] args) {
logger.info("Calling method runJob()");
new SparkSQLHBaseMain().runJob();
}
public void runJob() {
final JavaSparkContext javaSparkContext = getSparkContext(APPNAME);
logger.info("Spark Object created !!!");
SQLContext sqlContext = new SQLContext(javaSparkContext);
Configuration hbaseConfig = HBaseUtils.getHBaseConf();
hbaseConfig.set(TableInputFormat.INPUT_TABLE, "emp");
hbaseConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "a"); // column family
hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "a:id a:name");
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
javaSparkContext.newAPIHadoopRDD(hbaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
JavaRDD<TestData> rowPairRDD = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, TestData>() {
public TestData call(Tuple2<ImmutableBytesWritable, Result> entry) throws Exception {
TestData cd = new TestData();
Result r = entry._2;
String keyRow = Bytes.toString(r.getRow());
cd.setRowkey(keyRow);
cd.setId((String) Bytes.toString(r.getValue(Bytes.toBytes("a"), Bytes.toBytes("id"))));
cd.setName((String) Bytes.toString(r.getValue(Bytes.toBytes("a"), Bytes.toBytes("name"))));
return cd;
}
});
System.out.println("Result : \n"+"ID : "+rowPairRDD.id()+"Name : "+rowPairRDD.name());
DataFrame dataFrame = sqlContext.createDataFrame(rowPairRDD, TestData.class);
dataFrame.show();
dataFrame.cache();
dataFrame.repartition(100);
dataFrame.printSchema();
}
}
我的 Bean 类代码如下:
package com.spark.hbase;
import java.io.Serializable;
public class TestData extends java.lang.Object implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
String keyRow;
String id;
String name;
public void setRowkey(String keyRow) {
this.keyRow = keyRow;
}
public void setId(String id) {
this.id = id;
}
public void setName(String name) {
this.name = name;
}
}
得到以下异常:
17/01/18 12:28:54 INFO HBaseUtils: HBase is running !!!
HBase is running!
17/01/18 12:28:56 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 214.5 KB, free 214.5 KB)
17/01/18 12:28:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.0 KB, free 239.5 KB)
17/01/18 12:28:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43929 (size: 25.0 KB, free: 257.8 MB)
17/01/18 12:28:57 INFO SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkSQLHBaseMain.java:49
Result :
ID : 1Name : null
Exception in thread "main" java.lang.NullPointerException
at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:110)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:109)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:109)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:54)
at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:941)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:572)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:591)
at com.ril.jio.spark.sql.SparkSQLHBaseMain.runJob(SparkSQLHBaseMain.java:63)
at com.ril.jio.spark.sql.SparkSQLHBaseMain.main(SparkSQLHBaseMain.java:35)
我的 Spark 和 HBase 工作正常。
请帮我解决这个问题。