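I want to run a simple function concurrently. It writes the output of a process to a .txt file and then stores it in DBFS (Databricks File System). In my example I use both the ThreadPoolExecutor class and the ProcessPoolExecutor class. ThreadPoolExecutor runs successfully, while ProcessPoolExecutor raises a PicklingError. I would like to be able to run my function with both classes. How can I resolve the PicklingError?
Below is the code I run to reproduce the problem.
If you run it locally rather than on a Databricks cluster: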
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()
sc = spark.sparkContext
Create the Spark DataFrame and the parameters
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import copyreg as copy_reg
import types
from itertools import cycle
from datetime import datetime, timedelta
import time
import os
import pandas as pd
date_format = '%Y-%m-%d %H-%M-%S'
timestamp_snapshot=datetime.utcnow()
timestamp_snap=timestamp_snapshot.strftime(date_format)
pandas_df = pd.DataFrame({
    'id': ['001', '001', '001', '001', '001', '002', '002', '002', '002', '002'],
    'PoweredOn': [0, 0, 0, 1, 0, 0, 0, 1, 0, 0]
})
spark_df=spark.createDataFrame(pandas_df)
device_ids=list(pandas_df['id'].unique())
location=range(1, len(device_ids)+1, 1)
devices_total_number=len(device_ids)
Approach 1 | Using the ThreadPoolExecutor class - works perfectly
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
if __name__ == "__main__":

    # main function
    def testing_function_map(iterables_tuple):
        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
        filtered_dataset = iterables_tuple[4].where(iterables_tuple[4].id.isin([iterables_tuple[0]]))
        filtered_dataset.groupBy('PoweredOn').count()
        message_list = filtered_dataset.groupBy('PoweredOn', 'id').count().collect()
        filename = 'message_{0}_{1}.txt'.format(iterables_tuple[0], iterables_tuple[3])
        with open(os.path.join(os.getcwd(), filename), 'w') as file:
            file.writelines("Number of Powered on devices for asset id {0}: {1} & ".format(iterables_tuple[0], message_list[1][2]))
            file.writelines("Number of Powered off devices for asset id {0}: {1}".format(iterables_tuple[0], message_list[0][2]))
        print("Data saved successfully in dbfs!\n")
        print("{0}: FINISH EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), len(device_ids)))

    # wait function
    def wait_on_device(iterables_tuple):
        time.sleep(1)
        testing_function_map(iterables_tuple)

    executor = ThreadPoolExecutor(max_workers=2)
    # executor = ProcessPoolExecutor(max_workers=2)
    tasks = [*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([spark_df]))]
    list(executor.map(wait_on_device, tasks))
Approach 2 | Using the ProcessPoolExecutor class - raises a PicklingError for the wait_on_device() function
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
if __name__ == "__main__":

    def testing_function_map(iterables_tuple):
        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
        filtered_dataset = iterables_tuple[4].where(iterables_tuple[4].id.isin([iterables_tuple[0]]))
        filtered_dataset.groupBy('PoweredOn').count()
        message_list = filtered_dataset.groupBy('PoweredOn', 'id').count().collect()
        filename = 'message_{0}_{1}.txt'.format(iterables_tuple[0], iterables_tuple[3])
        with open(os.path.join(os.getcwd(), filename), 'w') as file:
            file.writelines("Number of Powered on devices for asset id {0}: {1} & ".format(iterables_tuple[0], message_list[1][2]))
            file.writelines("Number of Powered off devices for asset id {0}: {1}".format(iterables_tuple[0], message_list[0][2]))
        print("Data saved successfully in dbfs!\n")
        print("{0}: FINISH EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), len(device_ids)))

    def wait_on_device(iterables_tuple):
        time.sleep(1)
        testing_function_map(iterables_tuple)

    # executor = ThreadPoolExecutor(max_workers=2)
    executor = ProcessPoolExecutor(max_workers=2)
    tasks = [*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([spark_df]))]
    list(executor.map(wait_on_device, tasks))
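With the ProcessPoolExecutor class I get a PicklingError:
Whenever I test ProcessPoolExecutor in this application, it keeps raising a pickling error on the function wait_on_device().
How can I resolve the PicklingError? I have looked into several approaches, such as wrapping the main function in a class so that it can be called globally, or registering a custom pickling function via import copyreg as copy_reg.
None of them solved my problem, though, probably because I did not implement them correctly.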
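To narrow the problem down, it may help to check which of the objects handed to the executor actually fails to pickle, since ProcessPoolExecutor has to pickle both the callable and every task argument before sending them to a worker process. The following is only a minimal sketch under stated assumptions: the helper name find_unpicklable and the sample values are hypothetical, and a threading.Lock is used purely as a stand-in for an object that pickle rejects. It makes no claim about how a Spark DataFrame behaves.

```python
import pickle
import threading


def find_unpicklable(named_objects):
    """Return {name: error description} for every object pickle rejects.

    `find_unpicklable` is a hypothetical helper, not part of any library.
    """
    failures = {}
    for name, obj in named_objects.items():
        try:
            pickle.dumps(obj)
        except Exception as exc:  # pickle can raise several exception types
            failures[name] = "{0}: {1}".format(type(exc).__name__, exc)
    return failures


# Hypothetical stand-ins for the elements of one task tuple.
candidates = {
    "device_id": "001",
    "location": 1,
    "timestamp": "2024-01-01 00-00-00",
    # A lock is a well-known unpicklable object; it only illustrates the check.
    "lock_stand_in": threading.Lock(),
}

report = find_unpicklable(candidates)
print(report)
```

Running the same check on the real callable and on each element of a real task tuple would show whether the failure comes from the function or from one of its arguments.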
My approach so far
As presented by @Steven Bethard here:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import copyreg as copy_reg
import types
def _pickle_method(method):
    # Python 3 attribute names; the original Python 2 recipe used
    # im_func, im_self and im_class, which no longer exist.
    func_name = method.__func__.__name__
    obj = method.__self__
    cls = obj.__class__
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # Walk the MRO to find the class that actually defines the method.
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

if __name__ == "__main__":
    # The rest of my code already presented above
However, the PicklingError still persists.
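One possible reason the copyreg recipe makes no difference (an assumption on my part, not a confirmed diagnosis) is that it registers a reducer for types.MethodType only, whereas wait_on_device is a plain function. pickle stores plain functions by reference, as a module name plus a qualified name, so the function must be importable under that name in the worker process. The sketch below illustrates this with the standard library only; make_local_function is a hypothetical name used for the demonstration.

```python
import json
import pickle
import types


def make_local_function():
    # The inner function has no importable qualified name.
    def local_function(x):
        return x
    return local_function


# A module-level function is pickled as a reference, not as code.
payload = pickle.dumps(json.dumps)
restored = pickle.loads(payload)
by_reference = restored is json.dumps

local_fn = make_local_function()
# Plain functions are not bound methods, so a MethodType reducer is not used.
is_method = isinstance(local_fn, types.MethodType)

try:
    pickle.dumps(local_fn)
    local_error = None
except (pickle.PicklingError, AttributeError) as exc:
    # The exact exception type differs between Python versions.
    local_error = type(exc).__name__

print(by_reference, is_method, local_error)
```

If that is what happens here, a function that the worker process cannot look up by name would fail in the same way, regardless of the MethodType reducer.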
[UPDATE] The PicklingError above is raised when I run the code on Databricks. Running the same code locally on my machine in a Jupyter Notebook, I only get the following error with ProcessPoolExecutor: