2

我将尝试改写我的问题:

如何将 dask.dataframe 与 zip 之类的功能结合起来?

假设我们有一个名为“accounts.0.csv”的文件,其中包含以下数据

id,names,amount
352,Dan,4837
387,Tim,208
42,Jerry,21
129,Patricia,284

我写了这段代码

import dask.dataframe as dd
import itertools
from dask.threaded import get


df = dd.read_csv('accounts.0.csv')

dsk = {'a': (dd.read_csv,('accounts.0.csv')),       
       'b': (itertools.repeat,(True)),       
       'res': (zip, 'a'[id],'b')       
       }

get(dsk, 'res')

此代码应生成如下内容:

352, True
387, True
42 , True
129, True

我怎样才能做到这一点 ?

4

3 回答 3

0

您需要从计算内部“提升”(从 Haskell 中的 Monads 借用术语)迭代器,dask在开始任何计算之前构建任务列表,因此您需要从任何计算的“外部”获取迭代器。你的电话compute让你“走出去”,这就是为什么它起作用的原因。

我不确定一个很好的例子,因为你会做什么取决于接下来的其他任务,但作为一个不是很好但最小的例子:

import dask.imperative as di

arr = []
for col in df:
    arr.append(ddf[col].map(lambda x: (x,True)))
task = di.value([])+arr

创建一个映射到每个系列中的值的任务列表。然后使用命令式模块将所有内容包装在一个任务中——找不到更好的方法,抱歉!

然后,您可以compute将任务返回系列列表,或将其用于其他内容。

于 2015-10-21T14:35:07.327 回答
0

Zip 适用于 Python 迭代器,而不是 Pandas 或 Dask DataFrames。

要实现上面的示例,您可以使用该assign方法

熊猫

In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'x': [1, 2, 3]})

In [3]: df
Out[3]: 
   x
0  1
1  2
2  3

In [4]: df.assign(y=True)
Out[4]: 
   x     y
0  1  True
1  2  True
2  3  True

dask.dataframe

In [5]: import dask.dataframe as dd

In [6]: ddf = dd.from_pandas(df, npartitions=1)

In [7]: ddf.assign(y=True).compute()
Out[7]: 
   x     y
0  1  True
1  2  True
2  3  True

通常不要将图表与数据框混合

字典样式的图表dsk = {...}不应与 dask.dataframe 对象混合。dask.dataframe 对象在内部使用图表。它们不应该被放置在其中。

于 2015-10-21T21:29:41.910 回答
0

改写问题

我将尝试将您的问题改写如下:

如何将 dask.dataframe 与自定义 dask 图结合起来?

df = dd.read_csv('myfile.csv')
dsk = {'x': (add, 1, 2)}

dataframe 是一个高级集合,dask graph 是更底层的。我们必须将其中之一提升到其他人的水平。

使用 dask 命令式

我们可以使用 dask.imperative 将自定义函数转换为高级 dsak 对象

# dsk = {'x': (inc, 1, 2)}
x = dask.do(add)(1, 2)

然后,您可以dask.compute在其中一个或两个对象上使用。

x_result = dask.compute(x)
or
df_result = dask.compute(df)
or
x_result, df_result = dask.compute(x, df)

在任何地方使用低级 dask 图

任何 DataFrame 对象的低级图和最终键都可以从.dask._keys()属性访问。

from toolz import merge
graph = merge(dsk, df.dask)  # merge both graphs together
keys = ['x', df._keys()]     # final keys to compute

x_results, df_results = get(graph, keys)

df_result = df._finalize(df_results)  # turn graph outputs back to pandas dataframe
于 2015-10-21T15:26:33.703 回答