问题标签 [dask-dataframe]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
187 浏览

python - 在 Dask Dataframe 上使用 set_index() 并写入 parquet 会导致内存爆炸

我有一大堆 Parquet 文件,我试图按列排序。未压缩的数据约为 14Gb,因此 Dask 似乎是适合这项工作的工具。我对 Dask 所做的只是:

  1. 读取镶木地板文件
  2. 对其中一列进行排序(称为“朋友”)
  3. 在单独的目录中写入 parquet 文件

如果没有 Dask 进程(只有一个,我正在使用同步调度程序)耗尽内存并被杀死,我无法做到这一点。这让我很吃惊,因为没有一个分区未压缩超过约 300 mb。

我编写了一个小脚本来使用我的数据集越来越大的部分来分析 Dask,并且我注意到 Dask 的内存消耗随着输入的大小而变化。这是脚本:

以下是电话会议产生的图表visualize()

输入限制 = 2

使用 input_limit=2

输入限制 = 4

使用 input_limit=4

输入限制 = 8

使用 input_limit=8

输入限制 = 16

使用 input_limit=16

完整的数据集是大约 50 个输入文件,因此以这种增长速度,我并不感到惊讶,因为工作会占用我 32gb 机器上的所有内存。

我的理解是,Dask 的全部意义在于允许您对大于内存的数据集进行操作。我的印象是人们正在使用 Dask 处理比我的 ~14gb 大得多的数据集。他们如何通过扩展内存消耗来避免这个问题?我在这里做错了什么?

在这一点上,我对使用不同的调度程序或并行性不感兴趣。我只是想知道为什么 Dask 消耗的内存比我认为必要的多得多。

0 投票
1 回答
221 浏览

python - Dask 客户端 + read_sql_table:distributed.protocol.core - 严重 - 无法序列化

尝试read_sql_table使用 a运行dask.distributed Client,其中表是一个查询,因为我不想获取整个表。下面的例子。

对于print(response),我得到了预期的输出:

但是,当我必须使用dask.dataframelikeprint(response.head())时,出现Failed to Serialize错误:

如果我不创建最奇怪的部分client,那么数据帧会被计算更正:

另外,如果我阅读了整个表,而不用查询它与客户端一起工作:


附加信息

版本:

client.get_versions(check=True)

schema班级

0 投票
1 回答
510 浏览

python-3.x - Dask:根据多个条件屏蔽数据帧以执行选择性计算

我希望在使用 dask 时替换满足多个条件的行上的值。我将执行替换的预设值存在于一列中,如果满足条件,那么我将用预设值替换目标值。

如果可能,由于在移动数据帧时的内存限制,我想留在 dask 中而不是使用另一个库执行此操作。

目前,我正在尝试使用 .mask 命令。

如果 GrassDeadFMC >= 12 且 Windspeed <= 10,则使 GrassFMCoefficient 等于 GFMG12L10 中的值。 ddf['GrassFMCoefficient'] = ddf['GFMG12L10'].mask(ddf['GrassDeadFMC'] >= 12 & ddf['WindSpeed'] <= 10)

我收到的错误是:

一个最小的可执行脚本,它给出了一个稍微不同的错误,但我猜可能会遇到同样的问题。

对此的任何帮助将不胜感激。

0 投票
1 回答
26 浏览

pandas - Dask .loc 只有第一个结果(iloc[0])

示例 dask 数据框:

现在我只想得到第一个(基于索引)结果 - 就像在熊猫中一样:

我知道在 dask using 中没有位置行索引iloc,但我想知道是否可以像 SQL 那样将查询限制为 1 个结果

0 投票
0 回答
105 浏览

python - Dask - 如何有效地执行正确数量的任务

我正在尝试屏蔽然后unique对一列应用操作。下面报告了我正在使用的代码的简化版本:

这个快速示例运行良好。我的真实数据由~5000列组成,其中一列用于过滤,一列用于获取唯一 ID。数据存储在200parquet 分区中,每个分区的权重为 9MB,但在加载到内存时 ( ddf.get_partition(0).compute().info()) 权重~5GB。鉴于我有400GBRAM,我会假设我可以在80分区周围加载(考虑到其他操作的开销可能会更少)。从仪表板上我可以看到 dask 正在尝试一次执行所有任务(在内存中任务总是相同的,不管有多少工人)。

