问题标签 [dask-dataframe]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 在 Dask Dataframe 上使用 set_index() 并写入 parquet 会导致内存爆炸
我有一大堆 Parquet 文件,我试图按列排序。未压缩的数据约为 14Gb,因此 Dask 似乎是适合这项工作的工具。我对 Dask 所做的只是:
- 读取镶木地板文件
- 对其中一列进行排序(称为“朋友”)
- 在单独的目录中写入 parquet 文件
如果没有 Dask 进程(只有一个,我正在使用同步调度程序)耗尽内存并被杀死,我无法做到这一点。这让我很吃惊,因为没有一个分区未压缩超过约 300 mb。
我编写了一个小脚本来使用我的数据集越来越大的部分来分析 Dask,并且我注意到 Dask 的内存消耗随着输入的大小而变化。这是脚本:
以下是电话会议产生的图表visualize()
:
输入限制 = 2
输入限制 = 4
输入限制 = 8
输入限制 = 16
完整的数据集是大约 50 个输入文件,因此以这种增长速度,我并不感到惊讶,因为工作会占用我 32gb 机器上的所有内存。
我的理解是,Dask 的全部意义在于允许您对大于内存的数据集进行操作。我的印象是人们正在使用 Dask 处理比我的 ~14gb 大得多的数据集。他们如何通过扩展内存消耗来避免这个问题?我在这里做错了什么?
在这一点上,我对使用不同的调度程序或并行性不感兴趣。我只是想知道为什么 Dask 消耗的内存比我认为必要的多得多。
python - Dask 客户端 + read_sql_table:distributed.protocol.core - 严重 - 无法序列化
尝试read_sql_table
使用 a运行dask.distributed Client
,其中表是一个查询,因为我不想获取整个表。下面的例子。
对于print(response)
,我得到了预期的输出:
但是,当我必须使用dask.dataframe
likeprint(response.head())
时,出现Failed to Serialize
错误:
如果我不创建最奇怪的部分client
,那么数据帧会被计算更正:
另外,如果我阅读了整个表,而不用查询它与客户端一起工作:
附加信息
版本:
client.get_versions(check=True)
:
schema
班级
python-3.x - Dask:根据多个条件屏蔽数据帧以执行选择性计算
我希望在使用 dask 时替换满足多个条件的行上的值。我将执行替换的预设值存在于一列中,如果满足条件,那么我将用预设值替换目标值。
如果可能,由于在移动数据帧时的内存限制,我想留在 dask 中而不是使用另一个库执行此操作。
目前,我正在尝试使用 .mask 命令。
如果 GrassDeadFMC >= 12 且 Windspeed <= 10,则使 GrassFMCoefficient 等于 GFMG12L10 中的值。
ddf['GrassFMCoefficient'] = ddf['GFMG12L10'].mask(ddf['GrassDeadFMC'] >= 12 & ddf['WindSpeed'] <= 10)
我收到的错误是:
一个最小的可执行脚本,它给出了一个稍微不同的错误,但我猜可能会遇到同样的问题。
对此的任何帮助将不胜感激。
pandas - Dask .loc 只有第一个结果(iloc[0])
示例 dask 数据框:
现在我只想得到第一个(基于索引)结果 - 就像在熊猫中一样:
我知道在 dask using 中没有位置行索引iloc
,但我想知道是否可以像 SQL 那样将查询限制为 1 个结果?
python - Dask - 如何有效地执行正确数量的任务
我正在尝试屏蔽然后unique
对一列应用操作。下面报告了我正在使用的代码的简化版本:
这个快速示例运行良好。我的真实数据由~5000
列组成,其中一列用于过滤,一列用于获取唯一 ID。数据存储在200
parquet 分区中,每个分区的权重为 9MB,但在加载到内存时 ( ddf.get_partition(0).compute().info()
) 权重~5GB
。鉴于我有400GB
RAM,我会假设我可以在80
分区周围加载(考虑到其他操作的开销可能会更少)。从仪表板上我可以看到 dask 正在尝试一次执行所有任务(在内存中任务总是相同的,不管有多少工人)。
我写这个是为了测试处理一个分区所花费的时间:
它需要周围60s
,它需要周围7GB
的 RAM。如果我启动 ProcessPool 并假设我一次只运行50
分区,则需要4-5
几分钟。
我知道 Dask 的核心与我对单个分区所做的完全一样,所以我的问题是为什么 Dask 会尝试并行执行所有任务而不是一次执行一个任务?有没有办法限制任务执行?这是这里真正的问题还是我错过了什么?
我在这里发现了几个问题来限制任务执行。这里的所有要点:https ://distributed.dask.org/en/latest/resources.html 。但是,我相信我不应该强迫这种行为,让 Dask 尽力而为。我还应该提到,Dask 能够在单线程中设置 5 个工作人员时运行代码,每个工作人员具有 80GB 的 RAM(但这比我提到的进程池方法花费的时间要多得多)。
我在 python3.6.10
和 Dask上2.17.2
。
python-3.x - Dask - 在 map_partition 调用上返回一个 dask.dataframe
我想知道当我调用 a 而不是 pd.Dataframe 时如何返回 dask Dataframemap_partitions
以避免内存问题。
输入数据框
预期输出map_partitions
如果我返回 pd.Dataframe,我已经完成了以下功能并且正在工作。但是,如果我返回一个 dask.dataframe,*** AssertionError
则会引发
我正在调用函数如下
python - Dask:“DataFrame”对象没有属性“_meta”
我尝试连接 Ms SQL 服务器并将数据框加载到 SQL 服务器中,同时连接时我不断收到“无属性 '_meta'”。
我是 Dask Dataframe 的新手,有人可以帮帮我。这将非常有帮助。
询问:
错误:
pandas - 是否可以将 Series.str.extract 与 Dask 一起使用?
我目前正在使用 Pandas 处理一个大型数据集,我必须使用pandas.Series.str.extract
. 它看起来像这样:
但是,它运行良好,因为它必须执行大约十次(使用各种源列),性能不是很好。为了通过使用多个内核来提高性能,我想尝试 Dask,但它似乎不受支持(我在 dask 的文档中找不到对提取方法的任何引用)。
有没有办法并行执行这样的 Pandas 动作?我发现了这种方法,您基本上将数据帧拆分为多个数据帧,为每个子帧创建一个进程,然后将它们连接回来。
dask - 镶木地板文件中的 Dask Dataframe:OSError:无法反序列化节俭:TProtocolException:无效数据
我正在生成一个 Dask 数据帧,以在 dask-ml 提供的聚类算法中下游使用。在我管道的上一步中,我使用 读取磁盘中的数据帧,使用dask.dataframe.read_parquet
应用转换来添加列map_partitions
,然后使用 将生成的数据帧写回磁盘dask.dataframe.to_parquet
。再次读取并compute()
调用生成的数据帧时会出现此问题。
运行以下代码:
产生以下回溯:
环境是 Amazon Linux 2,Python 3.7.9,dask == 2.30.0,pyarrow == 2.0.0,pandas == 1.1.5,numpy == 1.19.4。dask 数据帧由 404 列组成,从大约 14,000 个 parquet 文件(分区)中读取。其中四列包含 type 项object
(三列包含字符串,一列包含字符串的嵌套列表),其余 400 列包含 type float64
。
python - Dask 在进行归约时如何分散数据
我正在使用 Dask 进行复杂的操作。首先我做了一个减少,产生一个中等大小的df(几MB),然后我需要将它传递给每个工人来计算最终结果,所以我的代码看起来有点像这样
但是我收到看起来像这样的警告消息
我试过这样做
但这不起作用,因为函数现在将其视为 Future 对象,而不是它应该是的数据类型。我似乎找不到任何关于如何使用 scatter 减少的文档,有人知道如何做到这一点吗?还是我应该忽略警告消息并像我一样传递中等大小的df?