I'm running this on EMR, but I've included a sample df here to show an example. I need to add a Manager so that all of the workers can see my dictionary.

The script was running fine before I put the Manager in, and it still runs fine right up until I run it through the UDF. I've added the error message below.

The script takes a code and checks whether that code has been used (i.e., whether it is in the used_codes dictionary). If the code hasn't been used, it returns that same code; if it has been used, it generates a new one. I'm creating a new_df with a new column "code_id" containing all-unique codes.
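To make the intended behavior concrete, here is the same dedup logic as a minimal local sketch (plain dict, no Spark or no multiprocessing), matching the functions in the full script below:

import random

used_codes = {}

def generate_random_code():
    # five-digit random code, same range as in the full script
    return random.randint(10000, 90000)

def get_valid_code(used_codes, code):
    # keep the incoming code if it is non-empty and not yet seen
    if code != "" and code not in used_codes:
        used_codes[code] = 1
        return code
    # otherwise keep drawing until we hit an unused code
    new_code = generate_random_code()
    while new_code in used_codes:
        new_code = generate_random_code()
    used_codes[new_code] = 2
    return new_code

for code in ["36636", "36636", "42114"]:
    print(get_valid_code(used_codes, code))  # the repeated "36636" gets a fresh code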

I believe the problem is with these lines of code:

manager = Manager()
used_codes = manager.dict()

p1 = Process(target=get_valid_code, args=(used_codes, code))
p1.start()

def run_df(df):
    print(f"running df")
    df = spark.createDataFrame(data=data,schema=schema)
    new_df = df.withColumn("code_id", get_valid_code_udf('id')) 
    p1.join()
    return new_df.show()
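Outside of Spark, the Manager dict works for me as expected. Here is a minimal local sketch (no Spark) of the pattern I'm attempting, with a hypothetical mark_used helper standing in for get_valid_code:

from multiprocessing import Manager, Process

def mark_used(used_codes, code):
    # the child process writes through the proxy; the Manager
    # process holds the real dict and the parent can read it back
    used_codes[code] = 1

if __name__ == "__main__":
    manager = Manager()
    used_codes = manager.dict()
    p = Process(target=mark_used, args=(used_codes, "36636"))
    p.start()
    p.join()
    print(dict(used_codes))  # prints {'36636': 1}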

Full script

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as F
import pyspark.sql.types as T
import random
from multiprocessing import Manager, Process

data = [("James", "36636"),
    ("Michael", "36636"),
    ("Robert", "42114"),
    ("Maria", "39192"),
    ("Jen", "39192")
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("id", StringType(), True), \
  ])

df = spark.createDataFrame(data=data,schema=schema)

manager = Manager()
used_codes = manager.dict()

def generate_random_code():
        random_number = random.randint(10000,90000)
        return random_number

def get_valid_code(used_codes, code):
    if(code != "" and code not in used_codes.keys()):
        used_codes[code] = 1 
        return code
    new_code = generate_random_code()
    while (new_code in used_codes.keys()):
        new_code = generate_random_code() 
    used_codes[new_code] = 2
    return new_code

get_valid_code_udf = F.udf(lambda code: get_valid_code(used_codes, code), T.StringType())

manager = Manager()
used_codes = manager.dict()

p1 = Process(target=get_valid_code, args=(used_codes, code))
p1.start()

def run_df(df):
    print(f"running df")
    df = spark.createDataFrame(data=data,schema=schema)
    new_df = df.withColumn("code_id", get_valid_code_udf('id')) 
    p1.join()
    return new_df.show()

>>> df.show()
+---------+-----+                                                               
|firstname|   id|
+---------+-----+
|    James|36636|
|  Michael|36636|
|   Robert|42114|
|    Maria|39192|
|      Jen|39192|
+---------+-----+

# This is the ideal outcome, where new codes have been generated for the repeats.
>>> new_df.show()
+---------+-----+-------+
|firstname|   id|code_id|
+---------+-----+-------+
|    James|36636|  36636|
|  Michael|36636|  63312|
|   Robert|42114|  42114|
|    Maria|39192|  39192|
|      Jen|39192|  76399|
+---------+-----+-------+

Error message

>>> run_df(df)
running df
[Stage 8:>                                                          (0 + 1) / 1]21/10/20 19:48:23 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 8, ip-10-0-7-132.ec2.internal, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/worker.py", line 366, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/worker.py", line 241, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/worker.py", line 168, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1634739266145_0001/container_1634739266145_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 587, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/usr/lib64/python3.7/multiprocessing/managers.py", line 943, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
  File "/usr/lib64/python3.7/multiprocessing/managers.py", line 793, in __init__
    self._incref()
  File "/usr/lib64/python3.7/multiprocessing/managers.py", line 847, in _incref
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib64/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/usr/lib64/python3.7/multiprocessing/connection.py", line 620, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
