问题标签 [dask]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2669 浏览

python - 在 numpy 行上并行化循环

我需要将相同的函数应用于 numpy 数组中的每一行,并将结果再次存储在 numpy 数组中。

function对我的数据进行一些重要的过滤,并在条件为真和条件为假时返回一个数组。function可以是纯 python 或 cython 编译。对行的过滤操作很复杂,并且可能取决于行中的先前值,这意味着我不能以逐个元素的方式对整个数组进行操作

例如,有没有办法在 dask 中做这样的事情?

0 投票
1 回答
671 浏览

python - 在 dask 数组中使用 float32 时出现内存错误

我正在尝试使用将 1.25 GB 数据集导入 pythondask.array

该文件是一个 1312*2500*196 的数组uint16。我需要将其转换为float32数组以供以后处理。

我已经设法将这个 Dask 数组拼接在一起uint16,但是当我尝试转换为时float32出现内存错误

不管我对块大小做什么,我都会遇到内存错误。

我通过以 100 行连接数组来创建数组(将 2500 维分解为 100 行的小片段,因为dask无法原生读取.RAW图像文件,我必须使用它numpy.memmap()来读取文件然后创建数组。下面我将提供“尽可能短”的代码片段:

我尝试了两种方法:

1)创建完整的uint16数组,然后尝试转换为float32

(注意:这memmap是一个 1312x100x196 数组,行数范围从 0 到 24)

然后我用

在方法 2 中:

这两种方法都会导致内存错误。

这有什么原因吗?

我读到该dask数组能够进行多达 100 GB 的数据集计算。

我尝试了所有块大小(从小到 10x10x10 到单行)

0 投票
1 回答
1497 浏览

python - 选择一个框架用于使用 python 进行大于内存的数据分析

我正在解决一个大于内存的数据集的问题。原始数据集是一个 .csv 文件。其中一列是来自 musicbrainz 服务的曲目 ID。

我已经做了什么

我用dask读取 .csv 文件并将其转换为磁盘上的castra格式以获得更高的性能。我还使用peewee查询了 musicbrainz API 并填充了一个 sqlite 数据库,并得到了一些相关的结果。我选择使用数据库而不是另一个 dask.dataframe,因为这个过程需要几天时间,而且我不想在任何失败的情况下丢失数据。

我还没有开始真正分析数据。在重新排列数据的过程中,我设法弄得一团糟。

当前的问题

我很难将 SQL DB 中的列连接到 dask / castra 数据框。实际上,我不确定这是否可行。

替代方法

似乎我在为任务选择最佳工具时犯了一些错误。Castra 可能还不够成熟,我认为这是问题的一部分。此外,最好选择 SQLAlchemy 来支持 peewee,因为它被 pandas 使用,而 peewee 没有。

Blaze + HDF5 可能作为 dask + castra 的良好替代品,主要是因为 HDF5 比 castra 更稳定/成熟/完整,并且 blaze 在数据存储方面不太固执己见。例如,它可以简化 SQL DB 到主数据集的连接。

另一方面,我熟悉 pandas 并且 dask 公开了“相同”的 API。使用 dask 我也获得了并行性。

TL;博士

我有一个大于内存数据集 + sqlite 数据库,我需要加入到主数据集中。我怀疑是否使用 dask + castra (不知道 dask.dataframe 的其他相关数据存储),并使用 SQLAlchemy 一次将部分 SQL DB 加载到带有 pandas 的数据框中。我看到的最好的选择是改用 blaze + HDF5。在这种情况下,您有什么建议?

欢迎任何其他选项/意见。我希望这对 SO 来说足够具体。

0 投票
0 回答
143 浏览

python - 分配()到dask DataFrames中的变量列名

pandas我有可以在dask. 这里有一个部分解决方案,但它不允许我使用变量作为我分配给的列的名称。

这是工作pandas代码:

这是损坏的dask代码:

这会将结果分配给一个名为的新列,c而不是修改data[c](我想要的)的值。如果我可以将列名作为变量,那么创建一个新列会很好。例如,如果这有效:

出于显而易见的原因,python 不允许在 an 左侧使用表达式=,因此会忽略name.

如何使用变量作为列名?for 循环的迭代次数远远超过我愿意复制/粘贴的次数。

0 投票
3 回答
585 浏览

