我正在尝试实现一个简单的 Flume HDFS 接收器,它将从 Kafka 通道获取事件并将它们作为文本文件写入 hdfs。
该架构非常简单。这些事件从 twitter 流式传输到 kafka 主题,flume hdfs sink 确实会将这些事件写入 hdfs。这是Kafka-producer stackoverflow question的第 2 部分。
执行此命令时没有错误,似乎工作正常,但我无法在 hdfs 中看到文本文件。我无法调试或调查,因为文件/var/log/flume/
夹中没有创建日志文件。我正在使用 Hortonworks 沙箱 2.3.1 和 hue 来浏览文件系统。
执行flume的命令: flume-ng agent -n KafkaSink -c conf -f tweets_sink_flume.properties
Flume 属性文件: tweets_sink_flume.properties
# agent name, in this case KafkaSink.
KafkaSink.sources = kafka-source-1
KafkaSink.channels = hdfs-channel-1
KafkaSink.sinks = hdfs-sink-1
# kafka source properties.
KafkaSink.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
KafkaSink.sources.kafka-source-1.zookeeperConnect = sandbox.hortonworks.com:2181
KafkaSink.sources.kafka-source-1.topic = raw_json_tweets
KafkaSink.sources.kafka-source-1.batchSize = 100
KafkaSink.sources.kafka-source-1.channels = hdfs-channel-1
# hdfs flume sink properties
KafkaSink.channels.hdfs-channel-1.type = memory
KafkaSink.sinks.hdfs-sink-1.channel = hdfs-channel-1
KafkaSink.sinks.hdfs-sink-1.type = hdfs
KafkaSink.sinks.hdfs-sink-1.hdfs.writeFormat = Text
KafkaSink.sinks.hdfs-sink-1.hdfs.fileType = DataStream
KafkaSink.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
KafkaSink.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
# created /user/ruben/flume/ folder in hdfs to avoid permission error issues
KafkaSink.sinks.hdfs-sink-1.hdfs.path = /user/ruben/flume/%{topic}/%y-%m-%d
KafkaSink.sinks.hdfs-sink-1.hdfs.rollCount=100
KafkaSink.sinks.hdfs-sink-1.hdfs.rollSize=0
# specify the capacity of the memory channel.
KafkaSink.channels.hdfs-channel-1.capacity = 10000
KafkaSink.channels.hdfs-channel-1.transactionCapacity = 1000
以下是相关 Flume 控制台输出的一部分:
..
16/08/26 22:20:48 INFO kafka.KafkaSource: Kafka source kafka-source-1 started.
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], begin rebalancing consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping leader finder thread
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Shutting down
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Stopped
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Shutdown completed
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping all fetchers
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] All connections stopped
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Cleared all relevant queues for this fetcher
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Cleared the data chunks in all the consumer message iterators
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Committing all offsets after clearing the fetcher queues
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Releasing partition ownership
16/08/26 22:36:38 INFO consumer.RangeAssignor: Consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f rebalancing the following partitions: ArrayBuffer(0) for topic raw_json_tweets with consumers: List(flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0)
16/08/26 22:36:38 INFO consumer.RangeAssignor: flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0 attempting to claim partition 0
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0 successfully owned partition 0 for topic raw_json_tweets
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f selected partitions : raw_json_tweets:0: fetched offset = -1: consumed offset = -1
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], end rebalancing consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Starting
16/08/26 22:36:38 INFO utils.VerifiableProperties: Verifying properties
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property client.id is overridden to flume
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to sandbox.hortonworks.com:6667
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
16/08/26 22:36:38 INFO client.ClientUtils$: Fetching metadata from broker id:0,host:sandbox.hortonworks.com,port:6667 with correlation id 0 for 1 topic(s) Set(raw_json_tweets)
16/08/26 22:36:38 INFO producer.SyncProducer: Connected to sandbox.hortonworks.com:6667 for producing
16/08/26 22:36:38 INFO producer.SyncProducer: Disconnecting from sandbox.hortonworks.com:6667
16/08/26 22:36:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0-0], Starting
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Added fetcher for partitions ArrayBuffer([[raw_json_tweets,0], initOffset -1 to broker id:0,host:sandbox.hortonworks.com,port:6667] )