2

我正在尝试从一组压缩的 CSV 文件创建一个 dask 数据框。阅读问题,似乎 dask 需要使用 dask.distributed delayed()

import glob
import dask.dataframe as dd
import zipfile
import pandas as pd 
from dask.delayed import delayed

#Create zip_dict with key-value pairs for .zip & .csv names
file_list = glob.glob('my_directory/zip_files/')
zip_dict = {}
for f in file_list:
    key = f.split('/')[5][:-4]
    zip_dict[key] = zipfile.ZipFile(f)

zip_dict = {'log20160201': zipfile.ZipFile filename='/my_directory/zip_files/log20160201.zip' mode='r', 'log20160218': zipfile.ZipFile filename='/my_directory/zip_files/log20160218.zip' 的示例内容模式='r'}

# Create list of delayed pd.read_csv()    
d_rows = []
for k, v in zip_dict.items():

    row = delayed(pd.read_csv)(v.open(k+'.csv'),usecols=['time','cik'])
    d_rows.append(row)
    v.close()

d_rows 的样本内容 = [Delayed('read_csv-c05dc861-79c3-4e22-8da6-927f5b7da123'), Delayed('read_csv-4fe1c901-44b4-478b-9c11-4a80f7a639e2')]

big_df = dd.from_delayed(d_rows)  

返回的错误是: ValueError: Invalid file path or buffer object type: class 'list'

4

1 回答 1

3

高级方法

在这种情况下,我认为您实际上并不需要字典zip_dict用 Pandas 懒惰地读取这些压缩文件。基于这个非常相似的 SO question to read in ( .gz) 压缩*.csv文件使用 Dask(也显示在这里),并且由于您要加载多个文件,因此至少有两种可能的方法可供您使用

  1. 使用dask.delayedpandas.read_csv
    • 在这里,您可以将每个文件读入 a pandas.DataFrame,但不是实际执行读入内存,而是延迟此操作,从而创建延迟对象列表(至少有两种方法可以创建此列表,如下所示)
      • for循环创建一个列表,这就像[delayed(pd.read_csv)(f) for f in file_list]
        • 如果你有 17 个.csv.zip文件,那么这会创建一个包含 17 个延迟对象的列表
      • mapand创建一个列表functools.partial,这会创建一个单元素列表,看起来像list(map(functools.partial(delayed(pd.read_csv), file_list)))
        • 如果你有 17 个.csv.zip文件,那么这会创建一个包含 1 个延迟对象的列表
    • 然后您使用dd.from_delayed将此延迟对象列表转换为pandas.DataFrame
      • 使用循环方法,这类似于dd.from_delayed(dfs)
      • 使用map()andfunctools.partial方法,您将使用dd.from_delayed(dfs).repartition(file_list)
        • 由于这种方法只给出了一个单(延迟)元素列表,因此结果dask.dataframe将具有将所有文件垂直连接到单个dask.dataframe分区中的效果
        • 为了将 17 个文件中的每一个分隔到 的专用分区中dask.dataframe,您需要使用.repartition()
  2. dask.dataframe.read_csv(file_list) 直接使用,它实际使用pandas.read_csv,因此它接受许多来自的关键字参数pandas.read_csv

在这两种方法中

  • 指定将被读取 dtypes的列(如推荐)是 Dask 的最佳实践
    • 您可以使用看起来像的字典来执行此操作{"time": int, "cik": int},因为您只需要列time并且cik您知道它们中的每一个都应该是int(整数)dtype
  • 使用.read_csv()关键字
    • usecols指定所需的列名列表
    • compression表示.zip正在读入一个文件

Python代码

下面是实现这些方法的代码,并根据需要提供简短的注释

进口

from functools import partial
from itertools import repeat
from glob import glob
from collections import OrderedDict

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.delayed import delayed

生成虚拟数据文件

使用这个 SO 答案,生成多个.csv文件

def generate_multiple_csvs(data_dir, file_num=1):
    col_names = list("ABCDEFG")+["time", "cik"]
    df = pd.DataFrame(np.random.randint(10, size=(10,9)), columns=col_names)
    filename = f"data_file_{file_num}.csv.zip"
    filepath = data_dir + "/" + filename
    df["filepath"] = filename
    df.to_csv(filepath, index=False, compression="zip")
    return df

# Specify path the directory where `.csv.zip` files should be created
data_dir = "data/processed"

# Specify number of files to create
num_files_wanted = 8
用于itertools.repeat创建虚拟文件
_ = list(
    map(
        generate_multiple_csvs,
        repeat(data_dir, num_files_wanted),
        list(range(1, num_files_wanted+1)),
    )
)
用于functools.partial创建虚拟文件
_ = list(
    map(
        partial(generate_multiple_csvs, data_dir),
        list(range(9, 9+num_files_wanted+1)),
    )
)

按文件类型获取文件列表

file_list = glob(data_dir + "/" + "*.zip")

dtypes为 Dask DataFrame 中的列指定列(推荐)

my_dtypes = OrderedDict([("time",int), ("cik",int)])

方法 1 -dask.delayed使用for循环

# Lazily reading files into Pandas DataFrames by looping
dfs = [
    delayed(pd.read_csv)(f, compression='zip', usecols=['time','cik'])
    for f in file_list
]

# Combine into a single Dask DataFrame
ddf_from_delayed_loop = dd.from_delayed(dfs, meta=my_dtypes)

print(type(ddf_from_delayed_loop))
print(ddf_from_delayed_loop)
输出
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
                 time    cik
npartitions=17              
                int64  int64
                  ...    ...
...               ...    ...
                  ...    ...
                  ...    ...
Dask Name: from-delayed, 34 tasks

方法 1 - 使用dask.delayedwithmap

# Lazily reading files into Pandas DataFrames with Python's built-in map()
dfs = list(
    map(
        partial(
            delayed(pd.read_csv),
            compression="zip",
            usecols=['time', 'cik'],
        ),
        file_list,
    )
)

# Combine into a single Dask DataFrame and repartition
ddf_from_delayed_map = dd.from_delayed(dfs, meta=my_dtypes).repartition(
    npartitions=len(file_list)
)

print(type(ddf_from_delayed_map))
print(ddf_from_delayed_map)
输出
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
                 time    cik
npartitions=17              
                int64  int64
                  ...    ...
...               ...    ...
                  ...    ...
                  ...    ...
Dask Name: from-delayed, 34 tasks

方法 2 - 直接使用dask.dataframe

# Lazily reading files into single Dask DataFrame
ddf_direct = dd.read_csv(
    data_dir+"/*.csv.zip",
    compression='zip',
    dtype=my_dtypes,
    blocksize=None,
    usecols=['time','cik'],
)

print(type(ddf_direct))
print(ddf_direct)
输出
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
                 time    cik
npartitions=17              
                int64  int64
                  ...    ...
...               ...    ...
                  ...    ...
                  ...    ...
Dask Name: read-csv, 17 tasks

笔记

  1. 对于上述所有方法,指定数字分区时应牢记以下几点
  2. 对带有循环的方法使用批处理,dask.delayedfor以减少大量调用的开销(有关批处理实现,dask.delayed请参阅此 SO 问题)。
于 2018-10-19T06:18:37.067 回答