6

我正在尝试用 pandas 的 to_parquet 保存一个非常大的数据集,但数据量超过某个限制后就会失败,无论使用“pyarrow”还是“fastparquet”引擎都是如此。我用下面的代码复现了遇到的错误,希望能听到关于如何解决该问题的建议:

使用 Pyarrow:

low = 3  # repro (pyarrow engine): smallest size is 10^low rows
high = 8  # largest size is 10^high rows
for n in np.logspace(low, high, high-low+1):  # one point per power of ten: 10^3 ... 10^8 (floats, hence int(n) below)
    t0 = time()  # start of the timed write + read round trip (presumably time.time; import not shown)
    df = pd.DataFrame.from_records([(f'ind_{x}', ''.join(['x']*50))     for x in range(int(n))], columns=['a', 'b']).set_index('a')  # index 'ind_<x>', column b = 50 'x' characters
    df.to_parquet(tmp_file, engine='pyarrow', compression='gzip')  # tmp_file is defined outside this snippet
    pd.read_parquet(tmp_file, engine='pyarrow')  # the traceback below shows this read raising ArrowIOError after the 10^7 run
    print(f'10^{np.log10(int(n))} read-write took {time()-t0} seconds')

10^3.0 read-write took 0.012851715087890625 seconds
10^4.0 read-write took 0.05722832679748535 seconds
10^5.0 read-write took 0.46846866607666016 seconds
10^6.0 read-write took 4.4494054317474365 seconds
10^7.0 read-write took 43.0602171421051 seconds
---------------------------------------------------------------------------
ArrowIOError                              Traceback (most recent call last)
<ipython-input-51-cad917a26b91> in <module>()
      5     df = pd.DataFrame.from_records([(f'ind_{x}', ''.join(['x']*50)) for x in range(int(n))], columns=['a', 'b']).set_index('a')
      6     df.to_parquet(tmp_file, engine='pyarrow', compression='gzip')
----> 7     pd.read_parquet(tmp_file, engine='pyarrow')
      8     print(f'10^{np.log10(int(n))} read-write took {time()-t0} seconds')

~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
    255 
    256     impl = get_engine(engine)
--> 257     return impl.read(path, columns=columns, **kwargs)

~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
    128         kwargs['use_pandas_metadata'] = True
    129         return self.api.parquet.read_table(path, columns=columns,
--> 130                                            **kwargs).to_pandas()
    131 
    132     def _validate_write_lt_070(self, df):

~/.conda/envs/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py in read_table(source, columns, nthreads, metadata, use_pandas_metadata)
    939     pf = ParquetFile(source, metadata=metadata)
    940     return pf.read(columns=columns, nthreads=nthreads,
--> 941                    use_pandas_metadata=use_pandas_metadata)
    942 
    943 

~/.conda/envs/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py in read(self, columns, nthreads, use_pandas_metadata)
    148             columns, use_pandas_metadata=use_pandas_metadata)
    149         return self.reader.read_all(column_indices=column_indices,
--> 150                                     nthreads=nthreads)
    151 
    152     def scan_contents(self, columns=None, batch_size=65536):

_parquet.pyx in pyarrow._parquet.ParquetReader.read_all()

error.pxi in pyarrow.lib.check_status()
ArrowIOError: Arrow error: Invalid: BinaryArray cannot contain more than 2147483646 bytes, have 2147483650

使用 fastparquet:

low = 3  # repro (fastparquet engine): smallest size is 10^low rows
high = 8  # largest size is 10^high rows
for n in np.logspace(low, high, high-low+1):  # one point per power of ten: 10^3 ... 10^8 (floats, hence int(n) below)
    t0 = time()  # start of the timed write + read round trip (presumably time.time; import not shown)
    df = pd.DataFrame.from_records([(f'ind_{x}', ''.join(['x']*50)) for x in range(int(n))], columns=['a', 'b']).set_index('a')  # index 'ind_<x>', column b = 50 'x' characters
    df.to_parquet(tmp_file, engine='fastparquet', compression='gzip')  # the traceback below shows this write raising OverflowError after the 10^7 run
    pd.read_parquet(tmp_file, engine='fastparquet')  # not reached once the write above fails
    print(f'10^{np.log10(int(n))} read-write took {time()-t0} seconds')

