我正在尝试拆分一个非常大的 (22GB) CSV 文件,同时使用块和数据中给定列的第 n 个字符的条件。
我一直在无助地尝试将其组合起来: Python:根据第一列的第一个字符拆分 CSV 文件
有这样的东西,但我碰壁了。我有一个列不为空的条件,但我想根据给定列的第 n 个字符拆分我的文件。
反正有没有根据这样的条件创建更小的 csv 文件。任何帮助将不胜感激。
我的数据摘要如下所示:
源名称 | date_naissance | date_deces | date_mariage | 地方 |
---|---|---|---|---|
dgfkf47 | YYYYMMDD | YYYYMMDD | 等等 | 等等 |
fhfidk67 | YYYYMMDD | YYYYMMDD | 等等 | 等等 |
kgodj45 | YYYYMMDD | 等等 | 等等 | |
paoror76 | YYYYMMDD | 万维网 | 等等 | 等等 |
poldidj90 | YYYYMMDD |
我想要做的是创建一系列较小的文件,以便稍后通过根据列 ID 的第 7 个字符拆分数据来分析数据。我知道如何在 5X10 中做到这一点,因为它适合我的记忆,我只是使用 groupby,但我被困在一个非常大的范围内。as ask 似乎并没有让我迭代 groupby。
我现在的策略是对 Dask 进行所有清理操作,包括创建一个仅包含第 7 个字符的新列,然后输出可以在 pandas 中加载并按此列分组的较小文件。
目前我已经做到了这一点,但我很想知道是否有一种简单的方法可以做到这一点:
import dask.dataframe as dd
import glob, os
import pyarrow.parquet
import pyarrow as pa
from dask import multiprocessing
PATH = r"/RAW DATA/TEST/"
os.chdir(PATH)
for file in glob.glob("**/*_clean.csv", recursive=True):
ddf = dd.read_csv(file, encoding='iso-8859-1', sep=';', dtype={'insee': 'object',
'ref_document': 'object', 'ref_interne_sourcename': 'object',
'sexe': 'object', 'profession': 'object', 'date_naissance': 'object', 'date_mariage': 'object', 'date_deces': 'object'})
# read to parquet
ddf.to_parquet('file_clean.csv.parquet', engine='pyarrow', schema={'insee': pa.string(),
'ref_document': pa.string(), 'ref_interne_sourcename': pa.string(),
'sexe': pa.string(), 'profession': pa.string(), 'date_naissance': pa.string(), 'date_mariage': pa.string(), 'date_deces': pa.string()})
ddf = dd.read_parquet('file_clean.csv.parquet', engine='pyarrow')
# split dates
ddf['date_naissance']= dd.to_datetime(ddf['date_naissance'], format='%Y%m%d', errors='coerce')
ddf['date_mariage']= dd.to_datetime(ddf['date_mariage'], format='%Y%m%d', errors='coerce')
ddf['date_deces']= dd.to_datetime(ddf["date_deces"], format='%Y%m%d', errors='coerce')
ddf['jour_naissance'] = ddf['date_naissance'].dt.day
ddf['mois_mariage'] = ddf['date_naissance'].dt.month
ddf['annee_mariage'] = ddf['date_naissance'].dt.year
ddf['jour_mariage'] = ddf['date_mariage'].dt.day
ddf['mois_mariage'] = ddf['date_mariage'].dt.month
ddf['annee_mariage'] = ddf['date_mariage'].dt.year
ddf['jour_deces'] = ddf['date_deces'].dt.day
ddf['mois_deces'] = ddf['date_deces'].dt.month
ddf['annee_deces'] = ddf['date_deces'].dt.year
# drop columns
del ddf['date_naissance']
del ddf['date_mariage']
del ddf['date_deces']
# create IDTAG column
ddf['IDTAG'] = ddf['sourcename'].str[6].fillna('')
namefile = os.path.splitext(file)[0]
ddf.to_csv(namefile)
for partfile in glob.glob('**/*.part'):
os.rename(partfile, (os.path.splitext(partfile)[0]) + "_{}_part.csv".format(namefile))
for partfilecsv in glob.glob("**/*_part.csv", recursive=True):
part_df = pd.read_csv(partfilecsv, encoding='iso-8859-1', sep=';')
# get a list of columns
cols = list(part_df)
# move the column to head of list
cols.insert(0, cols.pop(cols.index('IDTAG')))
# reorder
part_df = part_df.loc[:, cols]
# group by IDTAG
def firstletter(Index):
firstentry = part_df.iloc[Index, 0]
return firstentry[0]
for letter, group in part_df.groupby(firstletter):
group.to_csv((os.path.splitext(partfilecsv)[0]) + '_source_{}.csv'.format(letter))