7

所以,我有大约 4,000 个 CSV 文件,我需要将它们全部外部连接。每个文件有两列(一个字符串和一个浮点数)和 10,000-1,000,000 行之间,我想通过第一列(即字符串变量)加入。

我试过numpy.lib.recfunctions.join_by了,但那速度很慢。我切换到了pandas.merge,这要快得多,但考虑到我拥有的桌子的数量(和大小),它仍然太慢了。而且它似乎真的是内存密集型的——当被合并的文件有数十万行时,机器变得无法使用(我主要使用的是 MacBook Pro,2.4GHz,4GB)。

所以现在我正在寻找替代方案 - 我还缺少其他潜在的解决方案吗?Python 还存在哪些其他外连接实现?是否有论文/网站讨论和比较每个实现的时间复杂度?如果我只是简单地调用 Python 调用 sqlite3,然后让 sqlite3 进行连接会更有效吗?字符串键是问题吗?如果我可以使用数字键,它应该更快吗?

如果它有助于让您更具体地了解我想要实现的目标,这里是我的代码使用pandas.merge

import os
import pandas as pd

def load_and_merge(file_names, path_to_files, columns):
    '''
    seq, str, dict -> pandas.DataFrame
    '''
    output = pd.DataFrame(columns = ['mykey']) # initialize output DataFrame
    for file in file_names:

        # load new data
        new_data = pd.read_csv(path + file,
                               usecols = [col for col in columns.keys()],
                               dtype = columns,
                               names = ['mykey', file.replace('.csv', '')],
                               header = None)

        # merge with previous data
        output = pd.merge(output, new_data, on = 'mykey', how = 'outer')
        output = output.fillna(0) # kill NaNs

    return output

path = '/Users/username/data/'
files_list = [file for file in os.listdir(path) if '.csv' in file]
merged_table = load_and_merge(files_list, path, {0: 'S30', 1: 'float'})

(Mac OS X 10.6.8 和 Python 2.7.5;Ubuntu 12.04 和 Python 2.7.3)

4

2 回答 2

7

以下是我将如何解决这个问题。

不要迭代合并。您正在合并一个较小的框架(称为“合并”)和一个较大的框架(称为“合并”)。然后重复此操作,导致“合并”变得更大并拥有更多行。

相反,您可以进行重复的分层合并。假设您将合并编号为 1-4000。

合并 1 和 2 形成 1_2

然后重复,然后你合并 1_2 和 3_4 形成 1_2_3_4

这不会改变您正在做的工作量,但它使每次合并变得更加简单,从而降低了内存障碍并花费了时间(因为它必须通过键的笛卡尔积)。随机化合并顺序可能是有意义的。

此外,这是完全可并行化的,因为您可以让独立的进程处理这个问题,生成中间合并。这本质上变成了一个 map-reduce 类型的问题。

您还可以将中间合并存储在 HDF5 文件中(使用HDFStore),这将使存储非常高效。请注意,这些应该是单独的文件,以避免使用多个进程写入同一个文件(HDF5 不支持)。

于 2013-10-18T17:16:59.223 回答
0

好的,如果我理解正确的话,这是 Jeff 方法的部分实现(请参阅上面的答案)。我发布这个以防其他人试图做类似的事情。并且如果有人可以帮助改进或“美化”这段代码(现在这是一个长而丑陋的代码流......我想我应该以某种方式模块化它。)

这是一个部分实现,因为我没有并行化合并。我尝试过,使用 Python 的multiprocessing模块,但显然我没有足够的计算机内存 - 两个同时进行的进程足以冻结一切(或者我可能只是做了一些完全愚蠢的事情 - 很有可能,因为我以前从未使用multiprocessing过)。但剩下的就在这里:分层合并和 HDF5(用于存储中间文件)。

#!/usr/bin/env python

import os
import sys
import pandas as pd

def merge_csv(folder, cols_info):
    '''
    str, dict -> pandas.DataFrame

    This function outer joins all CSV files in the specified folder. It 
    joins them hierarchically and stores the intermediate files in an HDF5 
    container. The first parameter is the path to the folder and the second 
    parameter is a dictionary mapping each column to the corresponding data 
    type. You can only specify two columns.

    Example: 

    merge_csv('/Users/username/data/', {0: 'S30', 2: 'float'})

    Dependencies:

    - HDF5
    - PyTables
    - pandas
    '''

    # ensure that user is specifying only two columns
    if len(cols_info) != 2:
        sys.exit('Error: You can only specify two columns.')

    # ensure that path to input folder ends with '/'
    folder = folder + '/' if folder[-1] != '/' else folder

    # create HDF5 file to store intermediate files
    store = pd.HDFStore(folder + 'store.h5', mode = 'w')

    # load CSV files and write data to HDF5 file
    flist = [file for file in os.listdir(folder) if file[-4:] == '.csv']
    if len(flist) == 0:
        sys.exit('There are no CSV files in the specified folder.')
    for file in flist:
        case = file.replace('.csv', '')
        store[case] = pd.read_csv(folder + file, 
                                  usecols = [col for col in cols_info], 
                                  names = ['key', case], 
                                  dtype = cols_info)
        store.flush()

    # start merge routine
    flist = store.keys()
    counter = 0
    while len(flist) > 1:
        counter += 1

        # merge current set of files, two by two
        length = (len(flist) - 1) if (len(flist) % 2 == 1) else len(flist)
        for i in range(length)[0:length:2]:
            merged_data = pd.merge(store[flist[i]], store[flist[i + 1]], 
                                   on = 'key', 
                                   how = 'outer',
                                   sort = False)
            outputfile = 'file' + str(i) + str(i + 1)

            # if number of files is odd, make last pair a trio
            if (i == len(flist) - 3) and (len(flist) % 2 == 1):
                merged_data = pd.merge(merged_data, store[flist[i + 2]], 
                                       on = 'key',
                                       how = 'outer', 
                                       sort = False)
                outputfile += str(i + 2)

            # save merged pair (or trio) to HDF5 file
            merged_data = merged_data.fillna(0)
            store.put('/tmp' + str(counter) + '/' + outputfile, merged_data)
            store.flush()

        # clean up
        to_remove = [file for file in store.keys() 
                     if 'tmp' + str(counter) + '/' not in file]
        for file in to_remove:
            store.remove(file)

        # move on to next set of intermediate files
        flist = store.keys()

    # wrap up
    store.close()
    return merged_data

编辑

仍然没有好处:中间矩阵最终变得太大并超过计算机内存并且代码崩溃(tables.exceptions.HDF5ExtError: Problems creating the Array.)。我试过 sqlite3,但这也没有用,所以我想我只需要继续寻找。

于 2013-10-24T01:50:43.260 回答