我写这个是为了测试处理一个分区所花费的时间:

它需要周围60s,它需要周围7GB的 RAM。如果我启动 ProcessPool 并假设我一次只运行50分区,则需要4-5几分钟。

我知道 Dask 的核心与我对单个分区所做的完全一样,所以我的问题是为什么 Dask 会尝试并行执行所有任务而不是一次执行一个任务?有没有办法限制任务执行?这是这里真正的问题还是我错过了什么?

我在这里发现了几个问题来限制任务执行。这里的所有要点:https ://distributed.dask.org/en/latest/resources.html 。但是,我相信我不应该强迫这种行为,让 Dask 尽力而为。我还应该提到,Dask 能够在单线程中设置 5 个工作人员时运行代码,每个工作人员具有 80GB 的 RAM(但这比我提到的进程池方法花费的时间要多得多)。

我在 python3.6.10和 Dask上2.17.2

0 投票
1 回答
88 浏览

python-3.x - Dask - 在 map_partition 调用上返回一个 dask.dataframe

我想知道当我调用 a 而不是 pd.Dataframe 时如何返回 dask Dataframemap_partitions以避免内存问题。

输入数据框

预期输出map_partitions

如果我返回 pd.Dataframe,我已经完成了以下功能并且正在工作。但是,如果我返回一个 dask.dataframe,*** AssertionError则会引发

我正在调用函数如下

0 投票
0 回答
196 浏览

python - Dask:“DataFrame”对象没有属性“_meta”

我尝试连接 Ms SQL 服务器并将数据框加载到 SQL 服务器中,同时连接时我不断收到“无属性 '_meta'”。

我是 Dask Dataframe 的新手,有人可以帮帮我。这将非常有帮助。

询问:

错误:

0 投票
2 回答
242 浏览

pandas - 是否可以将 Series.str.extract 与 Dask 一起使用?

我目前正在使用 Pandas 处理一个大型数据集,我必须使用pandas.Series.str.extract. 它看起来像这样:

但是,它运行良好,因为它必须执行大约十次(使用各种源列),性能不是很好。为了通过使用多个内核来提高性能,我想尝试 Dask,但它似乎不受支持(我在 dask 的文档中找不到对提取方法的任何引用)。

有没有办法并行执行这样的 Pandas 动作?我发现了这种方法,您基本上将数据帧拆分为多个数据帧,为每个子帧创建一个进程,然后将它们连接回来。

0 投票
0 回答
1764 浏览

dask - 镶木地板文件中的 Dask Dataframe:OSError:无法反序列化节俭:TProtocolException:无效数据

我正在生成一个 Dask 数据帧,以在 dask-ml 提供的聚类算法中下游使用。在我管道的上一步中,我使用 读取磁盘中的数据帧,使用dask.dataframe.read_parquet应用转换来添加列map_partitions,然后使用 将生成的数据帧写回磁盘dask.dataframe.to_parquet。再次读取并compute()调用生成的数据帧时会出现此问题。

运行以下代码:

产生以下回溯:

环境是 Amazon Linux 2,Python 3.7.9,dask == 2.30.0,pyarrow == 2.0.0,pandas == 1.1.5,numpy == 1.19.4。dask 数据帧由 404 列组成,从大约 14,000 个 parquet 文件(分区)中读取。其中四列包含 type 项object(三列包含字符串,一列包含字符串的嵌套列表),其余 400 列包含 type float64

0 投票
1 回答
195 浏览

python - Dask 在进行归约时如何分散数据

我正在使用 Dask 进行复杂的操作。首先我做了一个减少,产生一个中等大小的df(几MB),然后我需要将它传递给每个工人来计算最终结果,所以我的代码看起来有点像这样

但是我收到看起来像这样的警告消息

我试过这样做

但这不起作用,因为函数现在将其视为 Future 对象,而不是它应该是的数据类型。我似乎找不到任何关于如何使用 scatter 减少的文档,有人知道如何做到这一点吗?还是我应该忽略警告消息并像我一样传递中等大小的df?