
I am using Flume to write to Google Cloud Storage. Flume listens on HTTP:9000. It took me some time to make it work (adding the GCS libraries, using a credentials file...), but now it seems to communicate over the network.

I am sending very small HTTP requests for my tests, and I have plenty of RAM available:

curl -X POST -d '[{ "headers" : { timestamp=1417444588182, env=dev, tenant=myTenant, type=myType }, "body" : "some body ONE"  }]' localhost:9000

I encounter this memory exception on the first request (after which, of course, it stops working):

2014-11-28 16:59:47,748 (hdfs-hdfs_sink-call-runner-0) [INFO - com.google.cloud.hadoop.util.LogUtil.info(LogUtil.java:142)] GHFS version: 1.3.0-hadoop2
2014-11-28 16:59:50,014 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:467)] process failed
java.lang.OutOfMemoryError: Java heap space
        at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:76)
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:79)
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:820)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)

(see the complete stack trace in a gist for full details)

The strange part is that the folders and files are created the way I want, but the files are empty.

gs://my_bucket/dev/myTenant/myType/2014-12-01/14-36-28.1417445234193.json.tmp

Is something wrong with the way I configured Flume + GCS, or is it a bug in GCS.jar?

Where should I look to gather more data?

PS: I am running flume-ng inside Docker.


My flume.conf file:

# Name the components on this agent
a1.sources = http
a1.sinks = hdfs_sink
a1.channels = mem

# Describe/configure the source
a1.sources.http.type =  org.apache.flume.source.http.HTTPSource
a1.sources.http.port = 9000

# Describe the sink
a1.sinks.hdfs_sink.type = hdfs
a1.sinks.hdfs_sink.hdfs.path = gs://my_bucket/%{env}/%{tenant}/%{type}/%Y-%m-%d
a1.sinks.hdfs_sink.hdfs.filePrefix = %H-%M-%S
a1.sinks.hdfs_sink.hdfs.fileSuffix = .json
a1.sinks.hdfs_sink.hdfs.round = true
a1.sinks.hdfs_sink.hdfs.roundValue = 10
a1.sinks.hdfs_sink.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.mem.type = memory
a1.channels.mem.capacity = 10000
a1.channels.mem.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.http.channels = mem
a1.sinks.hdfs_sink.channel = mem

Related question in my Flume/GCS journey: What is the minimal setup needed to write to HDFS/GS on Google Cloud Storage with flume?


1 Answer


When uploading files, the GCS Hadoop FileSystem implementation sets aside a fairly large (64MB) write buffer for each FSDataOutputStream (file opened for write). This can be changed by setting "fs.gs.io.buffersize.write" to a smaller value (in bytes) in core-site.xml. I would think 1MB is plenty for low-volume log collection.
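
For illustration, a minimal core-site.xml entry along these lines should apply the smaller buffer (1048576 bytes = 1MB, the value suggested above; adjust to your workload):

<configuration>
  <!-- Shrink the per-stream GCS write buffer down from the 64MB default -->
  <property>
    <name>fs.gs.io.buffersize.write</name>
    <value>1048576</value> <!-- 1MB, in bytes -->
  </property>
</configuration>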

Also, check the maximum heap size set when launching the JVM for Flume. The flume-ng script sets a default JAVA_OPTS value of -Xmx20m, which limits the heap to 20MB. This can be set to a larger value in flume-env.sh (see conf/flume-env.sh.template in the Flume tarball distribution for details).
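
For example, a one-line override in conf/flume-env.sh could look like the following (the 512MB figure is only an illustrative choice, not a value from this answer; size it to your channel capacity and buffers):

# conf/flume-env.sh: raise the Flume agent heap above the 20MB default
export JAVA_OPTS="-Xmx512m"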

Answered 2014-12-01T19:25:01.827