0

参考:


需要注意的几点:

  • 我总共有 48GB 的​​内存

  • 这是我正在使用的库版本

    • Python 3.7.7
    • dask==2.18.0
    • fbprophet==0.6
    • 熊猫==1.0.3

我导入熊猫的原因仅适用于这一
pd.options.mode.chained_assignment = None
行当我使用 dask.distributed 时这有助于解决 dask 错误

所以,我有一个 21gb 的 csv 文件,我正在使用 dask 和 jupyter notebook 读取它......我试图从我的 mysql 数据库表中读取它,但是,内核最终崩溃了

我尝试了多种组合使用本地工作人员、线程和可用内存、可用 storage_memory 网络,甚至尝试完全不使用distributed。我也尝试过用 pandas 进行分块(不是上面提到的与 pandas 相关的行),但是,即使有分块,内核仍然崩溃......

我现在可以使用 dask 加载 csv,并应用一些转换,例如设置索引、添加 fbprophet 需要的列(名称)......但我仍然无法使用 计算数据帧df.compute(),因为这就是我为什么认为我收到了 fbprophet 的错误。在我使用适当的 dtypes 添加列 y 和 ds 后,我收到错误Truth of Delayed objects is not supported,我认为这是因为 fbprophet 期望数据帧不懒惰,这就是我尝试预先运行计算的原因。我还增加了客户端上的 ram 以允许它使用完整的 48gb,因为我怀疑它可能试图加载数据两次,但是,这仍然失败,所以很可能不是这种情况/不是不会导致问题。

除此之外,在 dask 的文档中还提到了 fbpropphet,用于将机器学习应用于数据帧,但是,我真的不明白为什么这不起作用......我还尝试了 modin 与 ray 和 dask,结果基本相同。

另一个问题...关于内存使用情况 distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 32.35 GB -- Worker memory limit: 25.00 GB ,我在分配客户端、读取 csv 文件以及对数据帧应用操作/转换时遇到此错误,但是分配的大小大于 csv 文件本身,所以这让我感到困惑.. .

我自己做了什么来尝试解决这个问题:-谷歌搜索当然没有找到任何东西:-/-多次询问不和谐的帮助频道-多次询问 IIRC 帮助频道

无论如何,非常感谢您对这个问题的任何帮助!!!先感谢您 :)

MCVE

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
from fbprophet import Prophet

pd.options.mode.chained_assignment = None
client = Client(n_workers=2, threads_per_worker=4, processes=False, memory_limit='4GB')
csv_file = 'provide_your_own_csv_file_here.csv'
df = dd.read_csv(csv_file, parse_dates=['Time (UTC)'])
df = df.set_index('Time (UTC)')
df['y'] = df[['a','b']].mean(axis=1)
m = Prophet(daily_seasonality=True)
m.fit(df)
# ERROR: Truth of Delayed objects is not supported
4

2 回答 2

0

如前所述,一种方法是使用pandasdask.delayedDataFrame跳过dask.dataframe

您可以使用为使用 Dask 进行自定义计算显示的load管道的clean简化analyze版本。

这是一种基于此类自定义管道的可能方法,使用小型数据集(创建 MCVE) - 管道中的每一步都将被延迟

进口

import numpy as np
import pandas as pd
from dask import delayed
from dask.distributed import Client
from fbprophet import Prophet

在 a 中生成一些数据.csv,其中包含列名Time (UTC)a并且b

def generate_csv(nrows, fname):
    df = pd.DataFrame(np.random.rand(nrows, 2), columns=["a", "b"])
    df["Time (UTC)"] = pd.date_range(start="1850-01-01", periods=nrows)
    df.to_csv(fname, index=False)

首先从管道中编写函数,用 Pandasload加载,并使用装饰器延迟其执行.csvdask.delayed

  • 可以很好地用于read_csv查看nrows管道如何在数据子集上执行,而不是全部加载
  • 这将返回一个dask.delayed对象而不是pandas.DataFrame
@delayed
def load_data(fname, nrows=None):
    return pd.read_csv(fname, nrows=nrows)

现在创建process函数,使用 处理数据pandas,再次延迟,因为它的输入是dask.delayed对象而不是pandas.DataFrame

@delayed
def process_data(df):
    df = df.rename(columns={"Time (UTC)": "ds"})
    df["y"] = df[["a", "b"]].mean(axis=1)
    return df

最后一个函数 - 这个函数将训练fbprophet数据(从加载.csv并处理,但延迟)进行预测。这个analyze函数也被延迟了,因为它的一个输入是一个dask.delayed对象

@delayed
def analyze(df, horizon):
    m = Prophet(daily_seasonality=True)
    m.fit(df)
    future = m.make_future_dataframe(periods=horizon)
    forecast = m.predict(future)
    return forecast

运行管道(如果从 Python 脚本运行,需要 __name__ == "__main__"

  • 管道的输出(由 预测fbprophet)存储在一个变量中result,该变量被延迟
    • 当计算这个输出时,这将生成一个pandas.DataFrame(对应于预测的输出fbprophet),因此可以使用result.compute()
if __name__ == "__main__":
    horizon = 8
    num_rows_data = 40
    num_rows_to_load = 35
    csv_fname = "my_file.csv"

    generate_csv(num_rows_data, csv_fname)

    client = Client()  # modify this as required

    df = load_data(csv_fname, nrows=num_rows_to_load)
    df = process_data(df)
    result = analyze(df, horizon)
    forecast = result.compute()

    client.close()

    assert len(forecast) == num_rows_to_load + horizon
    print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].head())

输出

          ds      yhat  yhat_lower  yhat_upper
0 1850-01-01  0.330649    0.095788    0.573378
1 1850-01-02  0.493025    0.266692    0.724632
2 1850-01-03  0.573344    0.348953    0.822692
3 1850-01-04  0.491388    0.246458    0.712400
4 1850-01-05  0.307939    0.066030    0.548981
于 2020-06-15T16:45:47.410 回答
0

不幸的是,Prophet 今天不支持 Dask 数据帧。

您提到的示例显示使用 Dask 加速 Prophet 在 Pandas 数据帧上的拟合。Dask Dataframe 只是人们使用 Dask 的一种方式。

于 2020-06-13T15:56:44.057 回答