编辑*:这是完整的配置文件:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.command = /usr/bin/vmstat 1
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.consumer.timeout.ms = 20000000
tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.brokerList= ip.address:9092
tier1.channels.channel1.topic= test1
tier1.channels.channel1.zookeeperConnect=ip.address:2181
tier1.channels.channel1.parseAsFlumeEvent=false
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/flume/
tier1.sinks.sink1.hdfs.rollInterval = 5000
tier1.sinks.sink1.hdfs.rollSize = 5000
tier1.sinks.sink1.hdfs.rollCount = 1000
tier1.sinks.sink1.hdfs.idleTimeout= 10
tier1.sinks.sink1.hdfs.maxOpenFiles=1
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
直到最近我还没有 idleTimeout 和 maxOpenFiles。因此,即使使用这两个选项的默认配置,它也无法正常工作。
关于使用 Flume 聚合 Kafka 数据的问题。目前,Flume 每秒创建一个新文件用于读取流数据。这些是我的设置:
tier1.sinks.sink1.hdfs.rollInterval = 500 (should be 500 seconds)
tier1.sinks.sink1.hdfs.rollSize = 5000 (should be bytes)
tier1.sinks.sink1.hdfs.rollCount = 1000 (number of events)
我不完全确定的一个设置是rollCount,所以一些附加信息:
我得到 80 字节/秒,我的一些文件是 80 字节,有 2 条消息,有些是 160 字节,但有 4 条消息。所以它不是根据时间或大小来做的,所以它可能必须与计数有关,但我不明白为什么这么小的消息会注册为 1000 个事件?
感谢您的帮助!