5

我有一个加载、转换和计算数据的 python 脚本。在 sql-server 中有一个存储过程,它需要一个表值参数、2 个必需参数和 2 个可选参数。在 sql server 中,我可以调用这个 SP:

USE [InstName]
GO

DECLARE @return_value int
DECLARE @MergeOnColumn core.MatchColumnTable

INSERT INTO @MergeOnColumn
SELECT 'foo.ExternalInput','bar.ExternalInput'

EXEC    @return_value = [core].[_TableData]
        @Target = N'[dbname].[tablename1]',
        @Source = N'[dbname].[table2]',
        @MergeOnColumn  = @MergeOnColumn,
        @Opt1Param = False,
        @Opt2Param = False

SELECT  'Return Value' = @return_value

GO

经过全面搜索,我发现了以下帖子:

如何使用需要用户定义类型表参数的 SQLAlchemy 调用存储过程

它建议使用 PYTDS 和 sql-alchemy 的方言“sql alchemy pytds”来调用具有表值参数的 SP。通过这篇文章和文档,我创建了以下 Python 脚本:

import pandas as pd
import pytds
from pytds import login
import sqlalchemy as sa
from sqlalchemy import create_engine
import sqlalchemy_pytds

def connect():
    return pytds.connect(dsn='ServerName',database='DBName', auth=login.SspiAuth())

engine = sa.create_engine('mssql+pytds://[ServerName]', creator=connect)
conn = engine.raw_connection()
with conn.cursor() as cur:
    arg = ("foo.ExternalInput","bar.ExternalInput")
    tvp = pytds.TableValuedParam(type_name="MergeOnColumn", rows=(arg))
cur.execute('EXEC test_proc %s', ("[dbname].[table2]", "[dbname].[table1]", tvp,))
cur.fetchall()

当我运行此代码时,我收到以下错误消息:

TypeError: not all arguments converted during string formatting

有谁知道如何正确传递多个参数或建议我如何直接处理这个调用 SP?

4

3 回答 3

4

根据对我的问题的评论,我设法让存储过程使用表值参数运行(并从 SP 获取返回值)最终脚本如下:

import pandas as pd
import pytds
from pytds import login
import sqlalchemy as sa
from sqlalchemy import create_engine
import sqlalchemy_pytds

def connect():
    return pytds.connect(dsn='ServerName',database='DBName',autocommit=True, auth=login.SspiAuth())

engine = sa.create_engine('mssql+pytds://[ServerName]', creator=connect)
conn = engine.raw_connection()

with conn.cursor() as cur:
    arg = [["foo.ExternalInput","bar.ExternalInput"]]
    tvp = pytds.TableValuedParam(type_name="core.MatchColumnTable", rows=arg)
    cur.execute("EXEC test_proc @Target = N'[dbname].[tablename1]', @Source = N'[dbname].[table2]', @CleanTarget = 0, @UseColumnsFromTarget = 0, @MergeOnColumn = %s", (tvp,))
    result = cur.fetchall()
    print(result)

在连接中添加自动提交(在游标中提交事务),表值参数 (marchcolumntable) 需要 2 列,因此修改 arg 以适应 2 列。

除了 tvp 之外还需要的参数包含在 exec 字符串中。执行字符串中的最后一个参数是用 tvp 填充的 tvp 参数(mergeoncolumn)的名称。

您可以选择添加结果状态或行数,如 pytds 文档中所述: https ://python-tds.readthedocs.io/en/latest/index.html

笔记!:在存储过程中,您必须确保添加了 SET NOCOUNT ON 否则您将无法将任何结果返回给 Python

于 2018-08-21T11:37:49.260 回答
2

mssql+pyodbc://

pyodbc 在 2018 年 12 月 13 日发布的版本 4.0.25 中增加了对表值参数 (TVP) 的支持。只需将 TVP 值作为元组列表提供:

proc_name = "so51930062"
type_name = proc_name + "Type"

# set up test environment
with engine.begin() as conn:
    conn.exec_driver_sql(f"""\
        DROP PROCEDURE IF EXISTS {proc_name} 
    """)
    conn.exec_driver_sql(f"""\
        DROP TYPE IF EXISTS {type_name} 
    """)
    conn.exec_driver_sql(f"""\
        CREATE TYPE {type_name} AS TABLE (
        id int,
        txt nvarchar(50)
        ) 
    """)
    conn.exec_driver_sql(f"""\
        CREATE PROCEDURE {proc_name} 
        @prefix nvarchar(10),
        @tvp {type_name} READONLY
        AS
        BEGIN
            SET NOCOUNT ON;
            SELECT id, @prefix + txt AS new_txt FROM @tvp;
        END
    """)

#run test
with engine.begin() as conn:
    data = {"prefix": "new_", "tvp": [(1, "foo"), (2, "bar")]}
    sql = f"{{CALL {proc_name} (:prefix, :tvp)}}"
    print(conn.execute(sa.text(sql), data).fetchall())
    # [(1, 'new_foo'), (2, 'new_bar')]
于 2021-04-21T15:05:25.750 回答
2

pytds

使用纯 Python TDS(表格数据流)协议实现的 MSSQL 的 Python DBAPI 驱动程序

我通过针对 SQL Server的存储过程使用pytds进行合并/更新插入。

例子

下面是一个基本函数的例子,一个行数据用 Tuple 表示:

def get_connection(instance: str, database: str, user: str, password: str):
    return pytds.connect(
        dsn=instance, database=database, user=user, password=password, autocommit=True
    )

def execute_with_tvp(connection: pytds.Connection, procedure_name: str, rows: list):
    with connection.cursor() as cursor:
         tvp = pytds.TableValuedParam(type_name=my_type, rows=rows)
         cursor.callproc(procedure_name, tvp)
于 2019-08-16T20:25:17.660 回答