0

我必须读取包含全文数据的 CSV,其中单个字段可以跨多行。用纯 pandas 读取这个 CSV(在 0.25.3 和 1.0.3 两个版本上测试过)没有任何问题,但是当我尝试用 dask 读取同一个文件时,会收到 ParserError: Error tokenizing data. C error: EOF inside string starting at row 28 —— 报错中的行号取决于我尝试读取的具体文件。

我准备了一个人工构造的数据框来重现此错误。我应该调整 dask 参数、对输入文件做预处理,还是说这是 dask 实现上的问题?

# Number of copies of the 1-row frame concatenated into the multi-line CSV.
# Empirically, 70 rows still parse under dask while 71 trigger the
# ParserError described above (the threshold depends on dask's sample size).
multiplication_factor = 71 # 70 works fine, 71 fails
# Width of the artificial DataFrame (one multi-line string per column).
number_of_columns = 100

import pandas as pd
import dask.dataframe as dd
import textwrap

# Explicit snapshot of pandas.read_csv defaults (pandas 0.25 / 1.0 era),
# passed to both pandas and dask so the two readers are configured
# identically.  Options that dask.dataframe.read_csv does not accept are
# kept as comments for reference.
pandas_default_kwargs = dict(
    cache_dates=True,
    # chunksize=None,        # not supported by dask
    comment=None,
    # compression='infer',   # not supported by dask
    converters=None,
    date_parser=None,
    dayfirst=False,
    decimal=b'.',
    delim_whitespace=False,
    delimiter=None,
    dialect=None,
    doublequote=True,
    dtype=object,
    encoding=None,
    engine=None,
    error_bad_lines=True,
    escapechar=None,
    false_values=None,
    float_precision=None,
    header='infer',
    # index_col=None,        # not supported by dask
    infer_datetime_format=False,
    # iterator=False,        # not supported by dask
    keep_date_col=False,
    keep_default_na=True,
    lineterminator=None,
    low_memory=True,
    mangle_dupe_cols=True,
    memory_map=False,
    na_filter=True,
    na_values=None,
    names=None,
    nrows=None,
    parse_dates=False,
    prefix=None,
    quotechar='"',
    quoting=0,
    sep=',',
    skip_blank_lines=True,
    skipfooter=0,
    skipinitialspace=False,
    skiprows=None,
    squeeze=False,
    thousands=None,
    true_values=None,
    usecols=None,
    verbose=False,
    warn_bad_lines=True,
)

# One-row DataFrame in which every cell is a multi-line string.
# textwrap.dedent strips the f-string's common indentation, so each cell
# should be "\nsome_data_for\n\ncolumn_number_{i}\n" (whitespace-only lines
# are normalized by dedent).  The embedded newlines force the CSV writer to
# quote each field, which is the trigger for the dask parser error.
artificial_df_1_row = pd.DataFrame(
    data=[
        (
            textwrap.dedent(
                f"""
                some_data_for
                
                column_number_{i}
                """
            )
            for i 
            in range(number_of_columns)
        )
    ],
    columns=[f'column_name_number_{i}' for i in range(number_of_columns)]
)

# Output paths for the two reproduction files.
path_to_single_line_csv = './single_line.csv'
path_to_multi_line_csv = './multi_line.csv'

# prepare data to save: the multi-line frame is the single row repeated
# multiplication_factor times (row count decides whether dask fails).
single_line_df = artificial_df_1_row
multi_line_df = pd.concat(
    [single_line_df] * multiplication_factor,
)

# save data; index=False so the header is the only non-data line.
single_line_df.to_csv(path_to_single_line_csv, index=False)
multi_line_df.to_csv(path_to_multi_line_csv, index=False)

# read 1 row csv by dask - works
# blocksize=None makes dask read the whole file as a single partition, so
# no partition boundary can fall inside a quoted multi-line field.
dask_single_line_df = dd.read_csv(
    path_to_single_line_csv, 
    blocksize=None, # read as single block
    **pandas_default_kwargs
)
# .compute() materializes the lazy dask graph; shape[0] is the row count.
dask_single_line_df_count = dask_single_line_df.shape[0].compute()
print('[DASK] single line count', dask_single_line_df_count)

# read multiline csv by pandas - works
# Plain pandas parses the same file with the same kwargs without error.
pandas_multi_line_df = pd.read_csv(
    path_to_multi_line_csv,
    **pandas_default_kwargs
)
pandas_multi_line_df_shape_0 = pandas_multi_line_df.shape[0]
print('[PANDAS] multi line count', pandas_multi_line_df_shape_0)

# read multi-line csv by dask - fails or not depending on the row count
# (presumably because dask's header/dtype sampling cuts the file inside a
# quoted field once it is large enough - TODO confirm against dask source).
dask_multi_line_df = dd.read_csv(
    path_to_multi_line_csv, 
    blocksize=None,  # read as single block
    **pandas_default_kwargs
)
dask_multi_line_df_shape_0 = dask_multi_line_df.shape[0].compute()
print('[DASK] multi line count', dask_multi_line_df_shape_0)
4

1 回答 1

0

读取此类文件的唯一办法,是确保分块边界不落在带引号的字符串内部;除非您对数据布局非常了解,否则这意味着根本不对文件分块(但您仍然可以在多个文件之间进行并行化)。

这是因为,要判断文件中某个位置是否处于带引号的字符串内部,唯一的方法就是从文件开头完整地解析一遍;而 dask 实现并行的方式,是让每个分块读取任务完全独立,只依赖一个文件偏移量。实际上,dask 从该偏移量开始读取,并把遇到的第一个换行符当作开始解析的位置。

于 2020-07-14T16:10:56.203 回答