0

我正在尝试将 s3 中的数据移动到静止时(源)上的日期字符串分区的另一个位置,在该位置它在静止时(目标)分区为 year=yyyy/month=mm/day=dd/

虽然我能够读取 Spark 中的整个源位置数据并在 tmp hdfs 中以目标格式对其进行分区,但 s3DistCp 无法将其从 hdfs 复制到 s3。它因 OutOnMemory 错误而失败。

我正在尝试编写近 200 万个小文件(每个 20KB)

我的 s3Distcp 使用以下参数运行 sudo -H -u hadoop nice -10 bash -c "if hdfs dfs -test -d hdfs:///<source_path>; then /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar -libjars /usr/share/aws/emr/s3-dist-cp/lib/ -Dmapreduce.job.reduces=30 -Dmapreduce.child.java.opts=Xmx2048m --src hdfs:///<source_path> --dest s3a://<destination_path> --s3ServerSideEncryption;fi"

它失败了

[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # java.lang.OutOfMemoryError: Java heap space
[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # -XX:OnOutOfMemoryError="kill -9 %p"```

The emr cluster I am running this is 
"master_instance_type": "r5d.8xlarge",
"core_instance_type": "r5.2xlarge",
"core_instance_count": "8",
"task_instance_types": [ "r5.2xlarge","m5.4xlarge"],
"task_instance_count": "1000"

Any suggestions what I could increase configurations on s3Distcp for it to be able to copy this without running out of memory? 
4

2 回答 2

0

我最终迭代地运行它,对于所说的 aws 堆栈,它能够在每次迭代中处理大约 300K 文件而无需 OOM

于 2020-09-04T08:34:42.900 回答
0

这是classic您可以通过设置和分配来使用Spark的多线程 scheduling功能的情况spark.scheduler.mode=FAIRpools

你需要做的是

  • 预先创建list的分区
  • 将此列表用作可迭代对象
  • 对于此列表中的每个迭代器,在不同的池中触发一个 spark-job
  • 无需使用 differents3distcp

示例如下所示:

在做火花提交之前=>

# Create a List of all *possible* partitions like this 
# Example S3 prefixes :
#     s3://my_bucket/my_table/year=2019/month=02/day=20
#    ...
#   ...
#      s3://my_bucket/my_table/year=2020/month=03/day=15
#     ...
#     ...
#     s3://my_bucket/my_table/year=2020/month=09/day=01

# WE SET `TARGET_PREFIX` as:
TARGET_PREFIX="s3://my_bucket/my_table"

# And Create a List ( till Day=nn part)
# By looping twice

# Increase loop numbers if partition is till hour
aws s3 ls "${TARGET_PREFIX}/"|grep PRE|awk '{print $2}'|while read year_part ;

do

full_year_part="${TARGET_PREFIX}/${year_part}";

aws s3 ls ${full_year_part}|grep PRE|awk '{print $2}'|while read month_part;

do

full_month_part=${full_year_part}${month_part};

aws s3 ls ${full_month_part}|grep PRE|awk -v pref=$full_month_part '{print pref$2}';

done;

done

完成后,我们运行此脚本并将结果保存在如下文件中: bash build_year_month_day.sh > s3_<my_table_day_partition>_file.dat

现在我们准备在多线程中运行 spark

Spark 代码需要两件事(除了scheduler.mode=FAIR

1. creating an iterator from the file created above # s3_<my_table_day_partition>_file.dat

2. sc.setLocalProperty

请参阅它是如何完成的。

一个我们在spark-app Python中读取文件

 year_month_date_index_file = "s3_<my_table_day_partition>_file.dat"
 with open(year_month_date_index_file, 'r') as f:
        content = f.read()
 content_iter = [(idx, c) for idx, c in enumerate(content.split("\n")) if c]

B . 并使用 100 天的切片来触发 100 个线程:

    # Number of THREADS can be Increased or Decreased 
    strt = 0
    stp = 99
    while strt < len(content_iter):
        
        threads_lst = []
        path_slices = islice(content_iter, strt, stp)
        for s3path in path_slices:
            print("PROCESSING FOR PATH {}".format(s3path))
            pool_index = int(s3path[0]) # Spark needs a POOL ID
            my_addr = s3path[1]
            # CALLING `process_in_pool` in each thread
            agg_by_day_thread = threading.Thread(target=process_in_pool, args=(pool_index, <additional_args>)) # Pool_index is mandatory argument.
            agg_by_day_thread.start() # Start opf Thread
            threads_lst.append(agg_by_day_thread)
        
        for process in threads_lst:
            process.join() # Wait for All Threads To Finish
        
        strt = stp
        stp += 100 

需要注意的两件事 path_slices = islice(content_iter, strt, stp) =>返回大小的切片(strt - stp)

pool_index = int(s3path[0]) =>的索引content_iter,我们将使用它来分配池 id。

现在是代码的肉

def process_in_pool(pool_id, <other_arguments>):
    sc.setLocalProperty("spark.scheduler.pool", "pool_id_{}".format(str(int(pool_id) % 100)))

如您所见,我们希望将线程限制为100 个池因此,我们设置spark.scheduler.poolpool_idex%100 在此 ` process_in_pool()函数中写入您的实际转换/操作

完成后,通过释放该池退出函数

...
sc.setLocalProperty("spark.scheduler.pool", None)
return

最后 运行你 spark-submit 喜欢

spark-submit \
-- Other options \
--conf spark.scheduler.mode=FAIR \
--other options \
my_spark_app.py 

如果使用正确的执行器/核心/内存进行调整,您会看到巨大的性能提升。

也可以这样做scala但那concurrent.futures 是另一天。

于 2020-09-04T21:15:41.320 回答