I am trying to load data into HBase with Apache Flume, using the AsyncHBaseSink with a custom serializer, but I get this error:
ERROR async.HBaseClient: The znode for the -ROOT- region doesn't exist! 14/05/14
17:12:08 ERROR lifecycle.LifecycleSupervisor: Unable to start SinkRunner: {
policy:org.apache.flume.sink.DefaultSinkProcessor@923288b counterGroup:{ name:null
counters:{} } } - Exception follows. org.apache.flume.FlumeException: Interrupted while waiting for Hbase Callbacks at org.apache.flume.sink.hbase.AsyncHBaseSink.start(AsyncHBaseSink.java:379)
Note: HBase itself runs properly; I can create a table, put data, and get it back with a Java client.
This is my flume.conf:
# A single-node Flume configuration
# Uses an exec source (tail -f) and writes the events to HBase through AsyncHBaseSink (batchSize=5000)
# Name the components on this agent
agent3.sources = source1
agent3.sinks = sink1
agent3.channels = channel1
# Describe/configure source1
agent3.sources.source1.type = exec
agent3.sources.source1.command = tail -f /tmp/testGenerate.csv
# Describe sink1
agent3.sinks.sink1.type =org.apache.flume.sink.hbase.AsyncHBaseSink
agent3.sinks.sink1.table = AdreamLumiHB
agent3.sinks.sink1.columnFamily =lumiCF
agent3.sinks.sink1.batchSize=5000
#agent3.sinks.sink1.serializer = com.hbase.log.util.SplittingSerializer
agent3.sinks.sink1.serializer =org.apache.flume.sink.hbase.SplittingSerializer
agent3.sinks.sink1.zookeeperQuorum=localhost
agent3.sinks.sink1.znodeParent=/hbase
#agent3.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
#agent3.sinks.sink1.serializer.regex =
#agent3.sinks.sink1.serializer.regexIgnoreCase = true
agent3.sinks.sink1.serializer.columns = id,nom,valeur,batiment,etage,piece
# Use a channel which buffers events to a file
# -- The component type name, needs to be FILE.
agent3.channels.channel1.type = FILE
# checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored
# dataDirs ~/.flume/file-channel/data The directory where log files will be stored
# The maximum size of transaction supported by the channel
agent3.channels.channel1.transactionCapacity = 1000000
# Amount of time (in millis) between checkpoints
agent3.channels.channel1.checkpointInterval 30000
# Max size (in bytes) of a single log file
agent3.channels.channel1.maxFileSize = 2146435071
# Maximum capacity of the channel
agent3.channels.channel1.capacity 10000000
#keep-alive 3 Amount of time (in sec) to wait for a put operation
#write-timeout 3 Amount of time (in sec) to wait for a write operation
# Bind the source and sink to the channel
agent3.sources.source1.channels = channel1
agent3.sinks.sink1.channel = channel1
This is my hbase-site.xml:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:8020/hbase-0.98.1</value>
</property>
<property>
<name>hbase.tmp.dir</name>
<value>/home/alpha/hadoop_data/hbase-${user.name}</value>
</property>
<property>
<name>hbase.master.info.bindAddress</name>
<value>0.0.0.0</value>
</property>
<property>
<name>hbase.master</name>
<value>localhost:60000</value>
<description>The host and port that the HBase master runs at.</description>
</property>
<property>
<name>hbase.master.info.port</name>
<value>60010</value>
</property>
<property>
<name>hbase.master.info.bindAddress</name>
<value>0.0.0.0</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>localhost</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/alpha/hadoop_data/hbase-data</value>
</property>
</configuration>
This is my hbase-env.sh:
export HBASE_CLASSPATH=/home/alpha/hbase-0.98.1/lib
export HBASE_OPTS="-XX:+UseConcMarkSweepGC"
export JAVA_HOME=/usr
export HADOOP_HOME=/home/alpha/hadoop-1.2.1
export HADOOP_CONF_DIR=/home/alpha/hadoop-1.2.1/conf
export HBASE_LOG_DIR=/home/alpha/hbase-0.98.1/logs
export HBASE_PID_DIR=/home/alpha/hbase-0.98.1/pids
export HBASE_MANAGES_ZK=true
This is the custom serializer used by the sink:
package org.apache.flume.sink.hbase;
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import org.apache.flume.conf.ComponentConfiguration;
/**
 * A serializer for the AsyncHBaseSink which splits the event body on commas
 * and writes each field to its own column of a single row.
 *
 * <p>The row key is read from the event header named {@code rowKey}; an event
 * without that header is rejected, so whatever produces the events (source or
 * interceptor) has to set it. Column qualifiers come from the
 * comma-separated {@code columns} setting of the serializer configuration and
 * are matched to the body fields by position.
 *
 * <p>All text is converted to and from bytes as UTF-8 so that the stored
 * values do not depend on the JVM's default charset.
 *
 * <p>Originally from https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase
 */
