2

我有以下项目树

.
└── src
    └── dask_test
        ├── helpers
        │   ├── commandline.py
        │   ├── data
        │   │   ├── dataset0.json
        │   │   ├── dataset1000.json
        │   │   ├── dataset300.json
        │   │   ├── dataset5000.json
        │   │   ├── dataset500.json
        │   │   ├── events_to_be_used_final_without_google.nl.json
        │   │   ├── http-malware_modified.log
        │   │   └── public_suffix_list.json
        │   ├── datetime.py
        │   ├── datetime.pyc
        │   ├── __init__.py
        │   ├── __init__.pyc
        │   ├── math.py
        │   ├── math.pyc
        │   ├── pipeline.py
        │   ├── queues.py
        │   ├── search.py
        │   ├── services.py
        │   ├── sklearn.py
        │   ├── splunk_format.py
        │   ├── splunk.py
        │   └── sqlalchemy.py
        ├── __init__.py
        ├── __init__.pyc
        ├── main.py
        └── riskanalysis
            ├── iaccess
            │   ├── __init__.py
            │   └── metrics
            │       ├── base.py
            │       ├── __init__.py
            │       └── profile
            │           └── __init__.py
            ├── __init__.py
            └── metrics
                └── __init__.py

在我的开头,我main.py像这样从 `dask_test.helpers.datetime' 导入一个对象

from dask_test.helpers.datetime import Timewindow

在我的主要使用下来。在我的主文件中,我定义了一些函数并将它们应用于像这样的 dask Dataframe

dataframe = transformation1(dataframe)
dataframe = transformation2(dataframe)
dataframe = transformation3(dataframe)
dataframe = transformation4(dataframe)

转换函数采用 dask.dataframe 并通过使用 apply 他们向其中添加一个新列,如下所示:

def transformation1(dataframe):
    dataframe['new_column'] = dataframe.apply(make_sequence)
    return dataframe

尝试使用 dask 分布式和 LocalCluster 进行计算可以正常工作:

from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
client = Client(cluster)
client.persist(dataframe)

但是当打开 dask-scheduler 和 dask-workers 我收到以下消息

return pickle.loads(x) ImportError: No module named dask_test.helpers.datetime

  1. 第一个问题 LocalCluster 不使用酸洗?
  2. 所有模块都需要包含可腌制对象才能与正确分发的 dask 一起工作?

编辑:

导入 datetime 模块和 cloudpickle 似乎 datetime 是可选的

from dask_test.helpers import datetime
import cloudpickle

cloudpickle.dumps(datetime)  # this works
datetime_module = cloudpickle.loads(cloudpickle.dumps(datetime)) # this also works

编辑:经过更多调查后,我在日志文件中看到了这一点:

distributed.protocol.pickle - INFO - Failed to deserialize �cpandas.core.frame

数据框

在我的主文件中,我首先创建了 pandas Dataframe,然后使用from_pandas方法将它变成了一个 dask DataFrame。

编辑 3:我发现了问题所在,但我不明白为什么。在我的 datetime 模块中,我定义了一个名为 TimeWindow 的对象来处理时间段。我的 datajson 文件有一个带有时间戳的字段,其形式为

timestamp_since-timestamp_until

我在数据框上应用了一个函数来更改添加一个将上述内容转换为时间窗口对象的列,如下所示:

def convert_to_time_window(item):
    since = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[0]))
    until = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[1]))

    return my_datetime.TimeWindow(tm_since=since, tm_until=until)

并在数据框上(这是一个熊猫数据框。我在创建 dask 数据框之前这样做)

    dataframe['tw'] = dataframe['time_bucket'].apply(convert_to_time_window)

没有它,工人工作得很好。但是 TimeWindow 对象和实例是可腌制的

4

1 回答 1

2

听起来您的 dask-worker 进程无法像您dask_test.helpers.datetime的客户端进程那样访问模块。从您描述项目的方式来看,听起来您依赖于从与文件相同的位置运行 Python 进程。您可以执行以下任一操作:

  1. 将您的软件安装为适当的 Python 模块(有关更多信息,请参阅 Python 文档)
  2. 从与客户端进程相同的目录运行 dask-worker 进程

Distributed.protocol.pickle - 信息 - 无法反序列化 �cpandas.core.frame

此错误听起来像您的 dask-worker 进程无权访问 Pandas。通常,您的 dask-worker 进程需要在任何地方都具有相同的软件环境。

验证统一的当前工作目录

要验证您的所有工作人员是否具有相同的当前工作目录,请尝试os.getcwd在所有工作人员上运行

In [6]: client.run(os.getcwd)
Out[6]: 
{'tcp://127.0.0.1:34115': '/home/foo',
 'tcp://127.0.0.1:39449': '/home/foo',
 'tcp://127.0.0.1:40322': '/home/foo',
 'tcp://127.0.0.1:41050': '/home/foo'}

os.getcwd()您可以将此与在您的 python 进程中本地调用进行比较。

于 2017-06-24T15:16:58.410 回答