0

我得到了执行 mysqldump 的代码... | gzip -c 4 > table.sql.gz 在数据库的每个 innodb 表上,并通过 rsync 在存储上发送。代码将在 amqp 工作人员之一从 rabbitmq 收到消息后运行。由于某种原因,我有很多 mysqldump 命令的僵尸进程,但实际上程序是完整的。为什么有很多不复存在的,我该如何预防?

def _gzip_dump(self, db_dir, table, db, charset):
    gzip_logger = logger.get_logger()

    gzip_dump_file = os.path.join(db_dir, table + ".sql.gz")

    if os.path.isfile(gzip_dump_file):
        os.remove(gzip_dump_file)

    command_dump = [helper.get_util("mysqldump", use_chroot=False),
                    "--defaults-extra-file=/root/.my.cnf",
                    "--max_allowed_packet=%s" % self.get_max_allowed_packet(),
                    "--single-transaction",
                    "--force",
                    "--opt",
                    "--compress",
                    "--skip-triggers",
                    "--default-character-set=%s" % unicode(charset),
                    "--", db, table]
    command_gzip = [helper.get_util("gzip", use_chroot=False), "-c", "-4"]

    fp = os.open(gzip_dump_file, os.O_WRONLY | os.O_CREAT)
    out_file = os.fdopen(fp, "w", 0)  # если fp подсвечивает, то это не косяк

    p_dump = helper.SubprocessRunner(command=command_dump, use_chroot=False, logger=gzip_logger,
                                     log_prefix="backup_gzip",
                                     stderr=subprocess.PIPE,
                                     stdout=subprocess.PIPE)
    p_dump.run()

    p_gzip = helper.SubprocessRunner(command=command_gzip, use_chroot=False, logger=gzip_logger,
                                     log_prefix="backup_gzip",
                                     stdout=out_file,
                                     stdin=p_dump.process.stdout)
    p_gzip.run()

    p_gzip.wait()

    if p_gzip.process.returncode != 0:
        if os.path.isfile(gzip_dump_file):
            os.remove(gzip_dump_file)
        raise Error("Failed to dump & gzip db %s" % gzip_dump_file)

    remote_path = '/mysql/base/' + db + '/' + table + ".sql.gz"

    return gzip_dump_file, remote_path

helper.SubprocessRunner 它是 Popen 之上的一个包装器

    def run(self):
    try:
        if self.logger:
            self.logger.debug("%s : execute command %s" % (as_unicode(self.log_prefix), as_unicode(self.command)))
    except:
        if self.logger:
            self.logger.error("%s : Error when write log" % (as_unicode(self.log_prefix)))

    command = map(lambda item: as_default_string(item), self.command)
    self.process = subprocess.Popen(command, **self.process_options)

    return self

def wait(self, extended_return=False, write_output_in_log=True):
    out, err = self.process.communicate()

    try:
        if err and self.logger:
            self.logger.error("%s : Error: %s" % (as_unicode(self.log_prefix), as_unicode(err)))

        if out and write_output_in_log and self.logger:
            self.logger.debug("%s : command output: %s" % (as_unicode(self.log_prefix), as_unicode(out)))

    except:
        if self.logger:
            self.logger.error("%s : Error when write log" % (as_unicode(self.log_prefix)))

    return (out, err, self.process.returncode) if extended_return else out
4

0 回答 0