4

我正在尝试使用 dask(async) 框架执行一个简单的任务(一个实例方法),但它因序列化错误而失败。

有人可以指出我正确的方向。

这是我正在运行的代码:

from dask.distributed import Client, as_completed
import time

class DaskConnect:

def __init__(self):
    print("Initialized:",self.__class__.__name__)
    self.scheduler_host="192.168.0.4"
    self.scheduler_port="8786"

def connect(self):
    self.client = Client(self.scheduler_host+":"+self.scheduler_port)
    # self.client = Client()
    return self.client

def disconnect(self):
    self.client.close()

class TestDask:
def __init__(self):
    print("Initialized:",self.__class__.__name__)
    self.dask_client=DaskConnect().connect()

def do_task(self,msg):
    time.sleep(30)
    return msg

def run(self):
    tasks=[1]
    # tasks = [1, 2, 3, 4, 5]
    futures=[]
    for task in tasks:
        print("Submitting:",task)
        future = self.dask_client.submit(self.do_task, "Task:"+str(task))
        futures.append(future)

    for future in as_completed(futures):
        result = future.result()
        print("Result",result)

TestDask().run()

错误:

Distributed.protocol.pickle - 信息 - 无法在 0x101c408d0>> 处序列化 main.TestDask 对象。例外:无法腌制 select.kqueue 对象 Traceback(最近一次调用最后一次):

4

1 回答 1

8

Dask 客户端当前不可序列化。任何包含 Dask Client 的对象也将不可序列化。一般来说,序列化任何包含活动网络连接、锁等的东西都是一个挑战。

也许还有另一种方法可以解决您的问题?

于 2017-08-31T11:25:52.863 回答