4

我正在尝试实现一个时间折叠函数,以“映射”到一个 dask 数据帧的各个分区,这反过来又改变了相关数据帧的形状(或者生成一个改变形状的新数据帧)。这就是我已经走了多远。在计算上返回的结果“res”是一个包含 3 个延迟对象的列表。当我尝试在循环中计算它们中的每一个(最后两行代码)时,这会导致“TypeError:'DataFrame' object is not callable”在浏览完map_partitions 的示例之后,我还尝试更改输入 DF(就地) 在没有返回值的函数中,这会导致与 NoneType 类似的 TypeError。我错过了什么?

此外,查看可视化(附加)我觉得有必要将单独计算(折叠)的分区减少到单个 DF 中。我该怎么做呢?

#! /usr/bin/env python

# Start dask scheduler and workers
# dask-scheduler &
# dask-worker --nthreads 1 --nprocs 6 --memory-limit 3GB localhost:8786 --local-directory /dev/shm &

from dask.distributed import Client
from dask.delayed import delayed
import pandas as pd
import numpy as np
import dask.dataframe as dd
import math

foldbucketsecs=30
periodicitysecs=15
secsinday=24 * 60 * 60
chunksizesecs=60 # 1 minute
numts = 5
start = 1525132800 # 01/05
end = 1525132800 + (3 * 60) # 3 minute

c = Client('127.0.0.1:8786')

def fold(df, start, bucket):
    return df

def reduce_folds(df):
    return df

def load(epoch):
    idx = []
    for ts in range(0, chunksizesecs, periodicitysecs):
        idx.append(epoch + ts)
    d = np.random.rand(chunksizesecs/periodicitysecs, numts)
    ts = []
    for i in range(0, numts):
        tsname = "ts_%s" % (i)
        ts.append(tsname)
        gts.append(tsname)
    res = pd.DataFrame(index=idx, data=d, columns=ts, dtype=np.float64)
    res.index = pd.to_datetime(arg=res.index, unit='s')
    return res

gts = []
load(start)
cols = len(gts)

idx1 = pd.DatetimeIndex(start=start, freq=('%sS' % periodicitysecs), end=start+periodicitysecs, dtype='datetime64[s]')
meta = pd.DataFrame(index=idx1[:0], data=[], columns=gts, dtype=np.float64)
dfs = [delayed(load)(fn) for fn in range(start, end, chunksizesecs)]

from_delayed = dd.from_delayed(dfs, meta, 'sorted')

nfolds = int(math.ceil((end - start)/foldbucketsecs))
cprime = nfolds * cols

gtsnew = []

for i in range(0, cprime):
    gtsnew.append("ts_%s,fold=%s" % (i%cols, i/cols))

idx2 = pd.DatetimeIndex(start=start, freq=('%sS' % periodicitysecs), end=start+foldbucketsecs, dtype='datetime64[s]')
meta = pd.DataFrame(index=idx2[:0], data=[], columns=gtsnew, dtype=np.float64)
folded_df = from_delayed.map_partitions(delayed(fold)(from_delayed, start, foldbucketsecs), meta=meta)
result = c.submit(reduce_folds, folded_df)

c.gather(result).visualize(filename='/usr/share/nginx/html/svg/df4.svg')

res = c.gather(result).compute()

for f in res:
    f.compute()
4

1 回答 1

0

没关系!这是我的错,而不是延迟包装我的函数,我只是像这样将它传递给 map_partitions 调用并且它起作用了。

folded_df = from_delayed.map_partitions(fold, start, foldbucketsecs, nfolds, meta=meta)

于 2018-07-05T10:16:10.107 回答