问题标签 [dask-dataframe]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
370 浏览

python - 从一个源计算多个 dask.dataframe.from_delayed()

如何.from_delayed()从一个延迟序列并行计算?

这里foo()将被调用10次。是否可以创建图形以便仅调用5次?

谢谢

0 投票
1 回答
313 浏览

dask - Dask - 从 SQL 加载数据帧而不指定 index_col

我正在尝试从 SQL 连接加载 Dask 数据帧。根据read_sql_table 文档,有必要传入 index_col。如果可能没有好的列作为索引,我该怎么办?

这可能是一个合适的替代品吗?

0 投票
1 回答
67 浏览

python - 如何读取大型 CSV 文件、添加多维并将每小时转换为每天?

我有大型 CSV 文件,这些文件以 5 公里网格的每小时分辨率表示美国的天气数据。每天都会保存数据,因此我将它们连接在一起作为年度文件。最终目标是按纬度和经度计算变量(风速、温度、降水、压力等)的每日、每周和每月平均值。没有列标题,所以我在读入文件时将列名添加到文件中。

当我尝试在 Python 中使用 Pandas 读取时,它失败了,因为它不适合内存。我可以使用 Dask 阅读,但是我找不到将维度添加到 Dask 数据框或转换为 xarray 并执行相同操作的方法。有没有办法读取这些太大的内存文件,添加纬度、经度、日期时间维度,计算每个纬度/经度的每日、每周和每月平均值并输出文件?或者,我是否需要在读入之前将 csv 转换为 netCDF 或类似的?

0 投票
1 回答
36 浏览

pandas - 来自 read_csv 的 dask 数据帧使用 python 引擎不可靠的行为

我的数据是一个 10GB 的文件,格式如下:

笔记:

  1. 可以有任意数量的 [ key = value ] 块。
  2. 字符[]值本身,例如:[ hello = wo[rld] ]
  3. 我无法控制 abinput 文件,除非我可以在我的脚本中更改/处理它。
  4. 我只需要几列,但是它们有字符[]值。

在我的简单for line in f:功能中,我可以按' ][ '模式拆分。但是考虑到文件的大小,dask 非常有利可图。

我知道engine='c'我不能使用多字符分隔符,但切换到engine='python'会导致不可预测的结果。这是一个例子:

上面的代码如预期的那样导致ParserError: Too many columns specified: expected 25 and found 24. 这个错误很难重现,因为它只是由于一些我很难识别的特定行而发生。每次有更多列时都不会发生这种情况。所以在上面的函数中我改变了:engine="python"sep=" \]\[ "。这适用于我测试的小样本数据。但在 10G 文件中,我得到以下不可预测的行为:

更多示例:

0 投票
1 回答
191 浏览

dask - Dask 计算(以捕获错误),但将结果保留在工作人员身上

我想定义一组操作并在 dask 工作人员上运行它们,并捕获可能出现的任何错误,而无需调用计算并将结果提供给客户端。

例如,如果我在对象列上执行 astype(int),这应该会给我一个错误。当我使用 client.persist 时,这不会给我一个错误。

但是,client.compute 确实会引发此错误:

有没有办法像在计算中一样立即获取错误,但不强制 Dask 将结果返回给客户端?

0 投票
1 回答
62 浏览

dask - 代码非常慢而且几乎永无止境 - Dask 数据框将列分配给字典

我有 1M dask 数据框行。我正在尝试将字典值分配给数据框的新列。

实现这一目标的最佳方法是什么?

PS:我正在尝试使用进度条来跟踪进度,其中代码真的快到 67% 并且挂断了。我的 CPU 和内存消耗得非常少,但代码正在运行,感觉好像永远不会完成。

0 投票
1 回答
726 浏览

dask - Dask 多个客户

是否可以有多个客户在 dask 中?例如,我可以让多个线程在每个线程一个客户端运行,这样当一个线程阻塞时,其他线程可以继续吗?在这种情况下,每个客户端都会有独立的任务图,它们不相互依赖。

作为后续问题,如果这是可能的,那么我如何指定在哪里运行特定任务?当我执行 dd.read_csv,然后调用计算时,我如何知道哪个客户端及其关联的调度程序/工作人员正在执行此操作?

0 投票
0 回答
60 浏览

python - Dask pivot_table 与列类型对象与列类型类别

除非我的对象类型列转换为分类列,否则我无法旋转 dask 数据框。

我很难理解为什么 dask 要求我将列的类型专门设置为 acategory而不是也接受object (aka dtype('O') ).

它在熊猫中完美运行。

0 投票
1 回答
362 浏览

dask - 调用 to_csv 时出现 Dask DataFrame MemoryError

我目前正在通过以下方式使用 Dask ......

  • S3 上有以下格式的文件列表:

<day1>/filetype1.gz

<day1>/filetype2.gz

<day2>/filetype1.gz

<day2>/filetype2.gz

...ETC

  • 我的代码读取所有文件filetype1并建立一个数据框并设置索引(例如df1 = ddf.read_csv(day1/filetype1.gz, blocksize=None, compression='gzip').set_index(index_col):)

  • 读取所有文件filetype2并建立一个大数据框(类似于上面)。

  • 通过 将两个数据帧合并在一起merged_df = ddf.merge(df1, df2, how='inner', left_index=True, right_index=True)

  • 通过以下方式将结果写入 S3:merged_df.to_csv(<s3_output_location>)

注意:这里的目标实际上是在特定日期内合并(即,合并给定日期的 filetype1 和 filetype2),每天重复,并存储所有这些连接的并集,但看起来就像一天做连接一次不会利用并行性,让 Dask 管理更大的连接会更有性能。我认为 Dask 会根据文档中的以下行(https://docs.dask.org/en/latest/dataframe-joins.html)以内存感知方式管理更大的连接:

如果找不到足够的内存,那么 Dask 将不得不读写数据到磁盘,这可能会导致其他性能成本。

我看到 aMemoryError发生在对 的调用中to_csv。我猜这是因为to_csvcallscompute尝试计算连接的完整结果,然后尝试存储该结果。完整的文件内容当然无法放入内存,但我认为(希望)Dask 会运行计算并以内存感知的方式存储生成的 Dataframe。关于我应该如何使用 Dask 来完成我想做的事情的任何指导或建议?提前致谢。

0 投票
1 回答
207 浏览

python - 向 dask 数据帧添加新列会引发 ValueError:值的长度与索引的长度不匹配

我明白这个追溯

ValueError: Length of values does not match length of index

产生于在 中或相同操作期间一个dataframe 比另一个长或短 的事实。dataframeddf.assign(new_col=ts_colddf['ts_col'] = ts_col

问题是,我看不出长度有何不同 - 用代码解释: