我对 Kafka + Hbase 很陌生。感谢你的帮助。
我们得到了一个 Kafka 消息流,并希望将数据插入到从 Kafka 读取的 Hbase 中。
我编写了一个 java 消费者(单主题、单分区和一个线程)来读取来自主题的消息。一切皆好。在线程中,我试图连接 Hbase 以将消息插入到不起作用的表中。当它试图连接到 hbase(创建 HTable 的实例)时,读取流的 kafka 线程被杀死了。如何克服这一点将数据插入 Hbase?非常感谢您在这个问题上的想法和帮助。
Kafka 高级消费者代码
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
// Read Message
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
//Load into HBase
// Instantiating Configuration class
Configuration config = HBaseConfiguration.create();
config.clear();
//config.set("hbase.zookeeper.quorum", "localhost");
//config.set("hbase.zookeeper.property.clientPort","2181");
config.set("hbase.zookeeper.quorum", "localhost");
config.set("hbase.zookeeper.property.clientPort","2181");
config.set("hbase.master", "localhost");
// Instantiating HTable class
HTable hTable = new HTable(config, "device");
// Instantiating Put class
// accepts a row name.
Put p = new Put(Bytes.toBytes("id14"));
// adding values using add() method
// accepts column family name, qualifier/row name ,value
p.add(Bytes.toBytes("d_name"), Bytes.toBytes("primary"),Bytes.toBytes("ios"));
// Saving the put Instance to the HTable.
hTable.put(p);
// closing HTable
hTable.close();