I am trying to read a huge csv.gz file from a URL in chunks and write it to a database on the fly. I have to do all of this in memory; no data can exist on disk.

I have the generator function below, which yields the response chunks as DataFrame objects.

It works by using the request's response.raw as the input to pd.read_csv, but it seems unreliable and sometimes raises a timeout error: urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(10054, \'WSAECONNRESET\')",)', OSError("(10054, 'WSAECONNRESET')",))

response = session.get(target, stream=True)
df_it = pd.read_csv(response.raw, compression='gzip', chunksize=10**6, 
                    header=None, dtype=str, names=columns, parse_dates=['datetime'])
for i, df in enumerate(self.process_df(df_it)):
    if df.empty:
        continue
    if (i % 10) == 0:
        time.sleep(10)
    yield df
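
For reference, a rough sketch of the kind of resume-on-reset logic I imagine is needed, assuming the server honors HTTP Range requests and the file is a single-member gzip stream (the function and its parameters are illustrative, not tested):

import zlib

import requests

def resumable_gzip_chunks(url, session, chunk_size=10**6, max_retries=5):
    """Yield decompressed bytes, reconnecting with a Range request
    if the connection is reset mid-download."""
    decomp = zlib.decompressobj(32 + zlib.MAX_WBITS)  # 32+: accept a gzip header
    offset, retries = 0, 0  # compressed bytes consumed so far
    while True:
        headers = {'Range': 'bytes=%d-' % offset} if offset else {}
        try:
            with session.get(url, headers=headers, stream=True, timeout=60) as r:
                r.raise_for_status()
                if offset and r.status_code != 206:
                    raise RuntimeError('server ignored the Range request')
                for chunk in r.iter_content(chunk_size=chunk_size):
                    offset += len(chunk)
                    yield decomp.decompress(chunk)
            yield decomp.flush()  # drain whatever the decompressor still holds
            return
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            retries += 1
            if retries > max_retries:
                raise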

I decided to switch to iter_content instead, as I read it is supposed to be more reliable. I implemented the following, but now I get this error: EOFError: Compressed file ended before the end-of-stream marker was reached

I think it has to do with the fact that I am passing in a compressed bytes object (?), but I am not sure how to turn it into something pandas.read_csv will accept.

response = session.get(target, stream=True)
for chunk in response.iter_content(chunk_size=10**6):
    file_obj = io.BytesIO()
    file_obj.write(chunk)
    file_obj.seek(0)
    df_it = pd.read_csv(file_obj, compression='gzip', dtype=str,
                        header=None, names=columns, parse_dates=['datetime'])
    for i, df in enumerate(self.process_df(df_it)):
        if df.empty:
            continue
        if (i % 10) == 0:
            time.sleep(10)
        yield df
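
My current understanding of the EOFError: each iter_content chunk is just a slice of one long gzip stream, not a self-contained .gz file, so decompressing a chunk in isolation can never reach the end-of-stream marker. A sketch of what I mean, reusing response from above (the CSV layer would also need buffering, since the decompressed bytes end mid-row):

import zlib

# All chunks belong to ONE gzip stream, so they must be fed through a
# single streaming decompressor instead of being gunzipped one by one.
decomp = zlib.decompressobj(32 + zlib.MAX_WBITS)  # 32+: expect a gzip header
for chunk in response.iter_content(chunk_size=10**6):
    csv_bytes = decomp.decompress(chunk)  # may be b'' until enough input arrives
    # ...csv_bytes still has to be re-chunked on row boundaries for read_csv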

Any ideas are greatly appreciated!

Thanks

1 Answer

You could try this:

def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = None
        def readable(self):
            return True
        def readinto(self, b):
            try:
                l = len(b)  # We're supposed to return at most this much
                chunk = self.leftover or next(iterable)
                output, self.leftover = chunk[:l], chunk[l:]
                b[:len(output)] = output
                return len(output)
            except StopIteration:
                return 0    # indicate EOF
    return io.BufferedReader(IterStream(), buffer_size=buffer_size)

Then:

response = session.get(target, stream=True)
response.raw.decode_content = True  # let urllib3 transparently undo any Content-Encoding
df = pd.read_csv(iterable_to_stream(response.iter_content()), sep=';')

I use this to stream .csv files in odsclient. It seems to work, although I have not tried it with gz compression.
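
For the gzipped file in the question, I would expect (untested, as said) that you can leave the bytes compressed and let pandas gunzip them, assuming the server does not additionally apply Content-Encoding: gzip on top of the file:

response = session.get(target, stream=True)
# the stream still carries the raw .gz bytes, so pandas does the decompression
stream = iterable_to_stream(response.iter_content(chunk_size=2**16))
df_it = pd.read_csv(stream, compression='gzip', chunksize=10**6,
                    header=None, dtype=str, names=columns,
                    parse_dates=['datetime'])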

Source: https://stackoverflow.com/a/20260030/7262247

answered Feb 23, 2020 at 22:42