我正在尝试将一个巨大的 csv.gz 文件从 url 读取成块并将其即时写入数据库。我必须在内存中完成所有这些,磁盘上不能存在任何数据。
我有下面的生成器函数,它将响应块生成到 Dataframe 对象中。
它使用请求的 response.raw 作为 pd.read_csv 函数的输入来工作,但它看起来不可靠,有时会引发超时错误:urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(10054, \'WSAECONNRESET\')",)', OSError("(10054, 'WSAECONNRESET')",))
response = session.get(target, stream=True)
df_it = pd.read_csv(response.raw, compression='gzip', chunksize=10**6,
header=None, dtype=str, names=columns, parse_dates=['datetime'])
for i, df in enumerate(self.process_df(df_it)):
if df.empty:
continue
if (i % 10) == 0:
time.sleep(10)
yield df
我决定改用 iter_content,因为我读到它应该更可靠。我已经实现了以下功能,但我收到了这个错误:EOFError: Compressed file ended before the end-of-stream marker was reached
。
我认为这与我传入压缩的 Bytes 对象(?)的事实有关,但我不确定如何将 pandas.read_csv 传递给它将接受的对象。
response = session.get(target, stream=True)
for chunk in response.iter_content(chunk_size=10**6):
file_obj = io.BytesIO()
file_obj.write(chunk)
file_obj.seek(0)
df_it = pd.read_csv(file_obj, compression='gzip', dtype=str,
header=None, names=columns, parse_dates=['datetime'])
for i, df in enumerate(self.process_df(df_it)):
if df.empty:
continue
if (i % 10) == 0:
time.sleep(10)
yield df
任何想法都非常感谢!
谢谢