python - 如何将 dask.dataframe 与自定义 dsk 图一起使用

我将尝试改写我的问题:

如何将 dask.dataframe 与 zip 之类的功能结合起来?

假设我们有一个名为“accounts.0.csv”的文件,其中包含以下数据

我写了这段代码

此代码应生成如下内容:

我怎样才能做到这一点 ?

0 投票
1 回答
2008 浏览

python - dask assign() 或 apply() 中的变量列名

pandas我有可以在dask. 这里有一个部分解决方案,但它不允许我使用变量作为我正在创建/分配的列的名称。

这是工作pandas代码:

这是dask不符合我要求的代码:

这会将结果分配给一个名为的新列,c而不是修改data[c](我想要的)的值。如果我可以将列名作为变量,那么创建一个新列会很好。例如,如果这有效:

出于显而易见的原因,python 不允许在 an 左侧使用表达式,=并忽略name.

如何使用变量作为我要分配的列的名称?循环的迭代次数远远超过我愿意复制/粘贴的次数。

0 投票
2 回答
3330 浏览

python - Dask DataFrame:对具有多行的 groupby 对象重新采样

我有以下从 Castra 创建的 dask 数据框:

产量:

我想做的是:

  1. user_id按和分组ts
  2. 在 3 小时内重新采样
  3. 在重采样步骤中,任何合并的行都应该连接文本

示例输出:

我尝试了以下方法:

并得到以下错误:

我尝试传入set_index('ts')管道,但它似乎不是Series.

关于如何实现这一目标的任何想法?

TL;博士

如果它使问题变得更容易,我还可以更改我创建的 Castra DB 的格式。我目前的实现很大程度上取自这篇很棒的帖子。

我将索引(在to_df()函数中)设置如下:

并且有:

以下是生成的 dtypes:

0 投票
1 回答
1035 浏览

dask - 可以使用无休止的流式输入进行工作

我知道 dask 在这样的批处理模式下运行良好

  1. 我们可以使用 dask 来处理流通道,其中块的数量是未知的甚至是无穷无尽的吗?
  2. 它可以以增量方式执行计算吗?例如,上面的“分析”步骤可以处理正在进行的块吗?
  3. 我们必须在所有数据块都知道之后才调用“get”操作,我们可以在调用“get”之后添加新块吗
0 投票
1 回答
1235 浏览

python - Dask 支持带有标题的 CSV 文件

似乎dask不支持读取包含标题的 CSV 文件。当我尝试阅读它时,我得到

我可以毫无问题地使用 pandas 和没有标题的 dask 读取相同的文件,但它们 dask 将第一行属性作为标题。这是一个错误吗?

0 投票
3 回答
1152 浏览

python - 大包不使用所有核心?备择方案?

我有一个 python 脚本,它执行以下操作:i。它采用数据的输入文件(通常是嵌套的 JSON 格式) ii。将数据逐行传递给另一个函数,该函数将数据处理为所需的格式 iii. 最后将输出写入文件。

这是我当前执行此操作的简单 python 行...

这是可行的,但是由于 python GIL 将其限制为服务器上的一个核心,它的速度非常慢,尤其是在处理大量数据的情况下。

我通常处理的数据量约为 4 gigs gzip 压缩,但有时我必须处理数百 gigs gzip 压缩的数据。它不一定是大数据,但仍然不能全部在内存中处理,并且使用 Python 的 GIL 处理速度非常慢。

在寻找优化数据处理的解决方案时,我遇到了 dask。虽然 PySpark 在当时对我来说似乎是显而易见的解决方案,但 dask 的承诺和它的简单性赢得了我的青睐,我决定试一试。

在对 dask 以及如何使用它进行了大量研究之后,我编写了一个非常小的脚本来复制我当前的过程。脚本如下所示:

这可以工作并产生与我原来的非 dask 脚本相同的结果,但它仍然只使用服务器上的一个 CPU。所以,它根本没有帮助。事实上,它更慢。

我究竟做错了什么?我错过了什么吗?我对 dask 还是很陌生,所以让我知道我是否忽略了某些事情,或者我是否应该做一些完全不同的事情。

此外,是否有任何替代 dask 可以使用服务器的全部容量(即所有 CPU)来完成我需要做的事情?

谢谢,