2

我可以使用以下代码读取来自 Kafka 的消息:

val ssc = new StreamingContext(sc, Seconds(50)) 
val topicmap = Map("test" -> 1)
val lines = KafkaUtils.createStream(ssc,"127.0.0.1:2181", "test-consumer-group",topicmap)

但是,我正在尝试从 Kafka 读取每条消息并将其放入 HBase。这是我写入 HBase 的代码,但没有成功。

lines.foreachRDD(rdd => {
  rdd.foreach(record => {
    val i = +1
    val hConf = new HBaseConfiguration() 
    val hTable = new HTable(hConf, "test") 
    val thePut = new Put(Bytes.toBytes(i)) 
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(record)) 
  })
})
4

2 回答 2

5

好吧,您实际上并没有执行 Put,您只是在创建一个 Put 请求并向其添加数据。你缺少的是一个

hTable.put(thePut);
于 2014-12-02T10:05:10.160 回答
3

添加其他答案!

您可以使用foreachPartition执行程序级别建立连接以提高效率,而不是昂贵的操作每一行。

lines.foreachRDD(rdd => {

    rdd.foreachPartition(iter => {

      val hConf = new HBaseConfiguration() 
      val hTable = new HTable(hConf, "test") 

      iter.foreach(record => {
        val i = +1
        val thePut = new Put(Bytes.toBytes(i)) 
        thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(record)) 

        //missing part in your code
        hTable.put(thePut);
      })
    })
})
于 2018-04-18T07:23:21.340 回答