I'm trying to read N rows from all of the sub-groups stored inside a Zarr group, and doing this with a for loop is very slow. Is there a way to use Dask (I'm completely new to Dask and couldn't find anything close to what I'm trying to do) to achieve this in a relatively fast and memory-efficient way?
Here are some code snippets to illustrate what I'm trying to do.
import os
import zarr
from numcodecs import Blosc, PackBits
import numpy as np
from multiprocessing import Process
# Zarr store related #
store_file = "data/bloomfilters.zarr"
store = zarr.DirectoryStore(store_file)
sync_dir = os.path.splitext(store_file)[0] + ".sync"
synchronizer = zarr.ProcessSynchronizer(sync_dir)
Blosc.use_threads = False
compressor = Blosc(cname="zstd", clevel=9, shuffle=Blosc.BITSHUFFLE)
root_group = zarr.group(store=store, synchronizer=synchronizer)
bf_group = root_group.create_group(name="BloomFilterIndex")
# Parallelize bloom filter writes to zarr #
# samples_list size is 215M sometimes even more
bloomfilter_size = 300000
n_cores = 64
processes = []
for samp_list in np.array_split(samples_list, n_cores):
    # Pass each worker its own chunk of the sample list (write_bf_to_zarr is defined below)
    proc = Process(target=write_bf_to_zarr, args=(bf_group, compressor, bloomfilter_size, samp_list))
    proc.start()
    processes.append(proc)
for proc in processes:
    proc.join()
# Write function #
def write_bf_to_zarr(bf_group_handle, compressor, bloomfilter_size, samples_list):
    for sample_info in samples_list:
        col_idx = sample_info[0]
        file_paths = sample_info[1]
        bloomfilter = bf_group_handle.zeros(
            f"sample_{col_idx}",
            shape=(bloomfilter_size,),
            chunks=(bloomfilter_size // 4,),
            dtype="bool",
            filters=[PackBits()],
            compressor=compressor,
        )
        # Note: bloomfilter_cython builds the bloom filter as a NumPy boolean array and returns it
        bloomfilter[:] = bloomfilter_cython(file_paths, bloomfilter_size)
# To slice 'n' rows (for example, 10 rows) from each of the sub-groups
m = len(list(bf_group.keys()))
n = 10
n_rows = np.zeros((n, m), dtype=bool)
start = 0
stop = 10
for col_idx, key in enumerate(bf_group.keys()):
    n_rows[:, col_idx] = bf_group[key][start:stop]
So instead of the for loop, I'd like to know whether there is a way to easily extract the N rows so that I can then simply use them as a 2D matrix for downstream work.
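Something along these lines is what I imagine after skimming the Dask docs, though I'm not sure it's right; it's a minimal, untested sketch that assumes every sub-array has the same length bloomfilter_size, and with hundreds of millions of sub-arrays the task graph itself might become the bottleneck:

import dask.array as da

# Wrap each 1-D Zarr sub-array lazily; nothing is read from disk yet
columns = [da.from_zarr(bf_group[key]) for key in bf_group.keys()]

# Stack along axis=1 to get a lazy (bloomfilter_size, m) 2-D matrix
matrix = da.stack(columns, axis=1)

# Slicing stays lazy; only the chunks covering the first 10 rows are read on compute()
n_rows = matrix[0:10, :].compute()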
Alternatively: it would also be good to know how to create the 2D array on disk (perhaps as another Zarr store) by concatenating all these sub-groups, without consuming too much memory.
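If the lazy stacking above is valid, I assume something like the following could persist it as a single on-disk 2-D Zarr array, written block by block rather than all at once (again untested; the output path and chunk shape are placeholders I made up):

# Rechunk so each write task handles a reasonably sized block, then stream to disk;
# to_zarr() computes and writes chunk by chunk without materializing the full matrix
matrix = matrix.rechunk((bloomfilter_size // 4, 1024))
matrix.to_zarr("data/bloomfilters_2d.zarr", overwrite=True)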