1

所以我的项目流程是 Kafka -> Spark Streaming -> HBase

现在我想再次从 HBase 读取数据,这将遍历上一个作业创建的表并进行一些聚合并将其以不同的列格式存储在另一个表中

Kafka->Spark Streaming(2ms)->HBase->Spark Streaming(10ms)->HBase

现在我不知道如何使用 Spark Streaming 从 HBase 读取数据。我找到了一个 Cloudera 实验室项目,它是 SparkOnHbase(http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/)库,但我不知道如何获得inputDStream 用于来自 HBase 的流处理。如果有任何可以帮助我做到这一点,请提供任何指针或库链接。

4

2 回答 2

0

您可以使用 queueStream 从 RDD 队列创建 DStream: StreamingContext

JavaSparkContext sc = new JavaSparkContext(conf);
org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
JavaHBaseContext jhbc = new JavaHBaseContext(sc, hconf);
Scan scan1 = new Scan();           
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getBytes());

// Create RDD
         rdd = jhbc.hbaseRDD(tableName, scan1, new Function<Tuple2<ImmutableBytesWritable, Result>, Tuple2<ImmutableBytesWritable, Result>>() {
            @Override
            public Tuple2<ImmutableBytesWritable, Result> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
                return immutableBytesWritableResultTuple2;
            }
        });

   // Create streaming context and queue
   JavaSparkStreamingContext ssc = new JavaSparkStramingContext(sc);

   Queue<JavaRDD<Tuple2<ImmutableBytesWritable, Result> >> queue =new Queue<JavaRDD<Tuple2<ImmutableBytesWritable, Result>>>( );
        queue.enqueue(rdd);

JavaDStream<Tuple2<ImmutableBytesWritable, Result>> ssc.queueStream(queue);

PS:你可以只使用 Spark(没有流式传输)

于 2016-07-26T14:31:17.643 回答
0

Splice Machine (Open Source) 有一个演示火花流运行的演示。

http://community.splicemachine.com/category/tutorials/data-ingestion-streaming/

这是此用例的示例代码。

https://github.com/splicemachine/splice-community-sample-code/tree/master/tutorial-kafka-spark-streaming

于 2016-08-03T20:47:57.897 回答