1

我正在尝试使用 Flume 1.6 读取我的源文件(管道分隔的文本文件)并将它们提供给 kafka。

所有管道似乎都工作正常,所有记录都成功进入卡夫卡。但是,Flume 似乎在每个数据行的开头添加了 NUL 和 STX 控制字符。

这对我来说有点问题,因为最终我将所有数据从 Kafka 流式传输到 Amazon S3,最后进入 Amazon Redshift。由于这些控制字符,我将数据加载到 Redshift(COPY 命令)失败。

我花了相当多的时间在互联网上研究这个问题,但到目前为止还没有运气。我可以以某种方式指示 Flume 不要在我的源数据中添加这些字符吗?

我的 Flume 配置如下。我已经尝试过“exec”和“spoolDir”来源,但没有任何改变。

kafka-agent.channels=ch1

kafka-agent.channels.ch1.type=org.apache.flume.channel.kafka.KafkaChannel
kafka-agent.channels.ch1.brokerList=localhost:9092
kafka-agent.channels.ch1.topic=call-center-dimension
kafka-agent.channels.ch1.zookeeperConnect=localhost:2181
kafka-agent.channels.ch1.capacity=10000
kafka-agent.channels.ch1.transactionCapacity=10000
kafka-agent.channels.ch1.parseAsFlumeEvent = true
kafka-agent.channels.ch1.kafka.serializer.class=kafka.serializer.DefaultEncoder

kafka-agent.sources=tail

#kafka-agent.sources.tail.type=spooldir
#kafka-agent.sources.tail.channels=ch1
#kafka-agent.sources.tail.spoolDir=/home/ec2-user/flumespool
#kafka-agent.sources.tail.fileHeader=false


kafka-agent.sources.tail.type=exec
kafka-agent.sources.tail.channels=ch1
kafka-agent.sources.tail.shell=/bin/bash -c
kafka-agent.sources.tail.command=cat /tmp/call_center_dimension_1.out

kafka-agent.sinks=sink1
kafka-agent.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
kafka-agent.sinks.sink1.brokerList=localhost:9092
kafka-agent.sinks.sink1.topic=kafka
kafka-agent.sinks.sink1.channel=ch1
kafka-agent.sinks.sink1.batchSize=5
kafka-agent.sinks.sink1.kafka.serializer.class=kafka.serializer.StringEncoder

任何帮助表示赞赏。

谢谢普拉维什

4

2 回答 2

0

只是为了关闭循环。

在这上面花了很多时间。还使用 regex_replace 拦截器来删除控制字符,但没有运气。

最终评估并使用了 Apache Nifi。出色的工具,易于设置和快速实施。我能够从文件中读取数据并使用 Nifi 推送到 Kafka 主题,它使数据保持干净和完整。

谢谢, 普拉维什

于 2016-02-22T13:02:42.520 回答
0

尝试使用 Hdfs 文件的 DataStream 文件类型:

kafka-agent.sinks.sink1.hdfs.fileType = DataStream

于 2018-07-15T10:39:41.547 回答