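I am trying to get the expression 'df.resample('1T', how='mean').sum()' to work in Dask, but I have run into a problem: it seems Dask needs the DataFrame to be indexed before I perform the resample. I get the following error...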
>>> c.gather(df).compute()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/site-packages/distributed/client.py", line 1508, in gather
    asynchronous=asynchronous)
  File "/usr/local/lib/python2.7/site-packages/distributed/client.py", line 615, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/distributed/utils.py", line 253, in sync
    six.reraise(*error[0])
  File "/usr/local/lib/python2.7/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/usr/local/lib64/python2.7/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/usr/local/lib64/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/usr/local/lib64/python2.7/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/lib/python2.7/site-packages/distributed/client.py", line 1385, in _gather
    traceback)
  File "/usr/local/lib/python2.7/site-packages/dask/dataframe/core.py", line 1633, in resample
    return _resample(self, rule, how=how, closed=closed, label=label)
  File "/usr/local/lib/python2.7/site-packages/dask/dataframe/tseries/resample.py", line 33, in _resample
    return getattr(resampler, how)()
  File "/usr/local/lib/python2.7/site-packages/dask/dataframe/tseries/resample.py", line 151, in mean
    return self._agg('mean')
  File "/usr/local/lib/python2.7/site-packages/dask/dataframe/tseries/resample.py", line 126, in _agg
    meta_r = self.obj._meta_nonempty.resample(self._rule, **self._kwargs)
  File "/usr/local/lib64/python2.7/site-packages/pandas/core/generic.py", line 7104, in resample
    base=base, key=on, level=level)
  File "/usr/local/lib64/python2.7/site-packages/pandas/core/resample.py", line 1148, in resample
    return tg._get_resampler(obj, kind=kind)
  File "/usr/local/lib64/python2.7/site-packages/pandas/core/resample.py", line 1276, in _get_resampler
    "but got an instance of %r" % type(ax).__name__)
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex, but got an instance of 'Index'
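Below is the Python code I am using. Since the pandas DataFrames returned by my delayed objects are already indexed by timestamp, I expected Dask to infer or construct an index from the timestamp indexes of those DataFrames, rather than requiring me to set one explicitly. That said, I am not sure how I would call set_index explicitly in this case (what argument would I pass?). Setting a pd.DatetimeIndex on the meta DataFrame (the commented-out line below) works. Is building the index by hand and supplying it to meta the only realistic approach? Am I missing something?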
#! /usr/bin/env python
# Start dask scheduler and workers
# dask-scheduler &
# dask-worker --nthreads 1 --nprocs 6 --memory-limit 3GB localhost:8786 --local-directory /dev/shm &
from dask.distributed import Client
from dask.delayed import delayed
import pandas as pd
import numpy as np
import dask.dataframe as dd
import time
c = Client('127.0.0.1:8786')
def load(epoch):
    # 1525132800 = 1 May 2018 00:00 UTC
    # 1527811200 = 1 June 2018 00:00 UTC
    num_ts=100
    # One day of timestamps at 15-second spacing, starting at `epoch`
    idx = []
    for ts in range(0, 86400, 15):
        idx.append(epoch + ts)
    d = np.random.rand(86400/15, num_ts)
    ts = []
    for i in range(0, num_ts):
        # tsname = "ts_%s_%s" % (i, epoch)
        tsname = "ts_%s" % (i)
        ts.append(tsname)
        gts.append(tsname)
    res = pd.DataFrame(index=idx, data=d, columns=ts, dtype=np.float64)
    # Convert the integer epoch seconds to a DatetimeIndex
    res.index = pd.to_datetime(arg=res.index, unit='s')
    return res
gts = []
load(1525132800)
print time.time()
i = pd.DatetimeIndex(start=1525132800, freq='15S', end=1527811185, dtype='datetime64[s]')
# meta = pd.DataFrame(index=i, data=[], columns=gts, dtype=np.float64)
meta = pd.DataFrame(index=[], data=[], columns=gts, dtype=np.float64)
dfs = [delayed(load)(fn) for fn in range(1525132800, 1527811200, 86400)]
print time.time()
df = dd.from_delayed(dfs, meta, 'sorted')
print time.time()
df.npartitions
df.divisions
print time.time()
df = c.submit(dd.DataFrame.resample, df, rule='1T', how='mean')
print time.time()
#df = c.submit(dd.DataFrame.sum, df, axis=1)
print time.time()
c.gather(df).compute()
print time.time()
#c.gather(df).visualize(filename='/usr/share/nginx/html/svg/df4.svg')
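
For what it's worth, the difference between the two `meta` lines above seems to be reproducible in pandas alone. The following is only a minimal sketch under my own assumptions, not a statement of how Dask treats `meta` internally: the column names and the `can_resample` helper are made up for illustration, and an empty `pd.DatetimeIndex([])` stands in for the fully populated index from the commented-out line.

```python
import numpy as np
import pandas as pd

cols = ["ts_0", "ts_1"]  # hypothetical column names, for illustration only

# Empty frame indexed by a plain empty list: pandas builds a generic index.
meta_plain = pd.DataFrame(index=[], columns=cols, dtype=np.float64)

# Empty frame indexed by an empty DatetimeIndex: the datetime type is kept.
meta_dt = pd.DataFrame(index=pd.DatetimeIndex([]), columns=cols, dtype=np.float64)


def can_resample(frame):
    """Hypothetical helper: report whether pandas accepts resample() on frame."""
    try:
        frame.resample("1min").mean()
        return True
    except TypeError:
        return False


print(type(meta_plain.index).__name__, can_resample(meta_plain))
print(type(meta_dt.index).__name__, can_resample(meta_dt))
```

If this reflects what happens to `meta` in my script, then an empty but correctly typed index might be enough, without enumerating every timestamp up front. I have not confirmed that against Dask itself.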