问题标签 [dask-dataframe]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 从一个源计算多个 dask.dataframe.from_delayed()
如何.from_delayed()
从一个延迟序列并行计算?
这里foo()
将被调用10次。是否可以创建图形以便仅调用5次?
谢谢
dask - Dask - 从 SQL 加载数据帧而不指定 index_col
我正在尝试从 SQL 连接加载 Dask 数据帧。根据read_sql_table 文档,有必要传入 index_col。如果可能没有好的列作为索引,我该怎么办?
这可能是一个合适的替代品吗?
python - 如何读取大型 CSV 文件、添加多维并将每小时转换为每天?
我有大型 CSV 文件,这些文件以 5 公里网格的每小时分辨率表示美国的天气数据。每天都会保存数据,因此我将它们连接在一起作为年度文件。最终目标是按纬度和经度计算变量(风速、温度、降水、压力等)的每日、每周和每月平均值。没有列标题,所以我在读入文件时将列名添加到文件中。
当我尝试在 Python 中使用 Pandas 读取时,它失败了,因为它不适合内存。我可以使用 Dask 阅读,但是我找不到将维度添加到 Dask 数据框或转换为 xarray 并执行相同操作的方法。有没有办法读取这些太大的内存文件,添加纬度、经度、日期时间维度,计算每个纬度/经度的每日、每周和每月平均值并输出文件?或者,我是否需要在读入之前将 csv 转换为 netCDF 或类似的?
pandas - 来自 read_csv 的 dask 数据帧使用 python 引擎不可靠的行为
我的数据是一个 10GB 的文件,格式如下:
笔记:
- 可以有任意数量的 [ key = value ] 块。
- 字符
[
和]
值本身,例如:[ hello = wo[rld] ]
- 我无法控制 abinput 文件,除非我可以在我的脚本中更改/处理它。
- 我只需要几列,但是它们有字符
[
和]
值。
在我的简单for line in f:
功能中,我可以按' ][ '
模式拆分。但是考虑到文件的大小,dask 非常有利可图。
我知道engine='c'
我不能使用多字符分隔符,但切换到engine='python'
会导致不可预测的结果。这是一个例子:
上面的代码如预期的那样导致ParserError: Too many columns specified: expected 25 and found 24
. 这个错误很难重现,因为它只是由于一些我很难识别的特定行而发生。每次有更多列时都不会发生这种情况。所以在上面的函数中我改变了:engine="python"
和sep=" \]\[ "
。这适用于我测试的小样本数据。但在 10G 文件中,我得到以下不可预测的行为:
更多示例:
dask - Dask 计算(以捕获错误),但将结果保留在工作人员身上
我想定义一组操作并在 dask 工作人员上运行它们,并捕获可能出现的任何错误,而无需调用计算并将结果提供给客户端。
例如,如果我在对象列上执行 astype(int),这应该会给我一个错误。当我使用 client.persist 时,这不会给我一个错误。
但是,client.compute 确实会引发此错误:
有没有办法像在计算中一样立即获取错误,但不强制 Dask 将结果返回给客户端?
dask - 代码非常慢而且几乎永无止境 - Dask 数据框将列分配给字典
我有 1M dask 数据框行。我正在尝试将字典值分配给数据框的新列。
实现这一目标的最佳方法是什么?
PS:我正在尝试使用进度条来跟踪进度,其中代码真的快到 67% 并且挂断了。我的 CPU 和内存消耗得非常少,但代码正在运行,感觉好像永远不会完成。
dask - Dask 多个客户
是否可以有多个客户在 dask 中?例如,我可以让多个线程在每个线程一个客户端运行,这样当一个线程阻塞时,其他线程可以继续吗?在这种情况下,每个客户端都会有独立的任务图,它们不相互依赖。
作为后续问题,如果这是可能的,那么我如何指定在哪里运行特定任务?当我执行 dd.read_csv,然后调用计算时,我如何知道哪个客户端及其关联的调度程序/工作人员正在执行此操作?
python - Dask pivot_table 与列类型对象与列类型类别
除非我的对象类型列转换为分类列,否则我无法旋转 dask 数据框。
我很难理解为什么 dask 要求我将列的类型专门设置为 acategory
而不是也接受object (aka dtype('O') )
.
它在熊猫中完美运行。
dask - 调用 to_csv 时出现 Dask DataFrame MemoryError
我目前正在通过以下方式使用 Dask ......
- S3 上有以下格式的文件列表:
<day1>/filetype1.gz
<day1>/filetype2.gz
<day2>/filetype1.gz
<day2>/filetype2.gz
...ETC
我的代码读取所有文件
filetype1
并建立一个数据框并设置索引(例如df1 = ddf.read_csv(day1/filetype1.gz, blocksize=None, compression='gzip').set_index(index_col)
:)读取所有文件
filetype2
并建立一个大数据框(类似于上面)。通过 将两个数据帧合并在一起
merged_df = ddf.merge(df1, df2, how='inner', left_index=True, right_index=True)
。通过以下方式将结果写入 S3:
merged_df.to_csv(<s3_output_location>)
注意:这里的目标实际上是在特定日期内合并(即,合并给定日期的 filetype1 和 filetype2),每天重复,并存储所有这些连接的并集,但看起来就像一天做连接一次不会利用并行性,让 Dask 管理更大的连接会更有性能。我认为 Dask 会根据文档中的以下行(https://docs.dask.org/en/latest/dataframe-joins.html)以内存感知方式管理更大的连接:
如果找不到足够的内存,那么 Dask 将不得不读写数据到磁盘,这可能会导致其他性能成本。
我看到 aMemoryError
发生在对 的调用中to_csv
。我猜这是因为to_csv
callscompute
尝试计算连接的完整结果,然后尝试存储该结果。完整的文件内容当然无法放入内存,但我认为(希望)Dask 会运行计算并以内存感知的方式存储生成的 Dataframe。关于我应该如何使用 Dask 来完成我想做的事情的任何指导或建议?提前致谢。
python - 向 dask 数据帧添加新列会引发 ValueError:值的长度与索引的长度不匹配
我明白这个追溯
ValueError: Length of values does not match length of index
产生于在 中或相同操作期间一个dataframe
比另一个长或短 的事实。dataframe
ddf.assign(new_col=ts_col
ddf['ts_col'] = ts_col
问题是,我看不出长度有何不同 - 用代码解释: