我有以下项目树
.
└── src
└── dask_test
├── helpers
│ ├── commandline.py
│ ├── data
│ │ ├── dataset0.json
│ │ ├── dataset1000.json
│ │ ├── dataset300.json
│ │ ├── dataset5000.json
│ │ ├── dataset500.json
│ │ ├── events_to_be_used_final_without_google.nl.json
│ │ ├── http-malware_modified.log
│ │ └── public_suffix_list.json
│ ├── datetime.py
│ ├── datetime.pyc
│ ├── __init__.py
│ ├── __init__.pyc
│ ├── math.py
│ ├── math.pyc
│ ├── pipeline.py
│ ├── queues.py
│ ├── search.py
│ ├── services.py
│ ├── sklearn.py
│ ├── splunk_format.py
│ ├── splunk.py
│ └── sqlalchemy.py
├── __init__.py
├── __init__.pyc
├── main.py
└── riskanalysis
├── iaccess
│ ├── __init__.py
│ └── metrics
│ ├── base.py
│ ├── __init__.py
│ └── profile
│ └── __init__.py
├── __init__.py
└── metrics
└── __init__.py
在我的开头,我main.py
像这样从 `dask_test.helpers.datetime' 导入一个对象
from dask_test.helpers.datetime import Timewindow
在我的主要使用下来。在我的主文件中,我定义了一些函数并将它们应用于像这样的 dask Dataframe
dataframe = transformation1(dataframe)
dataframe = transformation2(dataframe)
dataframe = transformation3(dataframe)
dataframe = transformation4(dataframe)
转换函数采用 dask.dataframe 并通过使用 apply 他们向其中添加一个新列,如下所示:
def transformation1(dataframe):
dataframe['new_column'] = dataframe.apply(make_sequence)
return dataframe
尝试使用 dask 分布式和 LocalCluster 进行计算可以正常工作:
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
client = Client(cluster)
client.persist(dataframe)
但是当打开 dask-scheduler 和 dask-workers 我收到以下消息
return pickle.loads(x) ImportError: No module named dask_test.helpers.datetime
- 第一个问题 LocalCluster 不使用酸洗?
- 所有模块都需要包含可腌制对象才能与正确分发的 dask 一起工作?
编辑:
导入 datetime 模块和 cloudpickle 似乎 datetime 是可选的
from dask_test.helpers import datetime
import cloudpickle
cloudpickle.dumps(datetime) # this works
datetime_module = cloudpickle.loads(cloudpickle.dumps(datetime)) # this also works
编辑:经过更多调查后,我在日志文件中看到了这一点:
distributed.protocol.pickle - INFO - Failed to deserialize �cpandas.core.frame
数据框
在我的主文件中,我首先创建了 pandas Dataframe,然后使用from_pandas
方法将它变成了一个 dask DataFrame。
编辑 3:我发现了问题所在,但我不明白为什么。在我的 datetime 模块中,我定义了一个名为 TimeWindow 的对象来处理时间段。我的 datajson 文件有一个带有时间戳的字段,其形式为
timestamp_since-timestamp_until
我在数据框上应用了一个函数来更改添加一个将上述内容转换为时间窗口对象的列,如下所示:
def convert_to_time_window(item):
since = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[0]))
until = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[1]))
return my_datetime.TimeWindow(tm_since=since, tm_until=until)
并在数据框上(这是一个熊猫数据框。我在创建 dask 数据框之前这样做)
dataframe['tw'] = dataframe['time_bucket'].apply(convert_to_time_window)
没有它,工人工作得很好。但是 TimeWindow 对象和实例是可腌制的