2

我一直在尝试使用 sqlalchemy 的 bulk_insert_mappings。我知道我可以创建一个会话并连接到数据库。我已经初始化了我的引擎,但我似乎无法从表中获取我需要的映射器。

from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker,Session
from sqlalchemy_utils import get_mapper

engine = create_engine('mysql+pymysql://{}:{}@IP:PORT/'.format(USER,PW)) # removed my config here
connection = engine.connect()
m = MetaData(bind=engine,schema='test')
m.reflect()

Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()

我最近发现了一堆关于 SO 的相关问题

SQLAlchemy 从 Table 对象(从 Metadata 或 Session 或其他)获取 Mapper 对象

但 sqlalchemy_utils.get_mapper 提出:

“ValueError:无法获取表'test'的映射器。”

sqlalchemy.orm.mapperlib._mapper_registry似乎是空的。也许是因为我没有将它绑定到我的引擎。但不知道该怎么做。

PS:test是一个非常简单的TEXT类型的一栏表

这是 m.tables['test.test'] 的输出

Table('test', MetaData(bind=Engine(mysql+pymysql://USER:***@IP:PORT/)), Column('a', TEXT(), table=<test>), schema='test')
4

2 回答 2

6

SQLAlchemy 的工作Mapper是:

定义类属性与数据库表列的相关性。

...它是 SQLAlchemy ORM 的基础。使用 ORM,Python 类表示数据库中的表,并且需要某种机制将类的属性与表中的列相关联。如果您不使用 ORM,则您的表不会映射到 Python 类,因此没有使用映射器。这就是为什么您从get_mapper().

在您的示例中:

m = MetaData(bind=engine,schema='test')
m.reflect()

MetaData是:

对象的集合Table及其关联的模式构造。

MetaData.reflect

为数据库中可用但尚未出现Table在.MetaDataMetaData

因此,此时,您有一个Table对象集合,并且您希望对其中一个对象执行批量插入。不要将Table对象与 ORM 映射类混淆,它们不是一回事。

有关状态的文档bulk_insert_mappings

对给定的映射字典列表执行批量插入。

给定字典中的值通常不加修改地传递到核心 Insert() 构造中

您正在尝试实现数据的批量插入,我们可以跳过 ORM 方法(任何涉及 的方法Session)并显式地与核心交互。

该表达式pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records")返回dicts like:的列表[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}],因此为了简单起见,我将使用此处的示例输出。

您的元数据对象中有已使用 检索的表m.tables['test.test'],并且该Table对象可用于生成其自己的插入语句:

print(m.tables['test.test'].insert())
# INSERT INTO test.test (a) VALUES (%(a)s)

为了执行多个语句,我们可以将字典列表传递给Connection.execute()如下所示。

ORM 的好处之一Session是它允许显式事务管理,您可以在哪里调用Session.rollback()Session.commit()在任何需要的地方进行。连接对象也可以在类似于Sessionusing的事务中显式操作Engine.begin()

例如,使用上下文管理器:

with engine.begin() as conn:
    conn.execute(
        m.tables['test.test'].insert(),
        *[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
    )

如果上下文中没有错误,这将自动提交查询,如果有错误则回滚。

引擎日志显示此表达式发出以下查询:

INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})

以下人为设计的示例通过 using 显示了您的原始查询Session.bulk_insert_mappings()。我不得不创建一个 ORM 模型来表示表并向表中添加一个id字段,因为 ORM 不喜欢在没有主键的情况下工作。

m = MetaData(bind=engine,schema='test')
Base = declarative_base(metadata=m)

class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    a = Column(Text)


Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()

这是引擎日志中执行的查询:

INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})

您会注意到,这与我们通过直接使用 Core 实现的查询完全相同。

于 2019-08-28T00:28:27.537 回答
0

我一直在谷歌上搜索完全相同的问题。但是,我找到了解决此问题的方法。

class Helper():
   pass
new_mapper = sqlalchemy.orm.mapper(Helper, local_table = m.tables['test.test'])
session.bulk_insert_mappings(new_mapper, 
df.to_dict(orient="records"), return_defaults = False)
session.commit()
session.close()

根据以下链接,我认为 df.to_sql 在将大量数据帧插入 sql 表时表现非常差。然而,结果证明 bulk_insert_mappings 慢得多。我希望它有所帮助。

于 2019-08-27T19:09:46.287 回答