我正在尝试使用 Flume 1.6 读取我的源文件(管道分隔的文本文件)并将它们提供给 kafka。
所有管道似乎都工作正常,所有记录都成功进入卡夫卡。但是,Flume 似乎在每个数据行的开头添加了 NUL 和 STX 控制字符。
这对我来说有点问题,因为最终我将所有数据从 Kafka 流式传输到 Amazon S3,最后进入 Amazon Redshift。由于这些控制字符,我将数据加载到 Redshift(COPY 命令)失败。
我花了相当多的时间在互联网上研究这个问题,但到目前为止还没有运气。我可以以某种方式指示 Flume 不要在我的源数据中添加这些字符吗?
我的 Flume 配置如下。我已经尝试过“exec”和“spoolDir”来源,但没有任何改变。
kafka-agent.channels=ch1
kafka-agent.channels.ch1.type=org.apache.flume.channel.kafka.KafkaChannel
kafka-agent.channels.ch1.brokerList=localhost:9092
kafka-agent.channels.ch1.topic=call-center-dimension
kafka-agent.channels.ch1.zookeeperConnect=localhost:2181
kafka-agent.channels.ch1.capacity=10000
kafka-agent.channels.ch1.transactionCapacity=10000
kafka-agent.channels.ch1.parseAsFlumeEvent = true
kafka-agent.channels.ch1.kafka.serializer.class=kafka.serializer.DefaultEncoder
kafka-agent.sources=tail
#kafka-agent.sources.tail.type=spooldir
#kafka-agent.sources.tail.channels=ch1
#kafka-agent.sources.tail.spoolDir=/home/ec2-user/flumespool
#kafka-agent.sources.tail.fileHeader=false
kafka-agent.sources.tail.type=exec
kafka-agent.sources.tail.channels=ch1
kafka-agent.sources.tail.shell=/bin/bash -c
kafka-agent.sources.tail.command=cat /tmp/call_center_dimension_1.out
kafka-agent.sinks=sink1
kafka-agent.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
kafka-agent.sinks.sink1.brokerList=localhost:9092
kafka-agent.sinks.sink1.topic=kafka
kafka-agent.sinks.sink1.channel=ch1
kafka-agent.sinks.sink1.batchSize=5
kafka-agent.sinks.sink1.kafka.serializer.class=kafka.serializer.StringEncoder
任何帮助表示赞赏。
谢谢普拉维什