4

我正在尝试采用一种简单的方法来跟踪我想使用 python 中的 os 模块运行的定期 mysqldump 命令。我已经写了这个,但在测试中它不会引发异常,即使 mysqldump 命令以错误完成。我对python很陌生,所以我可能会非常接近这个,但我想我会尝试指出正确的方向。

db_dump = "mysqldump -u %s -p%s --socket=source_socket --databases %s | mysql -u %s -p%s   --socket=dest_socket" % (db_user, db_pass, ' '.join(db_list), db_user, db_pass)

try:
    os.system(db_dump)
except:
    logging.error("databases did not dump")
else:    
    logging.info("database dump complete")
4

3 回答 3

5

os.system 不是调用系统命令的一种非常健壮或强大的方法,我建议使用subprocess.check_output()subprocess.check_call

IE,

>>> cmd = 'ls -l'
>>> badcmd = 'ls /foobar'
>>> subprocess.check_call(cmd.split())
0
>>> subprocess.check_call(badcmd.split())
ls: /foobar: No such file or directory
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py", line 511, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['ls', '/foobar']' returned non-zero exit status 1
于 2012-12-06T22:12:40.953 回答
2

os.system()返回一个整数结果代码。返回 0 时,命令运行成功;当它返回一个非零值时,表示错误。

db_dump = "mysqldump -u %s -p%s --socket=source_socket --databases %s | mysql -u %s -p%s   --socket=dest_socket" % (db_user, db_pass, ' '.join(db_list), db_user, db_pass)

result = os.system(db_dump)
if 0 == result:
    logging.info("database dump complete")
else:
    logging.error("databases did not dump; result code: %d" % result)

像@COpython,我推荐使用subprocess. 它比它复杂一点,os.system()但它非常灵活。将os.system()输出发送到终端,但subprocess您可以收集输出,以便搜索错误消息或其他内容。或者您可以丢弃输出。

于 2012-12-06T22:11:20.487 回答
2

这就是我要做的。

import logging
import subprocess
log = logging.getLogger(__name__)

cmd = "mysqldump -u %s -p%s --socket=source_socket --databases %s | mysql -u %s -p%s " \
      "--socket=dest_socket" % (db_user, db_pass, ' '.join(db_list), db_user, db_pass)

process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE.PIPE)
stdout, stderr = process.communicate()
stdout = [x for x in stdout.split("\n") if x != ""]
stderr = [x for x in stderr.split("\n") if x != ""]

if process.returncode < 0 or len(stderr):
    for error in stderr:
        log.error(error)
于 2012-12-06T22:27:43.963 回答