
I am writing 5000 records to a Hudi Copy-on-Write table using the DataSource Writer API. Each record has 8 columns and the total size is under 1 MB. Please see the code below.

        Dataset<Row> ds1 = spark.read().json(jsc.parallelize(records, 2));
        // Build the Hudi writer: INSERT operation into a Copy-on-Write table
        DataFrameWriter<Row> writer = ds1.write().format("org.apache.hudi")
                .option("hoodie.insert.shuffle.parallelism", 2)
                .option("hoodie.upsert.shuffle.parallelism", 2)
                .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
                .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
                .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), recordKey)
                .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), partitionPath)
                .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), precombineKey)
                .option(HoodieWriteConfig.TABLE_NAME, hudiTableName)

                // Hive sync options: register/refresh the table in the Hive metastore after each commit
                .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hudiTableName)
                .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDatabase)
                .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveServerUrl)
                .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser)
                .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePassword)
                .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), hiveSyncEnabled)
                .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), partitionPath) 
                .mode(SaveMode.Append);

        writer.save(basePath);

The application is submitted to YARN via spark-submit. At the start each insert takes only 3~4 seconds, but the latency keeps growing, reaching about 30 seconds after roughly 5 minutes of running. From the Spark log below, most of the time is spent in the count task at HoodieSparkSqlWriter.

2020-05-25 16:36:37,851 | INFO  | [dag-scheduler-event-loop] | Adding task set 185.0 with 1 tasks | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2020-05-25 16:36:37,851 | INFO  | [dispatcher-event-loop-0] | Starting task 0.0 in stage 185.0 (TID 190, node-ana-corepOlf, executor 2, partition 0, NODE_LOCAL, 7651 bytes) | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2020-05-25 16:36:37,858 | INFO  | [dispatcher-event-loop-1] | Added broadcast_124_piece0 in memory on node-ana-corepOlf:36554 (size: 138.1 KB, free: 29.2 GB) | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2020-05-25 16:36:37,887 | INFO  | [dispatcher-event-loop-1] | Asked to send map output locations for shuffle 53 to 10.155.114.97:32461 | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2020-05-25 16:37:11,098 | INFO  | [dispatcher-event-loop-0] | Added rdd_381_0 in memory on node-ana-corepOlf:36554 (size: 387.0 B, free: 29.2 GB) | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2020-05-25 16:37:11,111 | INFO  | [task-result-getter-2] | Finished task 0.0 in stage 185.0 (TID 190) in 33260 ms on node-ana-corepOlf (executor 2) (1/1) | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2020-05-25 16:37:11,111 | INFO  | [task-result-getter-2] | Removed TaskSet 185.0, whose tasks have all completed, from pool  | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2020-05-25 16:37:11,112 | INFO  | [dag-scheduler-event-loop] | ResultStage 185 (count at HoodieSparkSqlWriter.scala:254) finished in 33.308 s | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2020-05-25 16:37:11,113 | INFO  | [Driver] | Job 70 finished: count at HoodieSparkSqlWriter.scala:254, took 33.438673 s | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)

I tried raising the parameter hoodie.insert.shuffle.parallelism to 20, but it did not help. CPU and heap usage both look normal.
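
For reference, the change amounts to the following tweak to the writer shown above (a sketch of the delta only; everything else in the option chain is assumed to stay exactly as in the original snippet):

        // Sketch: same writer chain as above, only the insert shuffle parallelism is raised to 20.
        // ds1 and basePath refer to the same variables as in the snippet above.
        DataFrameWriter<Row> tunedWriter = ds1.write().format("org.apache.hudi")
                .option("hoodie.insert.shuffle.parallelism", 20)
                // ... all remaining DataSourceWriteOptions / Hive sync options unchanged ...
                .mode(SaveMode.Append);
        tunedWriter.save(basePath);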

The application settings are listed below. Any suggestions are appreciated.

Executor instances: 2
Executor memory: 55g
Executor cores: 4
Driver memory: 4g
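
For context, these resources map onto standard Spark configuration properties. The following is a minimal sketch only; the property names and builder calls are standard Spark API and the app name is a placeholder, none of it taken from the original post:

        // Sketch: the resources listed above expressed as standard Spark properties.
        // In practice they are passed on the spark-submit command line
        // (--num-executors, --executor-memory, --executor-cores, --driver-memory);
        // spark.driver.memory in particular must be set before the driver JVM starts,
        // so it normally has to go through spark-submit rather than this builder.
        SparkSession spark = SparkSession.builder()
                .appName("hudi-insert-test")                 // placeholder app name
                .config("spark.executor.instances", "2")
                .config("spark.executor.memory", "55g")
                .config("spark.executor.cores", "4")
                .config("spark.driver.memory", "4g")
                .getOrCreate();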


1 Answer


You would get better help on the dev@hudi.apache.org mailing list or by opening a GitHub issue.

answered 2020-05-30T01:55:13.110