
I'm trying to implement a simple Flume HDFS sink that takes events from a Kafka topic and writes them to HDFS as text files.

The architecture is very simple. The events are streamed from Twitter into a Kafka topic, and the Flume HDFS sink should write those events to HDFS. This is part 2 of my Kafka-producer Stack Overflow question.

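The command runs without errors and seems to work fine, but I can't see any text files in HDFS. I can't debug or investigate, because no log files are created in the /var/log/flume/ folder. I'm using the Hortonworks sandbox 2.3.1 and Hue to browse the file system.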


Command used to run Flume: flume-ng agent -n KafkaSink -c conf -f tweets_sink_flume.properties

Flume properties file: tweets_sink_flume.properties

# agent name, in this case KafkaSink.
KafkaSink.sources  = kafka-source-1
KafkaSink.channels = hdfs-channel-1
KafkaSink.sinks    = hdfs-sink-1

# kafka source properties.
KafkaSink.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
KafkaSink.sources.kafka-source-1.zookeeperConnect = sandbox.hortonworks.com:2181
KafkaSink.sources.kafka-source-1.topic = raw_json_tweets
KafkaSink.sources.kafka-source-1.batchSize = 100
KafkaSink.sources.kafka-source-1.channels = hdfs-channel-1

# hdfs flume sink properties
KafkaSink.channels.hdfs-channel-1.type   = memory
KafkaSink.sinks.hdfs-sink-1.channel = hdfs-channel-1
KafkaSink.sinks.hdfs-sink-1.type = hdfs
KafkaSink.sinks.hdfs-sink-1.hdfs.writeFormat = Text
KafkaSink.sinks.hdfs-sink-1.hdfs.fileType = DataStream
KafkaSink.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
KafkaSink.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true

# created /user/ruben/flume/ folder in hdfs to avoid permission error issues
KafkaSink.sinks.hdfs-sink-1.hdfs.path = /user/ruben/flume/%{topic}/%y-%m-%d
KafkaSink.sinks.hdfs-sink-1.hdfs.rollCount=100
KafkaSink.sinks.hdfs-sink-1.hdfs.rollSize=0

# specify the capacity of the memory channel.
KafkaSink.channels.hdfs-channel-1.capacity = 10000
KafkaSink.channels.hdfs-channel-1.transactionCapacity = 1000

Here is part of the relevant Flume console output:

..
16/08/26 22:20:48 INFO kafka.KafkaSource: Kafka source kafka-source-1 started.
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], begin rebalancing consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping leader finder thread
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Shutting down
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Stopped
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Shutdown completed
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping all fetchers
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] All connections stopped
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Cleared all relevant queues for this fetcher
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Cleared the data chunks in all the consumer message iterators
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Committing all offsets after clearing the fetcher queues
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Releasing partition ownership
16/08/26 22:36:38 INFO consumer.RangeAssignor: Consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f rebalancing the following partitions: ArrayBuffer(0) for topic raw_json_tweets with consumers: List(flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0)
16/08/26 22:36:38 INFO consumer.RangeAssignor: flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0 attempting to claim partition 0
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0 successfully owned partition 0 for topic raw_json_tweets
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f selected partitions : raw_json_tweets:0: fetched offset = -1: consumed offset = -1
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], end rebalancing consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Starting
16/08/26 22:36:38 INFO utils.VerifiableProperties: Verifying properties
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property client.id is overridden to flume
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to sandbox.hortonworks.com:6667
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
16/08/26 22:36:38 INFO client.ClientUtils$: Fetching metadata from broker id:0,host:sandbox.hortonworks.com,port:6667 with correlation id 0 for 1 topic(s) Set(raw_json_tweets)
16/08/26 22:36:38 INFO producer.SyncProducer: Connected to sandbox.hortonworks.com:6667 for producing
16/08/26 22:36:38 INFO producer.SyncProducer: Disconnecting from sandbox.hortonworks.com:6667
16/08/26 22:36:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0-0], Starting
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Added fetcher for partitions ArrayBuffer([[raw_json_tweets,0], initOffset -1 to broker id:0,host:sandbox.hortonworks.com,port:6667] )

Answer


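By default, Flume's Kafka source only fetches messages that were written to Kafka after the agent running the source started; it does not consume the topic from the beginning.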

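So either:

  1. Make sure you keep writing to the topic after Flume has started, or
  2. Use the readSmallestOffset=true setting to tell the Kafka source to start reading from the beginning of the topic.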
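For option 2, here is a minimal configuration sketch for this agent. It assumes a Flume 1.6-era Kafka source built on the ZooKeeper-based consumer visible in the log above. In the Flume 1.6 user guide, readSmallestOffset is documented as a Kafka channel property, while the Kafka source forwards any kafka.-prefixed key to the underlying Kafka consumer, so the same effect is usually obtained with the consumer setting auto.offset.reset = smallest. Check the exact key against the documentation for the Flume version shipped with your sandbox. The group name below is hypothetical.

```properties
# Sketch only: assumes a Flume 1.6-style Kafka source that passes
# "kafka."-prefixed keys through to the old ZooKeeper-based consumer.
# Start from the oldest available offset when the group has no committed offset.
KafkaSink.sources.kafka-source-1.kafka.auto.offset.reset = smallest

# Optional. "flume-hdfs-replay" is a hypothetical group name; a group with no
# committed offsets lets the reset above take effect on the next start.
KafkaSink.sources.kafka-source-1.groupId = flume-hdfs-replay
```

The reset only applies when the consumer group has no committed offset. The log line "fetched offset = -1: consumed offset = -1" suggests that is currently the case for the default group. On Flume 1.7 and later the source uses the newer Kafka consumer, and the corresponding key is kafka.consumer.auto.offset.reset = earliest.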
answered 2016-08-28T00:22:28.647