0

我正在尝试使用. dask.dataframe原始数据框有列'date'、'ticker'、'open'、'close'等...

我的目标是创建一个新的数据框,其中索引“日期”和列作为每个唯一代码的收盘价。

以下代码可以解决问题,但速度很慢,使用了将近一分钟的时间N = 6。我怀疑 dask 尝试在 for 循环中多次读取 CSV 文件,但我不知道如何让这更快。我最初的猜测是在df.groupby('ticker')某个地方使用会有所帮助,但我对熊猫还不够熟悉。

import dask.dataframe as dd
from functools import reduce

def load_and_fix_csv(path: str, N: int, tickers: list = None) -> dd.DataFrame:
    raw = dd.read_csv(path, parse_dates=["date"])
    if tickers is None:
        tickers = raw.ticker.unique().compute()[:N] # Get unique tickers
    dfs = []
    for tick in tickers:
        tmp = raw[raw.ticker == tick][["date", "close"]] # Temporary dataframe from specific ticker with columns date, close
        dfs.append(tmp)
    df = reduce(lambda x, y: dd.merge(x, y, how="outer", on="date"), dfs) # Merge all dataframes on date
    df = df.set_index("date").compute()
    return df

感谢您提供各种帮助!谢谢你。

4

1 回答 1

1

我很确定你是对的,Dask 很可能会在每个循环中“回到井中”。这是因为 Dask 构建了一个操作图并尝试将计算推迟到强制或必要时。我喜欢做的一件事是通过以下方式减少图形的读取操作Client.persist

from distributed import Client

client = Client()


def persist_load_and_fix_csv(path: str, N: int, tickers: list = None) -> dd.DataFrame:
    raw = dd.read_csv(path, parse_dates=["date"])

    # This "cuts the graph" prior operations (just the `read_csv` here)
    raw = client.persist(raw)
    if tickers is None:
        tickers = raw.ticker.unique().compute()[:N] # Get unique tickers
    dfs = []
    for tick in tickers:
        tmp = raw[raw.ticker == tick][["date", "close"]] # Temporary dataframe from specific ticker with columns date, close
        dfs.append(tmp)
    df = reduce(lambda x, y: dd.merge(x, y, how="outer", on="date"), dfs) # Merge all dataframes on date
    df = df.set_index("date").compute()
    return df

在一次 Kaggle 会议中,我测试了这两个函数,persist_load_and_fix_csv(csv_path, N=3)并设法将时间缩短了一半。通过仅保留最终使用的列,您还将获得更好的性能。

注意:我发现,至少对我和我的代码而言,如果我开始看到.compute()函数中出现我应该退后一步并重新评估代码路径;我将其视为代码异味

于 2020-07-08T20:49:55.463 回答