public class SplittingSerializer implements AsyncHbaseEventSerializer {

  /**
   * Charset for every String/byte[] conversion in this class. Looked up by
   * name (fully qualified) so no additional import or newer JDK is required.
   */
  private static final java.nio.charset.Charset UTF_8 =
      java.nio.charset.Charset.forName("UTF-8");

  /** Header that carries the HBase row key of an event. */
  private static final String ROW_KEY_HEADER = "rowKey";

  /** Row holding the running event counter. */
  private static final byte[] TOTAL_EVENTS_ROW = "totalEvents".getBytes(UTF_8);

  private byte[] table;
  private byte[] colFam;
  private Event currentEvent;
  private byte[][] columnNames;
  // Both lists are reused between calls: cleared and refilled each time.
  private final List<PutRequest> puts = new ArrayList<PutRequest>();
  private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
  private byte[] currentRowKey;
  private final byte[] eventCountCol = "eventCount".getBytes(UTF_8);

  /**
   * Stores the target table and column family used for all later requests.
   *
   * @param table name of the HBase table
   * @param cf name of the column family
   */
  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.colFam = cf;
  }

  /**
   * Sets the event to serialize and extracts its row key.
   *
   * @param event the event whose body will be split by {@link #getActions()}
   * @throws FlumeException if the event has no {@code rowKey} header
   */
  @Override
  public void setEvent(Event event) {
    // Remember the event and verify that the row key IS present.
    this.currentEvent = event;
    String rowKeyStr = currentEvent.getHeaders().get(ROW_KEY_HEADER);
    if (rowKeyStr == null) {
      throw new FlumeException("No row key found in headers!");
    }
    currentRowKey = rowKeyStr.getBytes(UTF_8);
  }

  /**
   * Splits the current event body on commas and builds one put per field.
   * Field {@code i} is written to the {@code i}-th configured column.
   *
   * @return the puts for the current event; the same list instance is reused
   *         on every call
   * @throws FlumeException if the body has more fields than configured columns
   */
  @Override
  public List<PutRequest> getActions() {
    String eventStr = new String(currentEvent.getBody(), UTF_8);
    String[] cols = eventStr.split(",");
    // Without this check the loop below would fail with a bare
    // ArrayIndexOutOfBoundsException that says nothing about the cause.
    if (cols.length > columnNames.length) {
      throw new FlumeException("Event body has " + cols.length
          + " fields but only " + columnNames.length
          + " columns are configured");
    }
    puts.clear();
    for (int i = 0; i < cols.length; i++) {
      // One PutRequest per column.
      puts.add(new PutRequest(table, currentRowKey, colFam,
          columnNames[i], cols[i].getBytes(UTF_8)));
    }
    return puts;
  }

  /**
   * Builds a single increment of the {@code eventCount} column in the
   * {@code totalEvents} row of the configured column family.
   *
   * @return a one-element list; the same list instance is reused on every call
   */
  @Override
  public List<AtomicIncrementRequest> getIncrements() {
    incs.clear();
    // Increment the number of events received.
    incs.add(new AtomicIncrementRequest(table, TOTAL_EVENTS_ROW, colFam, eventCountCol));
    return incs;
  }

  /**
   * Drops all references held by this serializer, including the configured
   * column names; it must be initialized and configured again before reuse.
   */
  @Override
  public void cleanUp() {
    table = null;
    colFam = null;
    currentEvent = null;
    columnNames = null;
    currentRowKey = null;
  }

  /**
   * Reads the comma-separated {@code columns} setting and stores the column
   * qualifiers, in order. Whitespace around each name is ignored.
   *
   * @param context the serializer configuration
   * @throws FlumeException if {@code columns} is not configured
   */
  @Override
  public void configure(Context context) {
    String cols = context.getString("columns");
    if (cols == null) {
      throw new FlumeException(
          "Missing required serializer property: columns");
    }
    String[] names = cols.split(",");
    columnNames = new byte[names.length][];
    for (int i = 0; i < names.length; i++) {
      // Trim so that "a, b" does not yield a qualifier with a leading space.
      columnNames[i] = names[i].trim().getBytes(UTF_8);
    }
  }

  /** No component-level configuration is needed; intentionally a no-op. */
  @Override
  public void configure(ComponentConfiguration conf) {
  }
}