This is a classic case for Spark's multi-threaded scheduling, which you can use by setting spark.scheduler.mode=FAIR and assigning pools.
What you need to do is:
- Pre-create your list of partitions
- Use this list as an iterable
- For each element of this iterable, fire a spark job in a different pool
- No need to use a separate s3distcp
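For reference, the FAIR mode itself can also be set while building the SparkSession, rather than only via --conf at submit time; a minimal sketch (the app name is just an assumption):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("my_spark_app")                    # assumed, matches my_spark_app.py used below
         .config("spark.scheduler.mode", "FAIR")     # same effect as --conf spark.scheduler.mode=FAIR
         .getOrCreate())
sc = spark.sparkContext                              # `sc` is what setLocalProperty is called on later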
The full example is shown below.
Before doing the spark-submit =>
# Create a List of all *possible* partitions like this
# Example S3 prefixes :
# s3://my_bucket/my_table/year=2019/month=02/day=20
# ...
# ...
# s3://my_bucket/my_table/year=2020/month=03/day=15
# ...
# ...
# s3://my_bucket/my_table/year=2020/month=09/day=01
# WE SET `TARGET_PREFIX` as:
TARGET_PREFIX="s3://my_bucket/my_table"
# And create a list (down to the day=nn part)
# by looping twice.
# Increase the number of loops if partitions go down to the hour.
aws s3 ls "${TARGET_PREFIX}/"|grep PRE|awk '{print $2}'|while read year_part ;
do
full_year_part="${TARGET_PREFIX}/${year_part}";
aws s3 ls ${full_year_part}|grep PRE|awk '{print $2}'|while read month_part;
do
full_month_part=${full_year_part}${month_part};
aws s3 ls ${full_month_part}|grep PRE|awk -v pref=$full_month_part '{print pref$2}';
done;
done
Once done, we run this script and save the result in a file, like this:
bash build_year_month_day.sh > s3_<my_table_day_partition>_file.dat
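If you prefer to stay in Python, roughly the same listing can be built with boto3; this is only a sketch under the same bucket/table assumptions as the example prefixes above:

# build_year_month_day.py -- boto3 version of the bash listing (sketch only)
import boto3

s3 = boto3.client("s3")

def sub_prefixes(bucket, prefix):
    # Equivalent of `aws s3 ls ... | grep PRE`: yield the immediate sub-"directories"
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
        for cp in page.get("CommonPrefixes", []):
            yield cp["Prefix"]

bucket = "my_bucket"                     # assumption: same bucket as in the example prefixes
for year in sub_prefixes(bucket, "my_table/"):
    for month in sub_prefixes(bucket, year):
        for day in sub_prefixes(bucket, month):
            print("s3://{}/{}".format(bucket, day))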
Now we are ready to run spark in multiple threads.
The Spark code needs two things (apart from scheduler.mode=FAIR):
1. Creating an iterator from the file created above (s3_<my_table_day_partition>_file.dat)
2. sc.setLocalProperty
Here is how it is done.
A. We read the file in our spark-app (Python):
year_month_date_index_file = "s3_<my_table_day_partition>_file.dat"
with open(year_month_date_index_file, 'r') as f:
    content = f.read()
content_iter = [(idx, c) for idx, c in enumerate(content.split("\n")) if c]
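So content_iter is simply a list of (index, s3_prefix) pairs, which is why s3path[0] becomes the pool index and s3path[1] the path in the next block. For example (using the prefixes from the sample listing above):

# [(0, 's3://my_bucket/my_table/year=2019/month=02/day=20/'),
#  ...,
#  (n, 's3://my_bucket/my_table/year=2020/month=09/day=01/')]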
B. And use slices of 100 days at a time to fire 100 threads:
import threading
from itertools import islice

# Number of THREADS can be increased or decreased
strt = 0
stp = 100   # batch of 100 paths => 100 threads per batch
while strt < len(content_iter):
    threads_lst = []
    path_slices = islice(content_iter, strt, stp)
    for s3path in path_slices:
        print("PROCESSING FOR PATH {}".format(s3path))
        pool_index = int(s3path[0])  # Spark needs a POOL ID
        my_addr = s3path[1]
        # CALLING `process_in_pool` in each thread
        agg_by_day_thread = threading.Thread(target=process_in_pool, args=(pool_index, <additional_args>))  # pool_index is a mandatory argument
        agg_by_day_thread.start()  # Start of thread
        threads_lst.append(agg_by_day_thread)
    for process in threads_lst:
        process.join()  # Wait for all threads to finish
    strt = stp
    stp += 100
Two things to note:
path_slices = islice(content_iter, strt, stp)
=> returns a slice of size (stp - strt)
pool_index = int(s3path[0])
=> the index from content_iter, which we will use to assign the pool id.
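A quick toy example of the islice semantics (plain Python, unrelated to the S3 paths):

from itertools import islice

toy = [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]
print(list(islice(toy, 1, 3)))   # [(1, 'b'), (2, 'c')]  -> size = stp - strt = 2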
Now, the meat of the code:
def process_in_pool(pool_id, <other_arguments>):
    sc.setLocalProperty("spark.scheduler.pool", "pool_id_{}".format(str(int(pool_id) % 100)))
As you can see, we want to limit the threads to 100 pools, so we set spark.scheduler.pool to pool_index % 100.
Write your actual transformations/actions inside this process_in_pool() function.
Once done, exit the function by releasing that pool:
    ...
    sc.setLocalProperty("spark.scheduler.pool", None)
    return
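To make the shape of the function concrete, here is one possible body; only the two setLocalProperty calls come from the answer, while the read/aggregate/write in the middle is a placeholder you would swap for your own logic (spark, some_key, and the _agg output path are assumptions):

def process_in_pool(pool_id, my_addr):
    # Every job fired from this thread runs in one of the 100 pools
    sc.setLocalProperty("spark.scheduler.pool", "pool_id_{}".format(str(int(pool_id) % 100)))

    # --- placeholder transformations/actions, not part of the original answer ---
    df = spark.read.parquet(my_addr)                 # read this day's partition
    agg = df.groupBy("some_key").count()             # whatever aggregation you actually need
    agg.write.mode("overwrite").parquet(my_addr.rstrip("/") + "_agg")

    # Release the pool before returning
    sc.setLocalProperty("spark.scheduler.pool", None)
    return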
Finally,
run your spark-submit like:
spark-submit \
  <other options> \
  --conf spark.scheduler.mode=FAIR \
  <other options> \
  my_spark_app.py
If tuned with the right executors/cores/memory, you will see a huge performance gain.
The same can also be done in scala, or with concurrent.futures, but that is for another day.
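For completeness, a rough sketch of the same fan-out with concurrent.futures (process_in_pool and content_iter as defined above; the batching-by-100 is dropped because the executor caps the worker count itself):

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=100) as executor:
    futures = [executor.submit(process_in_pool, pool_index, my_addr)
               for pool_index, my_addr in content_iter]
    for f in futures:
        f.result()   # block until done and surface any exception from a worker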