3

操作 将多个 LAZ 点云文件读取到 Dask DataFrame。

问题 将 LAZ(压缩)解压缩为 LAS(未压缩)需要大量内存。Dask 创建的不同文件大小和多个进程会导致MemoryError's.

尝试

我尝试限制遵循指南的工人数量,但似乎不起作用。

from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=3)
client = Client(cluster)

dfs = [load(file) for file in lasfiles]
df = dd.from_delayed(dfs, meta=meta)
df = df.repartition(npartitions=len(df) // part_size)
df.to_parquet('/raw', compression='GZIP')

问题 如何以非标准格式加载如此大量的数据?

例子

以下示例是我当前的实现。它每 5 个对所有输入文件进行分组,以限制最多 5 个并行解压缩进程。然后重新分区并写入 Parquet 以启用进一步处理。对我来说,这个实现似乎完全错过了 Dask 的重点。

from laspy.file import File
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

@delayed
def load(file):
    with File(file.as_posix(), mode='r') as las_data:
        las_df = pd.DataFrame(las_data.points['point'], dtype=float)
        return las_df

meta = pd.DataFrame(np.empty(0, dtype=[('X',float),('Y',float),('Z',float),('intensity',float),('raw_classification',int)]))

lasfile_dir = Path('/data/las/')
lasfiles = sorted(list(lasfile_dir.glob('*.laz')))

part_size = 5000000

for idx, sublasfiles in enumerate([lasfiles[i:i+5] for i in range(0,len(lasfiles),5)]):
    try:
        dfs = [load(file) for file in sublasfiles]
        df = dd.from_delayed(dfs, meta=meta)
        df = df.repartition(npartitions=len(df) // part_size)
        df.to_parquet('/data/las/parquet/'+str(idx), compression='GZIP')
4

1 回答 1

1

你的实现对我来说似乎很好。

我要在这里改变的一件事是我会避免调用len(df),这将强制计算整个数据帧(如果不读取所有文件,就无法确定数据帧的长度)。

需要明确的是,Dask 将无法在您的load函数中进行并行化(它没有 LAZ 文件的概念),因此您的并行性将受到您拥有的文件数量的限制。

于 2017-12-06T12:48:05.230 回答