I'm running this on EMR, but I've included a sample df here to demonstrate. I need to add a Manager so that all of the workers can see my dictionary. The script ran fine before I put the Manager in, and it still runs fine until I call it through the UDF. I've included the error message below.
The script takes a code and checks whether that code has already been used (i.e., whether it is in the used_codes dictionary). If the code hasn't been used, it spits out the same code; if it has, it generates a new one. I'm creating a new_df with a new column "code_id" that contains only unique codes.
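Setting Spark and the Manager aside, the per-row logic I'm after looks roughly like this (a simplified single-process sketch that uses a plain dict in place of manager.dict()):

import random

used = {}  # plain-dict stand-in for the shared manager.dict()

def get_valid_code_local(code):
    # First time we see a non-empty code: mark it used and keep it.
    if code != "" and code not in used:
        used[code] = 1
        return code
    # Repeat (or empty) code: draw random codes until we find an unused one.
    new_code = random.randint(10000, 90000)
    while new_code in used:
        new_code = random.randint(10000, 90000)
    used[new_code] = 2
    return new_code

print(get_valid_code_local("36636"))  # 36636 -- first use, kept as-is
print(get_valid_code_local("36636"))  # repeat, so some fresh random code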
I believe the problem is with these lines of code:
manager = Manager()
used_codes = manager.dict()
p1 = Process(target=get_valid_code, args=(used_codes, code))
p1.start()
def run_df(df):
    print(f"running df")
    df = spark.createDataFrame(data=data,schema=schema)
    new_df = df.withColumn("code_id", get_valid_code_udf('id'))
    p1.join()
    return new_df.show()
Full script
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as F
import pyspark.sql.types as T
import random
from multiprocessing import Manager, Process
data = [("James", "36636"),
("Michael", "36636"),
("Robert", "42114"),
("Maria", "39192"),
("Jen", "39192")
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("id", StringType(), True), \
])
df = spark.createDataFrame(data=data,schema=schema)
manager = Manager()
used_codes = manager.dict()
def generate_random_code():
    random_number = random.randint(10000,90000)
    return random_number

def get_valid_code(used_codes, code):
    if(code != "" and code not in used_codes.keys()):
        used_codes[code] = 1
        return code
    new_code = generate_random_code()
    while (new_code in used_codes.keys()):
        new_code = generate_random_code()
    used_codes[new_code] = 2
    return new_code
get_valid_code_udf = F.udf(lambda code: get_valid_code(used_codes, code), T.StringType())
manager = Manager()
used_codes = manager.dict()
p1 = Process(target=get_valid_code, args=(used_codes, code))
p1.start()
def run_df(df):
    print(f"running df")
    df = spark.createDataFrame(data=data,schema=schema)
    new_df = df.withColumn("code_id", get_valid_code_udf('id'))
    p1.join()
    return new_df.show()
>>> df.show()
+---------+-----+
|firstname| id|
+---------+-----+
| James|36636|
| Michael|36636|
| Robert|42114|
| Maria|39192|
| Jen|39192|
+---------+-----+
# This is the ideal outcome, where new codes have been generated for the repeats.
>>> new_df.show()
+---------+-----+-------+
|firstname| id|code_id|
+---------+-----+-------+
| James|36636| 36636|
| Michael|36636| 63312|
| Robert|42114| 42114|
| Maria|39192| 39192|
| Jen|39192| 76399|
+---------+-----+-------+
Error message
>>> run_df(df)
running df
[Stage 8:> (0 + 1) / 1]21/10/20 19:48:23 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 8, ip-10-0-7-132.ec2.internal, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/worker.py", line 366, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/worker.py", line 241, in read_udfs
arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/worker.py", line 168, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/worker.py", line 69, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 587, in loads
return pickle.loads(obj, encoding=encoding)
File "/usr/lib64/python3.7/multiprocessing/managers.py", line 943, in RebuildProxy
return func(token, serializer, incref=incref, **kwds)
File "/usr/lib64/python3.7/multiprocessing/managers.py", line 793, in __init__
self._incref()
File "/usr/lib64/python3.7/multiprocessing/managers.py", line 847, in _incref
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib64/python3.7/multiprocessing/connection.py", line 492, in Client
c = SocketClient(address)
File "/usr/lib64/python3.7/multiprocessing/connection.py", line 620, in SocketClient
s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
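Based on the traceback, my working theory is that the UDF closure captures the DictProxy returned by manager.dict(), Spark pickles it to ship the UDF to the executors, and unpickling on the executor then tries to reconnect to the Manager's socket, which only exists on the driver machine. Here is a minimal local sketch (my assumption, no Spark or EMR involved) that seems to hit the same FileNotFoundError by unpickling a proxy after its Manager's socket is gone:

import pickle
from multiprocessing import Manager

manager = Manager()
used_codes = manager.dict()

# Pickling the proxy captures only the address of the Manager's socket,
# not the contents of the dictionary.
blob = pickle.dumps(used_codes)

# Once that socket is unreachable (here: manager shut down; on EMR: a
# different machine entirely), unpickling fails during reconnection.
manager.shutdown()
try:
    pickle.loads(blob)
except FileNotFoundError as e:
    print(e)  # [Errno 2] No such file or directory (on Linux)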