0

我想使用调用 boto3 api 的函数创建一个池,并为每个线程使用不同的存储桶名称:

我的功能是:

def face_reko(source_data, target_data):

        bucket = s3.Bucket(bucket_name)
    for key in bucket.objects.all():
        key.delete()

    s3_client.put_object(Bucket=bucket_name, Key=target_img, Body=target_data)
    s3_client.put_object(Bucket=bucket_name, Key=source_img, Body=source_data)

    response = reko.compare_faces(
        SourceImage={
            'S3Object': {
                'Bucket': bucket_name,
                'Name' : source_img
            }
        },
        TargetImage={
            'S3Object' : {
                'Bucket' : bucket_name,
                'Name' : target_img
            }
        }
    )
    if len(response['FaceMatches']) > 0:
        return True
    else:
        return False

所以基本上它会删除存储桶中的所有内容,上传 2 个新图像,然后使用 Rekognition api 比较这 2 个图像。由于我不能在同一个存储桶中两次创建相同的图像,我想为每个线程创建一个存储桶,然后将一个常量传递给存储桶名称的函数而不是bucket_nameconst。

4

1 回答 1

0

所以最后我已经能够找到解决我的问题的方法。我没有将我的函数映射到一个池,而是创建了一个Worker这样定义的类:

class Worker():
def __init__(self, proc_number, splited_list):
    self.pool = Pool(proc_number)
    self.proc_number = proc_number
    self.splited_list = splited_list

def callback(self, result):
    if result:
        self.pool.terminate()

def do_job(self):
    for i in range(self.proc_number):
        self.pool.apply_async(face_reko, args=(self.splited_list[i],source_data, i), callback=self.callback)
    self.pool.close()
    self.pool.join()

所以Workerobj 是由多个进程和一个列表列表构成的(splitted_list我的主列表是拆分的number_of_proc)。然后,当do_job调用函数时,池首先创建具有i可在face_reko函数中使用的 id 的进程。face_reko返回时池停止True。要启动Worker.pool,只需创建一个Worker并调用该do_job函数,如下所示:

w = Worker(proc_number=proc_number, splited_list=splited_list)
w.do_job()

希望它会帮助别人!

于 2017-11-17T13:59:15.067 回答