0

我是 Web 开发的新手。我的任务是从 web api 下载大量数据,并将它们上传到谷歌云上的 postgres 数据库中。

因为数据量很大,所以我有一个for循环,逐部分刮取数据,将每一部分插入到云表中,最后commit一切。我使用 sqlalchemy 和 pg8000 来完成这项工作。

这是我的代码的基本结构:

engine = create_engine("postgresql+pg8000://connection/info")
session = scoped_session(sessionmaker(autocommit=False,autoflush=False,bind=engine))
Base = declarative_base()

class MyTableClass(Base):
    some columns

Base.metadata.create_all(engine)

for part in scraping_data():
    engine.execute(MyTableClass.__table__.insert(), part)

session.commit()
session.close()
engine.dispose()

for 循环成功运行了 12 个小时。然后我收到一个网络错误。这是第一部分。错误消息的其余部分很长,只是第一部分引起的一串错误。

ERROR:sqlalchemy.pool.impl.QueuePool:Exception closing connection <pg8000.legacy.Connection object at 0x7fa2ba514160>
Traceback (most recent call last):
    File "/home/user/anaconda3/envs/myenv/lib/python3.8/site-packages/pg8000/core.py", line 760, in handle_messages
    code, data_len = ci_unpack(self._read(5))
    struct.error: unpack_from requires a buffer of at least 5 bytes for unpacking 5 bytes at offset 0 (actual buffer size is 0)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/anaconda3/envs/myenv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1782, in _execute_context
    self.dialect.do_executemany(
  File "/home/user/anaconda3/envs/myenv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 716, in do_executemany
    cursor.executemany(statement, parameters)
  File "/home/user/anaconda3/envs/myenv/lib/python3.8/site-packages/pg8000/legacy.py", line 304, in executemany
    self.execute(operation, parameters)
  File "/home/user/anaconda3/envs/myenv/lib/python3.8/site-packages/pg8000/legacy.py", line 252, in execute
    self._context = self._c.execute_unnamed(
  File "/home/user/anaconda3/envs/myenv/lib/python3.8/site-packages/pg8000/core.py", line 649, in execute_unnamed
    self.handle_messages(context)
  File "/home/user/anaconda3/envs/myenv/lib/python3.8/site-packages/pg8000/core.py", line 762, in handle_messages
    raise InterfaceError("network error on read") from e
pg8000.exceptions.InterfaceError: network error on read

任何人都可以帮助我阐明错误吗?

我的连接似乎刚刚超时。谁能帮助我更好地构建代码以避免在超时发生时丢失大量下载数据?

谢谢!

4

1 回答 1

0

正如@Eddie 所证实的那样,他的问题通过将他的出口工作分解成更小的工作来解决。我将发布评论(故障排除步骤/潜在问题)作为任何遇到相同问题的人的答案。

根据此链接,异常 pg8000.errors.InterfaceError(Error) 是针对与数据库接口而非数据库本身相关的错误引发的通用异常。例如,如果接口尝试使用 SSL 连接但服务器拒绝,则会引发 InterfaceError。

  1. 检查您是否在项目中启用了 Service Networking API。如果您尝试将私有 IP 地址分配给 Cloud SQL 实例,并且您使用的是共享 VPC,则还需要为宿主项目启用服务网络 API。

  2. CSV 和 SQL 格式的导出方式不同。SQL 格式包括整个数据库,可能需要更长时间才能完成。使用 CSV 格式并运行多个较小的导出作业,以减少每个操作的大小和长度,以避免导出期间超时。也许 temp_file_limit 标志设置得太低,不适合您的数据库使用。增加 temp_file_limit 大小。请参阅配置数据库标志

  3. 检查您是否正确地确定了 sqlalchemy 会话的范围,如下所述: https ://flask.palletsprojects.com/en/1.1.x/patterns/sqlalchemy/

    具体来说,当使用 scoped_session 时,您不会使用 @app.teardown_context 装饰器删除会话。

  4. 请按照本文档查看您的情况是否满足所有连接问题检查表。

于 2021-12-25T12:28:47.847 回答