问题标签 [dask-dataframe]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - ModuleNotFoundError:没有名为“dask.dataframe”的模块;'dask' 不是一个包
对于当前的项目,我计划将两个非常大的 CSV 文件与 Dask 合并,以替代 Pandas。我已经彻底安装了 Dask pip install "dask[dataframe]"
。
然而,在跑步时import dask.dataframe as dd
,我收到了反馈ModuleNotFoundError: No module named 'dask.dataframe'; 'dask' is not a package
。
几个用户似乎遇到了同样的问题,并被推荐通过 Conda 安装模块,这对我来说也没有帮助。
找不到模块的原因是什么?
python-3.x - Pandas:在聚合函数中获取 abs() mean()
从文档中可以得到这样.aggregate
的dataframe.groupby
对象:
然后aggregate
像这样:
那很好,但我想在这里取abs(np.mean)
of 专栏A
。是否可以将其定义为 的一部分my_agg
?什么是 dask 的解决方案?
dask - 是为控制流“计算”还是构建一个完全“延迟”的任务图更好?
我有一个现有的 Pandas 代码库,并且刚刚开始尝试将其转换为 Dask。我仍在尝试围绕 Dask dataframe
、delayed
和distributed
。通过阅读dask.delayed
文档,似乎理想的情况是为我想要执行的整个操作集构建一个任务/计算图,包括delayed
用于用户消息的函数,然后在一大块中运行所有计算结尾。这样,调用进程就不需要在 Dask 集群执行实际工作时继续运行。
不过,我一直面临的问题是,似乎在某些情况下这是不可行的,尤其是在涉及 Python 控制流时。例如:
我真的不明白如何在不调用df.isnull().any().compute()
.
我现在也不知道调用compute()
或persist()
脚本中是否有任何“坏”(与最佳实践相反)。在网上查看很多示例时,它们似乎是基于实验性/基于 Jupyter 的环境,这load -> preparation -> persist() -> experimentation
似乎是标准方法。由于我有一组相对线性的操作 ( load -> op1 -> op2 -> ... -> opn -> save
),我认为我应该尽可能快地尝试简单地安排任务而不进行任何计算并避免compute
/ persist
,我现在觉得这让我陷入了死胡同。
所以总结一下,我想我有两个问题想要回答,第一个是“使用起来很糟糕compute
吗?”,第二个是“如果是,我怎样才能避免compute
但仍然拥有良好且可读的控制流?”。
python - 将 ast.literal_eval 与 dask 系列一起使用
我有一个字符串,其格式"[[Integer1, tag1], [Integer2, tag2]]"
为 dask 系列中的值,并希望用于df[col] = df[col].apply(ast.literal_eval)
将它们转换为该 dask 系列中的正常列表值。此列表值的 len 在系列中的不同位置可能会有所不同。
但得到错误ValueError('malformed node or string: <_ast.Name object at 0x7f593adfe198>',)
这适用于 pandas 系列,但使用 dask 系列会出现此错误。
dask - 字符串数据转换处理 300MB,在 Dask Distributed 中有 30M 记录
在 Node1 (4CPU, 8GB) 上启动 Dask 调度程序:
Dask调度程序:dask-scheduler --host 0.0.0.0 --port 8786
在 Node2(8CPU,32GB)和 Node3(8CPU,32GB)上启动 Worker: Dask Worker:
dask-worker tcp://http://xxx.xxx.xxx.xxx:8786 --nanny-port 3000:3004 --worker-port 3100:3104 --dashboard-address :8789
这是我的原型,编辑some_private_processing
和some_processing
方法:
此处理需要 8 多个小时,我看到所有数据仅在 Node2 或 Node3 上的一个节点上处理,而不是在 Node2 和 Node3 上处理。
需要帮助来理解洞察力并了解我做错了什么以使这个简单的数据转换运行超过 8 小时但仍未完成。
python - 如何将 Dask DataFrame 与 Numpy Busday_count 一起使用?
我正在努力将 Pandas/Numpy 代码转换为 Dask 以处理更大的数据集。我似乎无法重新创建以下 Pandas/Numpy 代码:
这将返回 time_order_date 和 time_complete_date 之间的整数天数,同时考虑工作周和假期列表。它在我的数据框中创建并填充了一个新列,没有问题。
在 Dask,我尝试了以下方法:
map_partitions 调用 numpy 函数:
ddf['days_to_complete'] = ddf.time_order.map_partitions(func=np.busday_count,args= ddf['time_order_date'].values.astype('datetime64[D]'),ddf['time_complete_date']),meta=(None, 'i8'))
还使用 lambda 的 map_partitions:
ddf['days_to_complete'] = ddf.map_partitions(lambda ddf: ddf.assign(result = np.busday_count(begindates=ddf['time_order_date'].values.astype('datetime64[D]'),enddates=ddf['time_complete_date'],weekmask='1111111',holidays=hols_list)),meta=(None,'i8'))
并在运行 ddf.compute() 后得到以下错误:
您如何以并行处理/对 Dask 友好的方式最好地使用这个 numpy 函数? 我没有成功使用 Dask 文档/示例或其他 SO 线程。我还想使用 Pandas CustomBusinessHour rollfoward,就像我在这里使用基本熊猫一样:
这会将订单时间“前滚”到定义的客户营业时间内(周六订单现在是周一早上 7 点,一个工作日)。谢谢!
编辑: 我试过编写和调用一个函数:
我收到以下错误:TypeError: bdays() got multiple values for argument 'df'
python - Python Dask - 所有列的分组性能
我想计算数据中唯一行的数量。下面是一个快速输入/输出示例。
我的管道中的数据有超过 5000 列和超过 1M 行,每个单元格是 0 或 1。下面是我使用 Dask 进行缩放的两次尝试(26 列):
结果:
第一个解决方案首先将每一列连接成一个字符串,然后在其上运行 group-by。第二个只是按所有列分组。我倾向于使用第一个,因为它在我的测试中更快,但我愿意接受建议。如果在性能方面有任何更好的东西,请随意完全改变我的解决方案(另外,有趣的是, sort=False 不会加速分组,这实际上可能与此有关:https ://github.com/dask /dask/issues/5441和这个https://github.com/rapidsai/cudf/issues/2717)
注意:经过一些测试后,第一个解决方案与列数的比例相对较好。我想一项改进可能是将字符串散列以始终具有固定长度。在这种情况下对分区号有什么建议吗?从远程仪表板我可以看到,经过几次操作后,计算图中的节点减少到只有 3 个,没有利用其他可用的工作人员。
当列增加时,第二种解决方案失败。
注意 2:此外,对于第一个解决方案,我猜 Dask 调度和映射操作的方式发生了一些非常奇怪的事情。正在发生的事情是,一段时间后,单个工作人员获得的任务比其他工作人员多得多,然后工作人员超过 95% 的内存,崩溃,然后任务被正确拆分,但一段时间后另一个工作人员获得更多任务(并且循环重新开始)。管道运行良好,但我想知道这是否是预期的行为。附上截图:
dask - 有没有办法增量更新 Dask 元数据文件?
我正在尝试处理数据集并在 Dask 中将其写入时进行增量更新。Dask 元数据文件在重新读取处理后的数据时会有很大帮助。但是,当我将新的分区/子集写入相同的路径时,那里的元数据会被新的分区/子集覆盖,而不是在包含它们的情况下进行更新。
看了几个地方,还没有找到明显的方法来做到这一点。有谁知道是否有人做过处理这种用例的事情?可以增量更新元数据文件,也可以对其中的一些文件进行编辑/组合。任何建议将不胜感激。
python - Dask Dataframe GroupBy.size() 返回 memoryError
我有两个大型 CSV 文件,每个文件约 2800 万行。我正在执行内部联接,针对新的 Dask Dataframe 添加列,然后GroupBy.Size()
在某些列上请求 a 返回计数。在示例中,输入来自两个 parquet 文件,这些文件是从原始 CSV 生成的。
端到端程序确实可以在 8 核 / 32GB Ram 计算机上运行,并生成 groupBy 大小的 4x6 Pandas DF,但是在 16GB 和 10GB RAM 设备上运行时,出现内存错误。
我能做些什么来避免这个内存错误?
这是有问题的代码:
在低内存设备上运行时,完成 70% 后我收到以下错误:
python - LocalCluster() 如何影响任务数量?
是否需要在 LocalCluster 内部或外部进行计算(如 dask 方法 dd.merge)?最终计算(如 .compute)需要在 LocalCluster 内部还是外部进行?
我的主要问题是 - LocalCluster() 如何影响任务数量?
我和我的同事注意到,将 dd.merge 放在 LocalCLuster() 之外会显着降低任务数量(比如 10 倍或类似的东西)。这是什么原因?
伪例子
许多任务:
几个任务: