
I have 2 text files (*.txt) that contain unique strings in the following format:

udtvbacfbbxfdffzpwsqzxyznecbqxgebuudzgzn:refmfxaawuuilznjrxuogrjqhlmhslkmprdxbascpoxda
ltswbjfsnejkaxyzwyjyfggjynndwkivegqdarjg:qyktyzugbgclpovyvmgtkihxqisuawesmcvsjzukcbrzi

The first file contains 50 million lines (4.3 GB), the second contains 1 million lines (112 MB). Each line consists of 40 characters, the separator :, and another 45 characters.

The task: find the unique values between the two files. That is, I need a csv or txt file containing the lines that appear in the second file but not in the first.

I am trying to do this with vaex (Vaex):

import vaex

# convert the large base file to HDF5 in 5-million-row chunks
base_files = ['file1.txt']
for i, txt_file in enumerate(base_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_base/base_{i:02}_{j:02}.hdf5')

# convert the check file in the same way
check_files = ['file2.txt']
for i, txt_file in enumerate(check_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_check/check_{i:02}_{j:02}.hdf5')

# open the converted files and join them on the 'data' column
dv_base = vaex.open('hdf5_base/*.hdf5')
dv_check = vaex.open('hdf5_check/*.hdf5')
dv_result = dv_check.join(dv_base, on='data', how='inner', inplace=True)
dv_result.export(path='result.csv')

As a result, I get a result.csv file with the unique row values. But the check takes a very long time, and it uses all available RAM and all CPU resources. How can this process be sped up? What am I doing wrong? What could be done better? Is it worth using other libraries (pandas, dask) for this check, and would they be faster?


UPD 10.11.2020: So far I have not found anything faster than the following option:

from io import StringIO


def read_lines(text):
    # wrap the in-memory string so it can be iterated line by line
    handle = StringIO(text)
    for line in handle:
        yield line.rstrip('\n')


def read_in_chunks(file_obj, chunk_size=10485760):
    while True:
        data = file_obj.read(chunk_size)
        if not data:
            break
        # read() can stop in the middle of a line; finish the line so it is not split across chunks
        data += file_obj.readline()
        yield data


file_check = open('check.txt', 'r', errors='ignore').read()

check_set = {elem for elem in read_lines(file_check)}

with open(file='base.txt', mode='r', errors='ignore') as file_base:
    for idx, chunk in enumerate(read_in_chunks(file_base), 1):
        print(f'Checked [{idx}0 Mb]')
        for elem in read_lines(chunk):
            if elem in check_set:
                check_set.remove(elem)

print(f'Unique rows: [{len(check_set)}]')

UPD 11.11.2020: Thanks to @m9_psy for the tip on improving performance. It really is faster! Currently, the fastest method is:

from io import BytesIO

# read the whole check file and keep its raw lines (bytes, trailing newline included) in a set
check_set = {elem for elem in BytesIO(open('check.txt', 'rb').read())}

with open('base.txt', 'rb') as file_base:
    for line in file_base:
        if line in check_set:
            check_set.remove(line)

print(f'Unique rows: [{len(check_set)}]')
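For completeness, a small sketch of writing the remaining rows out to a file once the scan is done (the output name unique.txt is an assumption; the lines still carry their trailing newlines, so they can be written back as-is):

# check_set now holds the check lines that never appeared in base.txt
with open('unique.txt', 'wb') as out:
    out.writelines(check_set)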

Is there a way to speed this process up even further?


4 Answers


I suspect that the join operation requires n * m comparison operations, where n and m are the lengths of the two dataframes.

Besides that, there is an inconsistency between your description and your code:

  • "That is, you need a csv or txt file containing the lines that appear in the second file but not in the first." ⟶ this means in dv_check but not in dv_base
  • dv_check.join(dv_base, on='data', how='inner', inplace=True) ⟶ this means in both dv_check and dv_base

Either way, one idea is to use a set, since checking membership in a set is O(1) while checking membership in a list is O(n). If you are familiar with the SQL world, this is equivalent to moving from a LOOP JOIN strategy to a HASH JOIN strategy:

# This will take care of removing the duplicates
base_set = set(dv_base['data'])
check_set = set(dv_check['data'])

# In `dv_check` but not `dv_base`
keys = check_set - base_set

# In both `dv_check` and `dv_base`
keys = check_set & base_set

This only gives you the keys that satisfy your condition. You still have to filter the two dataframes to get the other attributes.
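For example, a minimal sketch of that filtering step, assuming the column is named data as in the question and that vaex's isin() expression is available (converting keys to a list here is purely illustrative and may be slow for very large key sets):

# keep only the dv_check rows whose value is in `keys`
keys = check_set - base_set
dv_unique = dv_check[dv_check['data'].isin(list(keys))]
dv_unique.export_csv('result.csv')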

Finished in 1 minute 14 seconds on my 2014 iMac with 16 GB of RAM.

Answered 2020-11-01T23:31:52.107

Let's generate a dataset that mimics your example:

import vaex
import numpy as np
N = 50_000_000  # 50 million rows for base
N2 = 1_000_000  # 1 million for check
M = 40+1+45     # chars for each string
N_dup = 10_000  # number of duplicate rows in the checks

s1 = np.random.randint(ord('a'), ord('z'), (N, M), np.uint32).view(f'U{M}').reshape(N)
s2 = np.random.randint(ord('a'), ord('z'), (N2, M), np.uint32).view(f'U{M}').reshape(N2)
# make sure s2 has rows that match s1
dups = np.random.choice(N2, N_dup, replace=False)
s2[dups] = s1[np.random.choice(N, N_dup, replace=False)]

# save the data to disk
vaex.from_arrays(s=s1).export('/data/tmp/base.hdf5')
vaex.from_arrays(s=s2).export('/data/tmp/check.hdf5')

Now we join check against base with a left join; the check rows that have no match in base end up with a missing s_other:

import vaex
base = vaex.open('/data/tmp/base.hdf5')
check = vaex.open('/data/tmp/check.hdf5')
# after the left join, s_other is missing for the check rows that have no match in base
joined = check.join(base, on='s', how='left', rsuffix='_other')
# dropping those rows leaves the check rows that also appear in base (the planted duplicates)
matched = joined.dropmissing(['s_other'])
# and this is what is left
matched
#      s                                                    s_other
0      'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfh...  'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfhb...
1      'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikh...  'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikhh...
2      'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhact...  'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhactn...
3      'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjk...  'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjku...
4      'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvb...  'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvbs...
...    ...                                                  ...
9,995  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwn...  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwno...
9,996  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogms...  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogmsb...
9,997  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknji...  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknjii...
9,998  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmv...  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmvo...
9,999  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfeb...  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfebi...
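Note that dropmissing() keeps the check rows that do have a counterpart in base, which is why exactly the 10,000 planted duplicates show up above. To get the check rows that are not in base, which is what the question asks for, keep the missing ones instead; a minimal sketch, assuming vaex's ismissing() expression and an assumed output name unique.csv:

# rows where s_other is missing had no match in base
only_in_check = joined[joined['s_other'].ismissing()]
only_in_check.export_csv('unique.csv')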
Answered 2020-11-06T11:39:41.123

Here is another approach. The check file is about 0.1 GB (it fits in memory). The base file can be up to 100 GB (so it is processed one line at a time).

Create test data and a generator function for reading the data in:

from io import StringIO

# test data for base (>50 million lines)
base_file = '''a
b
c
d
e
'''

# test data for check (1 million lines)
check_file = '''d
e
f
g
'''

def read_lines(text):
    '''Yield lines from an in-memory string one at a time (generator function).'''
    handle = StringIO(text)
    for line in handle:
        yield line.rstrip('\n')

Find the elements that are only in the check file (check_set - base_set in @CodeDifferent's example):

check_set = {elem for elem in read_lines(check_file)}

for elem in read_lines(base_file):
    if elem in check_set:
        check_set.remove(elem)
print(check_set)
{'g', 'f'}

Find the intersection (check_set & base_set in @CodeDifferent's example):

check_set = {elem for elem in read_lines(check_file)}

common_elements = set()
for elem in read_lines(base_file):
    if elem in check_set:
        common_elements.add(elem)
print(common_elements)
{'d', 'e'}

I think this approach works best when (a) the base file is much larger than the check file and (b) the base file is too big for an in-memory data structure.
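Applied to the real files, the same pattern would look roughly like this (a sketch; the file names check.txt, base.txt and unique.txt are assumptions):

def read_file_lines(path):
    '''Yield stripped lines one at a time, so the base file never has to fit in memory.'''
    with open(path, 'r', errors='ignore') as handle:
        for line in handle:
            yield line.rstrip('\n')

# the check file is small enough to keep as a set in memory
check_set = set(read_file_lines('check.txt'))

# stream the huge base file and discard every line that also appears in check
for elem in read_file_lines('base.txt'):
    check_set.discard(elem)

# whatever is left exists only in the check file
with open('unique.txt', 'w') as out:
    out.writelines(f'{elem}\n' for elem in check_set)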

Answered 2020-11-05T17:23:05.463

Note! My original answer was wrong; @codediff is right. Here is my slightly different version, in case it helps someone. I assume the text files contain only a single column.

import pandas as pd

filepath_check = './data/checkfile.csv'
filepath_base = './data/basefile.csv'

# load the small data into memory
dfcheck = pd.read_csv(filepath_check, names=['data'])  # the question's files have no header row, so name the single column
dfcheck = set(dfcheck['data'])

# but load the big data in chunk
chunk_iter = pd.read_csv(filepath_base, names=['data'], chunksize=100000)

# for each chunk, remove intersect if any.
for chunk in chunk_iter:
    dfcheck = dfcheck - set(chunk['data'])
    print(len(dfcheck))

# write result
with open('./results.txt', 'w') as filehandler:
    for item in dfcheck:
        filehandler.write('%s\n'% item)

Old answer

I ran into a similar problem recently. My solution was to use Dask, but Vaex should certainly work fine too.

import dask.dataframe as dd

base_file = dd.read_csv('./base_file.csv')
check_file = dd.read_csv('./check_file.csv')

base_file = base_file.set_index('data')
check_file = check_file.set_index('data')

base_file.to_parquet('./results/base_file', compression=None)
check_file.to_parquet('./results/check_file', compression=None)

base_file = dd.read_parquet('./results/base_file')
check_file = dd.read_parquet('./results/check_file')
merged = dd.merge(base_file, check_file, left_index=True, right_index=True)

# save to csv from dask dataframe
merged.to_csv('./results/dask_result.csv', single_file=True)

# or save to csv from pandas dataframe
pandas_merged = merged.compute() # convert to pandas
pandas_merged.to_csv(...)
  1. Why set the index? It makes the join faster. https://docs.dask.org/en/latest/dataframe-best-practices.html#joins
  2. Why save to Parquet only to read it back later? In my experience, reading CSV directly takes more memory even with Dask, while loading Parquet files is definitely faster. https://docs.dask.org/en/latest/dataframe-best-practices.html#store-data-in-apache-parquet-format. I have many save/load lines like this to persist the results of expensive operations (such as join and set_index).
  3. If check_file is small enough, you can keep the whole thing in memory with check_file = check_file.persist() after loading the file, or wherever it is needed (see the sketch below).
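A rough sketch of where persist() could sit in the flow above (it pins the small check data in memory so later steps do not re-read it from disk; how much this helps depends on available memory and the scheduler):

import dask.dataframe as dd

base_file = dd.read_parquet('./results/base_file')
# the check data is small, so keep it in memory for the join
check_file = dd.read_parquet('./results/check_file').persist()

merged = dd.merge(base_file, check_file, left_index=True, right_index=True)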
Answered 2020-11-09T23:41:41.137