操作 将多个 LAZ 点云文件读取到 Dask DataFrame。
问题
将 LAZ(压缩)解压缩为 LAS(未压缩)需要大量内存。Dask 创建的不同文件大小和多个进程会导致MemoryError
's.
尝试
我尝试限制遵循指南的工人数量,但似乎不起作用。
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=3)
client = Client(cluster)
dfs = [load(file) for file in lasfiles]
df = dd.from_delayed(dfs, meta=meta)
df = df.repartition(npartitions=len(df) // part_size)
df.to_parquet('/raw', compression='GZIP')
问题 如何以非标准格式加载如此大量的数据?
例子
以下示例是我当前的实现。它每 5 个对所有输入文件进行分组,以限制最多 5 个并行解压缩进程。然后重新分区并写入 Parquet 以启用进一步处理。对我来说,这个实现似乎完全错过了 Dask 的重点。
from laspy.file import File
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
@delayed
def load(file):
with File(file.as_posix(), mode='r') as las_data:
las_df = pd.DataFrame(las_data.points['point'], dtype=float)
return las_df
meta = pd.DataFrame(np.empty(0, dtype=[('X',float),('Y',float),('Z',float),('intensity',float),('raw_classification',int)]))
lasfile_dir = Path('/data/las/')
lasfiles = sorted(list(lasfile_dir.glob('*.laz')))
part_size = 5000000
for idx, sublasfiles in enumerate([lasfiles[i:i+5] for i in range(0,len(lasfiles),5)]):
try:
dfs = [load(file) for file in sublasfiles]
df = dd.from_delayed(dfs, meta=meta)
df = df.repartition(npartitions=len(df) // part_size)
df.to_parquet('/data/las/parquet/'+str(idx), compression='GZIP')