10^3.0 read-write took 0.17770028114318848 seconds
10^4.0 read-write took 0.06351733207702637 seconds
10^5.0 read-write took 0.46896958351135254 seconds
10^6.0 read-write took 5.464379549026489 seconds
10^7.0 read-write took 50.26520347595215 seconds
---------------------------------------------------------------------------
OverflowError                             Traceback (most recent call last)
<ipython-input-49-234a889ae790> in <module>()
      4     t0 = time()
      5     df = pd.DataFrame.from_records([(f'ind_{x}', ''.join(['x']*50)) for x in range(int(n))], columns=['a', 'b']).set_index('a')
----> 6     df.to_parquet(tmp_file, engine='fastparquet', compression='gzip')
      7     pd.read_parquet(tmp_file, engine='fastparquet')
      8     print(f'10^{np.log10(int(n))} read-write took {time()-t0} seconds')

~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/core/frame.py in to_parquet(self, fname, engine, compression, **kwargs)
   1647         from pandas.io.parquet import to_parquet
   1648         to_parquet(self, fname, engine,
-> 1649                    compression=compression, **kwargs)
   1650 
   1651     @Substitution(header='Write out the column names. If a list of strings '

~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in to_parquet(df, path, engine, compression, **kwargs)
    225     """
    226     impl = get_engine(engine)
--> 227     return impl.write(df, path, compression=compression, **kwargs)
    228 
    229 

~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in write(self, df, path, compression, **kwargs)
    198         with catch_warnings(record=True):
    199             self.api.write(path, df,
--> 200                            compression=compression, **kwargs)
    201 
    202     def read(self, path, columns=None, **kwargs):

~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/writer.py in write(filename, data, row_group_offsets, compression, file_scheme, open_with, mkdirs, has_nulls, write_index, partition_on, fixed_text, append, object_encoding, times)
    846     if file_scheme == 'simple':
    847         write_simple(filename, data, fmd, row_group_offsets,
--> 848                      compression, open_with, has_nulls, append)
    849     elif file_scheme in ['hive', 'drill']:
    850         if append:

~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/writer.py in write_simple(fn, data, fmd, row_group_offsets, compression, open_with, has_nulls, append)
    715                    else None)
    716             rg = make_row_group(f, data[start:end], fmd.schema,
--> 717                                 compression=compression)
    718             if rg is not None:
    719                 fmd.row_groups.append(rg)

~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/writer.py in make_row_group(f, data, schema, compression)
    612                 comp = compression
    613             chunk = write_column(f, data[column.name], column,
--> 614                                  compression=comp)
    615             rg.columns.append(chunk)
    616     rg.total_byte_size = sum([c.meta_data.total_uncompressed_size for c in

~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/writer.py in write_column(f, data, selement, compression)
    545                                    data_page_header=dph, crc=None)
    546 
--> 547     write_thrift(f, ph)
    548     f.write(bdata)
    549 

~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/thrift_structures.py in write_thrift(fobj, thrift)
     49     pout = TCompactProtocol(fobj)
     50     try:
---> 51         thrift.write(pout)
     52         fail = False
     53     except TProtocolException as e:

~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/parquet_thrift/parquet/ttypes.py in write(self, oprot)
   1028     def write(self, oprot):
   1029         if oprot._fast_encode is not None and self.thrift_spec is not None:
-> 1030             oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec]))
   1031             return
   1032         oprot.writeStructBegin('PageHeader')

OverflowError: int out of range
4

2 回答 2

8

看来您用 Pyarrow 能成功写入但无法读取,而用 fastparquet 在写入阶段就失败了,因此根本到不了读取这一步。我建议您用 Pyarrow 写数据,再用 fastparquet 按行组(row group)迭代、分块读取:

from fastparquet import ParquetFile

# Write with the pyarrow engine, then read the file back with fastparquet.
# `df` and `tmp_file` come from the question's snippet and are not defined here.
df.to_parquet(tmp_file, engine='pyarrow', compression='gzip')
pf = ParquetFile(tmp_file)
# Iterate row group by row group instead of loading the whole file at once.
# The loop variable is deliberately not named `df`, so the DataFrame that was
# just written is not rebound to the last row group.
for chunk in pf.iter_row_groups():
    print(chunk.head(n=10))
于 2018-06-11T08:51:15.517 回答
0

我遇到过类似的问题,升级到 pyarrow 0.12 后问题解决了,可以一次性读取整个文件(而不必分块读取)。

于 2019-03-17T07:06:43.590 回答