我在 Python 中使用以下代码(使用 pyodbc 作为 MS-Access 基础)。
cursor.execute("select a from tbl where b=? and c=?", (x, y))
没关系,但出于维护目的,我需要知道发送到数据库的完整且准确的 SQL 字符串。
有可能吗?怎么做?
因司机而异。这里有两个例子:
import MySQLdb
mc = MySQLdb.connect()
r = mc.cursor()
r.execute('select %s, %s', ("foo", 2))
r._executed
"select 'foo', 2"
import psycopg2
pc = psycopg2.connect()
r = pc.cursor()
r.execute('select %s, %s', ('foo', 2))
r.query
"select E'foo', 2"
答案是不。我在项目的主页谷歌代码(和谷歌组)上发布了我的问题,答案是:
l...@deller.id.au 对问题 163 的评论 #1:cursor.mogrify 返回查询字符串 http://code.google.com/p/pyodbc/issues/detail?id=163
作为参考,这里是记者所指的“mogrify”光标方法的 pyscopg 文档的链接:http: //initd.org/psycopg/docs/cursor.html#cursor.mogrify
pyodbc 不执行任何此类 SQL 转换:它将参数化 SQL 直接逐字传递给 ODBC 驱动程序。唯一涉及的处理是将参数从 Python 对象转换为 ODBC API 支持的 C 类型。
在将 SQL 发送到服务器之前,可能会在 ODBC 驱动程序中对 SQL 执行一些转换(例如 Microsoft SQL Native Client 执行此操作),但这些转换对 pyodbc 是隐藏的。
因此,我认为在 pyodbc 中提供 mogrify 功能是不可行的。
您可以使用print cursor._last_executed
来获取最后执行的查询。
阅读此答案,您还可以使用print cursor.mogrify(query,list)
它在执行之前或之后查看完整的查询。
为了调试目的,我创建了一个简单地替换的检查函数?使用查询值......这不是高科技:)但它有效!:D
def check_sql_string(sql, values):
unique = "%PARAMETER%"
sql = sql.replace("?", unique)
for v in values: sql = sql.replace(unique, repr(v), 1)
return sql
query="""SELECT * FROM dbo.MA_ItemsMonthlyBalances
WHERE Item = ? AND Storage = ? AND FiscalYear = ? AND BalanceYear = ? AND Balance = ? AND BalanceMonth = ?"""
values = (1,2,"asdasd",12331, "aas)",1)
print(check_sql_string(query,values))
结果:
SELECT * FROM dbo.MA_ItemsMonthlyBalances WHERE Item = 1 AND Storage = 2 AND FiscalYear = 'asdasd' AND BalanceYear = 12331 AND Balance = 'aas') AND BalanceMonth = 1
有了这个,你可以记录或做任何你想做的事情:
rowcount = self.cur.execute(query,values).rowcount
logger.info(check_sql_string(query,values))
如果您只需要在函数中添加一些异常捕获。
根据您使用的驱动程序,这可能会也可能不会。在某些数据库中,参数(?
s )被简单地替换,正如 user589983 的回答所暗示的那样(尽管驱动程序必须做一些事情,比如在这些字符串中引用字符串和转义引号,以产生可执行的语句)。
其他驱动程序将要求数据库编译(“准备”)语句,然后要求它使用给定的值执行准备好的语句。正是通过这种方式,使用准备好的或参数化的语句有助于避免 SQL 注入——在语句执行时,数据库“知道”您希望运行的 SQL 的一部分,以及其中使用的值的一部分那句话。
从PyODBC 文档的快速浏览来看,执行实际的 SQL 似乎是不可能的,但我可能错了。
之后我会检查 cursor._last_executed ,但如果你希望它们实时打印出来而不改变每次执行,试试这个猴子补丁:
def log_queries(cur):
def _query(q):
print q # could also use logging
return cur._do_query(q)
cur._query = _query
conn = MySQLdb.connect( read_default_file='~/.my.cnf' )
cur = conn.cursor()
log_queries(cur)
cur.execute('SELECT %s, %s, %s', ('hello','there','world'))
它非常依赖 MySQLdb(并且可能会在以后的版本中中断)。它之所以有效,是因为 cur._query 当前只是调用 calls._do_query 并返回其结果。
我使用Wireshark在 pyodbc 中查看实际的 SQL 字符串。如果您使用不受保护的服务器连接进行开发,这可能会有所帮助。
由于 pyodbc 无法在执行查询之前查看查询。您可以手动预填充查询,以了解最终的外观。它不会像实际查询那样工作,但它帮助我弄清楚在需要超过 40 个参数的查询中是否有任何错误。
query = """select * from [table_name] where a = ? and b = ?"""
parameter_list = ['dog', 'cat'] # List of parameters, a = 'dog', b = 'cat'.
query_list = list(query) # Split query string into individual characters.
# Loop through list and populate the question marks.
for i in range(len(parameter_list)):
for idx, val in enumerate(query_list):
if val == '?':
query_list[idx] = str(parameter_list[i])
break
# Rejoin the query string.
query_populate = ''.join(query_list)
#### Result ####
"""select * from [table_name] where a = dog and b = cat"""
编写sql字符串,然后执行:
sql='''select a
from tbl
where b=?
and c=? '''
cursor.execute(sql, x, y)
print 'just executed:',(sql, x,y)
现在您可以使用 SQL 语句做任何您想做的事情。