
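I am using Flume to insert log file data into an HBase table, but nothing gets inserted into the table. The Flume agent configuration is as follows: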

agent1.sources = tail
agent1.channels = memoryChannel
agent1.sinks = loggerSink sink1
agent1.sources.tail.type = exec
agent1.sources.tail.command = tail -f /usr/local/jarsfortest/LogsForTest/generatingLogs-app.logs
agent1.sources.tail.channels = memoryChannel

agent1.sinks.loggerSink.channel = memoryChannel
agent1.sinks.loggerSink.type = logger

agent1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.sink1.channel = memoryChannel
agent1.sinks.sink1.table = testFlume
agent1.sinks.sink1.columnFamily = log
agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
#agent1.sinks.sink1.serializer.regex = [a-zA-Z0-9]*[^C][a-zA-Z0-9]*[^C][a-zA-Z0-9]*
agent1.sinks.sink1.serializer.regex =[a-zA-Z0-9]*^C[a-zA-Z0-9]*^C[a-zA-Z0-9]*
agent1.sinks.sink1.serializer.colNames = id, no_fill_reason, bid

agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 1000

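The agent above starts successfully, but the log file data is not inserted into HBase. The log file data looks like this: id0^COK^C10, i.e. the fields are separated by a control character. Please help. Thanks in advance.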


2 Answers


The likely cause is that the regex does not match. To debug it, follow these steps:

1) Start a single agent with these options:

flume-ng agent -n $1 -c ../../config/conf/ -f ../../config/conf/$1.conf -Xmx3g -Xdebug -Xrunjdwp:transport=dt_socket,address=1044,server=y,suspend=y \
  --classpath ../lib/*:../../config/conf/zoo.cfg:../.

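Once you start the script, a message saying the JVM is listening on port 1044 will appear. Because of suspend=y, the agent waits there until a debugger attaches.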

2) In Eclipse, create a "Remote Java Application" debug configuration that connects to your server's host name on port 1044.

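3) The getActions method is responsible for turning each event into the rows that are put into HBase.

Set a breakpoint in the getActions method of the event serializer (here RegexHbaseEventSerializer):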

public List<Row> getActions() throws FlumeException {
    List<Row> actions = Lists.newArrayList();

    // Decode the event body with the configured charset and match the WHOLE line.
    Matcher m = this.inputPattern.matcher(new String(this.payload, this.charset));
    if (!m.matches()) {
        // No match: the event is silently skipped (no row, no exception).
        return Lists.newArrayList();
    }

    // The number of capture groups must equal the number of colNames,
    // otherwise the event is also silently skipped.
    if (m.groupCount() != this.colNames.size()) {
        return Lists.newArrayList();
    }
    try {
        byte[] rowKey;
        if (this.rowKeyIndex < 0) {
            // No group is configured as the row key: generate one.
            rowKey = getRowKey();
        } else {
            rowKey = m.group(this.rowKeyIndex + 1).getBytes(Charsets.UTF_8);
        }
        Put put = new Put(rowKey);

        // One column per capture group, skipping the group used as the row key.
        for (int i = 0; i < this.colNames.size(); ++i) {
            if (i != this.rowKeyIndex) {
                put.add(this.cf, (byte[]) this.colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
            }
        }
        // Optionally store the event headers as extra columns.
        if (this.depositHeaders) {
            for (Map.Entry entry : this.headers.entrySet()) {
                put.add(this.cf, ((String) entry.getKey()).getBytes(this.charset),
                        ((String) entry.getValue()).getBytes(this.charset));
            }
        }
        actions.add(put);
    } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
    }
    return actions;
}
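The two early returns at the top of getActions are where rows get dropped without any error: the whole line must match the pattern, and the number of capture groups must equal the number of colNames. Below is a minimal sketch that reproduces just those two checks offline, without a debugger. Assumptions: `^C` in the log stands for the control character U+0003 (caret notation), and the class RegexCheck, the method wouldProducePut and the "grouped" regex are hypothetical illustrations, not part of Flume. Note that the question's regex contains no capture groups at all, and that in a Java regex `^` is an anchor, not a control character.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    // Mirrors the two early exits in getActions(): the whole line must match,
    // and the number of capture groups must equal the number of columns.
    public static boolean wouldProducePut(String regex, List<String> colNames, String line) {
        Matcher m = Pattern.compile(regex).matcher(line);
        if (!m.matches()) {
            return false;
        }
        return m.groupCount() == colNames.size();
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("id", "no_fill_reason", "bid");
        // Assumption: "^C" in the log file is the single byte U+0003.
        String line = "id0\u0003OK\u000310";

        // Regex from the question: no groups, and "^C" means "start anchor, then letter C".
        String original = "[a-zA-Z0-9]*^C[a-zA-Z0-9]*^C[a-zA-Z0-9]*";
        // Hypothetical fix: one capture group per column, \x03 as the separator.
        String grouped = "([a-zA-Z0-9]*)\\x03([a-zA-Z0-9]*)\\x03([a-zA-Z0-9]*)";

        System.out.println("original: " + wouldProducePut(original, cols, line)); // prints "original: false"
        System.out.println("grouped:  " + wouldProducePut(grouped, cols, line));  // prints "grouped:  true"
    }
}
```

If you move such a regex into the agent file, keep in mind that java.util.Properties.load silently drops a backslash in front of a character that is not a recognized escape; assuming Flume reads the file through java.util.Properties, the backslash would likely need doubling (`\\x03`).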
answered 2015-12-04T18:49:02.690

I can help you analyze the root cause.

Stop the flume-ng process with CTRL+C.

Look closely at the output. It contains three types of metrics:

  • Shutdown Metric for type: SOURCE
  • Shutdown Metric for type: CHANNEL
  • Shutdown Metric for type: SINK

For example:

Shutdown Metric for type: SOURCE, name: r1. source.start.time == 1483838106878
Shutdown Metric for type: SOURCE, name: r1. source.stop.time == 1483838118766
Shutdown Metric for type: SOURCE, name: r1. src.append-batch.accepted == 0
Shutdown Metric for type: SOURCE, name: r1. src.append-batch.received == 0
Shutdown Metric for type: SOURCE, name: r1. src.append.accepted == 0
Shutdown Metric for type: SOURCE, name: r1. src.append.received == 0
Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 141
Shutdown Metric for type: SOURCE, name: r1. src.events.received == 147
Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0

Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1483838106874
Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1483838118789
Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 1000000
Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 141
Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 147
Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 141
Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 31
Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 26

Shutdown Metric for type: SINK, name: k1. sink.start.time == 1483838108891
Shutdown Metric for type: SINK, name: k1. sink.stop.time == 1483838118758
Shutdown Metric for type: SINK, name: k1. sink.batch.complete == 0
Shutdown Metric for type: SINK, name: k1. sink.batch.empty == 4
Shutdown Metric for type: SINK, name: k1. sink.batch.underflow == 1
Shutdown Metric for type: SINK, name: k1. sink.connection.closed.count == 1
Shutdown Metric for type: SINK, name: k1. sink.connection.creation.count == 0
Shutdown Metric for type: SINK, name: k1. sink.connection.failed.count == 0
Shutdown Metric for type: SINK, name: k1. sink.event.drain.attempt == 26
Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0

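In the example above, the problem is in the sink stage: the sink attempted to drain 26 events (sink.event.drain.attempt == 26), but sink.event.drain.sucess is 0.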
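Reading these counters by hand works fine; as a hypothetical helper, the Java sketch below (Java 9+ for List.of and Map.of) scans shutdown-metric lines and reports every component whose "events moved" counter is 0. Which counter counts as the flow counter for each type (src.events.accepted, channel.event.take.success, and sink.event.drain.sucess, spelled as in the output above) is my own heuristic, and the class name ShutdownMetrics is illustrative, not part of Flume.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ShutdownMetrics {
    // Matches lines such as:
    // "Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0"
    private static final Pattern LINE = Pattern.compile(
        "Shutdown Metric for type: (\\w+), name: ([^.]+)\\. (\\S+) == (\\d+)");

    // Heuristic (assumption): the counter that shows events actually moving
    // through each component type. Sink name spelled as in the output above.
    private static final Map<String, String> FLOW_COUNTER = Map.of(
        "SOURCE", "src.events.accepted",
        "CHANNEL", "channel.event.take.success",
        "SINK", "sink.event.drain.sucess");

    // Returns "TYPE name" for every component whose flow counter is 0.
    public static List<String> stalled(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (m.matches()
                    && m.group(3).equals(FLOW_COUNTER.get(m.group(1)))
                    && Long.parseLong(m.group(4)) == 0) {
                result.add(m.group(1) + " " + m.group(2));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A few lines copied from the example output above.
        List<String> sample = List.of(
            "Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 141",
            "Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 26",
            "Shutdown Metric for type: SINK, name: k1. sink.event.drain.attempt == 26",
            "Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0");
        System.out.println(stalled(sample)); // prints [SINK k1]
    }
}
```

Keying on component name as well as type keeps agents with several sinks (like loggerSink and sink1 in the question) apart.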

answered 2017-01-08T01:22:38.430