
We have an application that moves CSV files from HDFS to Hive. We are using a Storm topology for that process.

We are using 8 machines, each with 22 cores and 512 GB of RAM. However, our code runs really slowly: it takes 10 minutes to transfer 6 million records.

Sixty files of 10 MB each are transferred to HDFS in one second. We are trying to optimize our code, but it is obvious that we are doing something very wrong.

For the Hive table, we have 64 buckets.

In our topology, we have 1 Spout and 2 Bolts. Basically, our Spout reads the CSV file and emits its lines to the first Bolt, which is responsible for parsing the data; that Bolt then emits the parsed records to the second Bolt, which is responsible for writing to Hive.
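Our recordParserBolt is essentially this (simplified sketch; the real one has validation and error handling, and the split logic here is only illustrative):

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class RecordParserBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // HdfsSpout in "text" mode emits one CSV line per tuple
        String line = input.getString(0);
        String[] columns = line.split(",", -1);
        collector.emit(new Values((Object[]) columns));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(TTDPIRecord.fieldsList));
    }
}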

HDFS Spout:

HdfsSpout hdfsSpout = new HdfsSpout()
    .withOutputFields(TextFileReader.defaultFields)
    .setReaderType("text")
    .setHdfsUri(hdfsUri)
    .setSourceDir("/data/in")
    .setArchiveDir("/data/done")
    .setBadFilesDir("/data/bad")
    .setClocksInSync(true) // NTP installed on all hosts
    .setIgnoreSuffix("_COPYING_") // do not start reading a file until it is fully copied to HDFS
    .setMaxOutstanding(50_000);

Mapper:

DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
    .withColumnFields(new Fields(TTDPIRecord.fieldsList))
    .withPartitionFields(new Fields(TTDPIRecord.partitionFieldsList));
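For context, TTDPIRecord just exposes the column and partition field names the mapper references; roughly like this (the field names below are only placeholders, not our real schema):

public class TTDPIRecord {
    // all CSV columns, in file order (placeholder names)
    public static final String[] fieldsList = {"subscriber_id", "url", "bytes_up", "bytes_down"};
    // columns the Hive table is partitioned by (placeholder)
    public static final String[] partitionFieldsList = {"event_date"};
}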

Hive Options:

HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
    .withAutoCreatePartitions(true)
    .withHeartBeatInterval(3)
    .withCallTimeout(10_000) // default is 10,000 ms
    .withTxnsPerBatch(2)
    .withBatchSize(50_000)
    .withTickTupleInterval(1); // set to 1 because it most likely affects the Storm metrics
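The hiveBolt is then built from these options with the standard storm-hive constructor:

HiveBolt hiveBolt = new HiveBolt(hiveOptions);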

Config:

Config conf = new Config();
conf.setNumWorkers(6);
conf.setNumAckers(6);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

Topology Builder:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hdfsSpout", hdfsSpout, 8);
builder.setBolt("recordParserBolt", recordParserBolt, 8).localOrShuffleGrouping("hdfsSpout");
builder.setBolt("hiveBolt", hiveBolt, 8).localOrShuffleGrouping("recordParserBolt");

We are not sure about the following parameters:

in the HDFS Spout: .setMaxOutstanding(50_000);

in the Hive Options: .withTxnsPerBatch(2), .withBatchSize(50_000), .withTickTupleInterval(1);

in the Config: .setNumWorkers(6), .setNumAckers(6);

Parallelism for the Spout and Bolts: we gave 8 for each.

What should be the values for those parameters? Thanks in advance.

Edit: Here are our test results for 100 CSV files of 10 MB each:

hdfsSpout Executors: 8 Complete Latency: 1834.209 ms

recordParserBolt Executors: 8 Complete Latency: 0.019 ms

hiveBolt Executors: 8 Complete Latency: 1092.624 ms


1 Answer


You are doing conf.setNumWorkers(6);, which means you are using only 6 of your 8 machines. You can set it to 8 to make use of all the hardware you have.

Another parameter you can change is the parallelism hint of your Bolts, which is the initial number of executors (threads) for a component. You have given only 8 as the parallelism; you can increase it to 100/200 and see how the performance changes.

You can go through this to understand how parallelism works in Storm.

Can you tell us what your max-spout-pending configuration is?
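As a rough sketch, the changes I am suggesting would look like this (the parallelism and max-spout-pending values are only starting points; tune them and watch the numbers):

Config conf = new Config();
conf.setNumWorkers(8);           // use all 8 machines instead of 6
conf.setNumAckers(8);
conf.setMaxSpoutPending(10_000); // caps un-acked tuples per spout task; tune and observe

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hdfsSpout", hdfsSpout, 8);
// increase the bolt parallelism hints and see how performance changes
builder.setBolt("recordParserBolt", recordParserBolt, 100).localOrShuffleGrouping("hdfsSpout");
builder.setBolt("hiveBolt", hiveBolt, 100).localOrShuffleGrouping("recordParserBolt");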

answered 2018-12-30 at 05:59