1

编辑:答案是操作系统正在取消进程,因为我正在消耗所有内存

我正在生成足够多的子进程以保持内核的平均负载为 1:1,但是在一小时内的某个时间点,此脚本可能会运行数天,其中 3 个进程会运行:

tipu   14804  0.0  0.0 328776   428 pts/1    Sl   00:20   0:00 python run.py
tipu   14808 64.4 24.1 2163796 1848156 pts/1 Rl   00:20  44:41 python run.py
tipu   14809  8.2  0.0      0     0 pts/1    Z    00:20   5:43 [python] <defunct>
tipu   14810 60.3 24.3 2180308 1864664 pts/1 Rl   00:20  41:49 python run.py
tipu   14811 20.2  0.0      0     0 pts/1    Z    00:20  14:04 [python] <defunct>
tipu   14812 22.0  0.0      0     0 pts/1    Z    00:20  15:18 [python] <defunct>
tipu   15358  0.0  0.0 103292   872 pts/1    S+   01:30   0:00 grep python

我不知道为什么会这样,附加的是主从。如果需要,我也可以附加 mysql/pg 包装器,有什么建议吗?

slave.py

from boto.s3.key import Key
import multiprocessing
import gzip
import os
from  mysql_wrapper import MySQLWrap
from pgsql_wrapper import PGSQLWrap
import boto
import re

class Slave:

    CHUNKS = 250000

    BUCKET_NAME = "bucket"
    AWS_ACCESS_KEY = ""
    AWS_ACCESS_SECRET = ""
    KEY = Key(boto.connect_s3(AWS_ACCESS_KEY, AWS_ACCESS_SECRET).get_bucket(BUCKET_NAME))
    S3_ROOT = "redshift_data_imports"
    COLUMN_CACHE = {}
    DEFAULT_COLUMN_VALUES = {}

    def __init__(self, job_queue):
        self.log_handler = open("logs/%s" % str(multiprocessing.current_process().name), "a");
        self.mysql = MySQLWrap(self.log_handler)
        self.pg = PGSQLWrap(self.log_handler)
        self.job_queue = job_queue


    def do_work(self):
        self.log(str(os.getpid()))
        while True:

            #sample job in the abstract: mysql_db.table_with_date-iteration
            job = self.job_queue.get()

            #queue is empty
            if job is None:
                self.log_handler.close()
                self.pg.close()
                self.mysql.close()
                print("good bye and good day from %d" % (os.getpid()))
                self.job_queue.task_done()
                break

            #curtail iteration
            table = job.split('-')[0]

            #strip redshift table from job name
            redshift_table = re.sub(r"(_[1-9].*)", "", table.split(".")[1])

            iteration = int(job.split("-")[1])
            offset = (iteration - 1) * self.CHUNKS

            #columns redshift is expecting
            #bad tables will slip through and error out, so we catch it
            try:
                colnames = self.COLUMN_CACHE[redshift_table]
            except KeyError:
                self.job_queue.task_done()
                continue

            #mysql fields to use in SELECT statement
            fields = self.get_fields(table)

            #list subtraction determining which columns redshift has that mysql does not
            delta = (list(set(colnames) - set(fields.keys())))

            #subtract columns that have a default value and so do not need padding
            if delta:
                delta = list(set(delta) - set(self.DEFAULT_COLUMN_VALUES[redshift_table]))

            #concatinate columns with padded \N
            select_fields = ",".join(fields.values()) + (",\\N" * len(delta))

            query = "SELECT %s FROM %s LIMIT %d, %d" % (select_fields, table,
                    offset, self.CHUNKS)

            rows = self.mysql.execute(query)

            self.log("%s: %s\n" % (table, len(rows)))

            if not rows:
                self.job_queue.task_done()
                continue

            #if there is more data potentially, add it to the queue
            if len(rows) == self.CHUNKS:
                self.log("putting %s-%s" % (table, (iteration+1)))
                self.job_queue.put("%s-%s" % (table, (iteration+1)))

            #various characters need escaping
            clean_rows = []
            redshift_escape_chars = set( ["\\", "|", "\t", "\r", "\n"] )
            in_chars = ""

            for row in rows:
                new_row = []
                for value in row:
                    if value is not None:
                        in_chars = str(value)
                    else:
                        in_chars = ""

                    #escape any naughty characters
                    new_row.append("".join(["\\" + c if c in redshift_escape_chars else c for c in in_chars]))
                new_row = "\t".join(new_row)
                clean_rows.append(new_row)

            rows = ",".join(fields.keys() + delta)
            rows += "\n" + "\n".join(clean_rows)

            offset = offset + self.CHUNKS

            filename = "%s-%s.gz" % (table, iteration) 
            self.move_file_to_s3(filename, rows)

            self.begin_data_import(job, redshift_table, ",".join(fields.keys() +
               delta))

            self.job_queue.task_done()


    def move_file_to_s3(self, uri, contents):

        tmp_file = "/dev/shm/%s" % str(os.getpid())

        self.KEY.key = "%s/%s" % (self.S3_ROOT, uri)
        self.log("key is %s" % self.KEY.key )

        f = gzip.open(tmp_file, "wb")
        f.write(contents)
        f.close()

        #local saving allows for debugging when copy commands fail
        #text_file = open("tsv/%s" % uri, "w")
        #text_file.write(contents)
        #text_file.close()

        self.KEY.set_contents_from_filename(tmp_file, replace=True)

    def get_fields(self, table):
        """
            Returns a dict used as: 
                {"column_name": "altered_column_name"}
            Currently only the debug column gets altered
        """
        exclude_fields = ["_qproc_id", "_mob_id", "_gw_id", "_batch_id", "Field"]

        query = "show columns from %s" % (table)
        fields = self.mysql.execute(query)

        #key raw field, value mysql formatted field
        new_fields = {}

        #for field in fields:
        for field in [val[0] for val in fields]:
            if field in exclude_fields:
                continue
            old_field = field

            if "debug_mode" == field.strip():
                field = "IFNULL(debug_mode, 0)"

            new_fields[old_field] = field

        return new_fields

    def log(self, text):
        self.log_handler.write("\n%s" % text)

    def begin_data_import(self, table, redshift_table, fields):
        query = "copy %s (%s) from 's3://bucket/redshift_data_imports/%s' \
            credentials 'aws_access_key_id=%s;aws_secret_access_key=%s' delimiter '\\t' \
            gzip NULL AS '' COMPUPDATE ON ESCAPE IGNOREHEADER 1;" \
            % (redshift_table, fields, table, self.AWS_ACCESS_KEY, self.AWS_ACCESS_SECRET)
        self.pg.execute(query)

master.py

from slave import Slave as Slave 
import multiprocessing
from mysql_wrapper import MySQLWrap as MySQLWrap
from pgsql_wrapper import PGSQLWrap as PGSQLWrap


class Master:

    SLAVE_COUNT = 5

    def __init__(self):
        self.mysql = MySQLWrap()
        self.pg = PGSQLWrap()

    def do_work(table):
        pass

    def get_table_listings(self):
        """Gathers a list of MySQL log tables needed to be imported"""

        query = 'show databases'
        result = self.mysql.execute(query)

        #turns list[tuple] into a flat list
        databases = list(sum(result, ()))

        #overriding during development
        databases = ['db1', 'db2', 'db3']]

        exclude = ('mysql', 'Database', 'information_schema')
        scannable_tables = []

        for database in databases:
            if database in exclude:
                continue

            query = "show tables from %s" % database
            result = self.mysql.execute(query)

            #turns list[tuple] into a flat list
            tables = list(sum(result, ()))

            for table in tables:
                exclude = ("Tables_in_%s" % database, "(", "201303", "detailed", "ltv")

                #exclude any of the unfavorables
                if any(s in table for s in exclude):
                    continue

                scannable_tables.append("%s.%s-1" % (database, table))

        return scannable_tables

    def init(self):
        #fetch redshift columns once and cache
        #get columns from redshift so we can pad the mysql column delta with nulls
        tables = ('table1', 'table2', 'table3')

        for table in tables:

            #cache columns
            query = "SELECT column_name FROM information_schema.columns WHERE \
            table_name = '%s'" % (table)
            result = self.pg.execute(query, async=False, ret=True)
            Slave.COLUMN_CACHE[table] = list(sum(result, ()))

            #cache default values
            query = "SELECT column_name FROM information_schema.columns WHERE \
            table_name = '%s' and column_default is not \
            null" % (table)

            result = self.pg.execute(query, async=False, ret=True)

            #turns list[tuple] into a flat list
            result = list(sum(result, ()))

            Slave.DEFAULT_COLUMN_VALUES[table] = result

    def run(self):
        self.init()

        job_queue = multiprocessing.JoinableQueue()
        tables = self.get_table_listings()
        for table in tables:
            job_queue.put(table)

        processes = []
        for i in range(Master.SLAVE_COUNT):
            process = multiprocessing.Process(target=slave_runner, args=(job_queue,))
            process.daemon = True
            process.start()
            processes.append(process)

        #blocks this process until queue reaches 0
        job_queue.join()

        #signal each child process to GTFO
        for i in range(Master.SLAVE_COUNT):
            job_queue.put(None)

        #blocks this process until queue reaches 0
        job_queue.join()

        job_queue.close()

        #do not end this process until child processes close out
        for process in processes:
            process.join()

        #toodles !
        print("this is master saying goodbye")


def slave_runner(queue):
    slave = Slave(queue)
    slave.do_work()
4

1 回答 1

6

没有足够的信息可以确定,但问题很可能Slave.do_work是引发未处理的异常。(有很多行代码可以在各种不同的条件下做到这一点。)

当你这样做时,子进程将退出。

在 POSIX 系统上......好吧,完整的细节有点复杂,但在简单的情况下(你在这里有什么),一个退出的子进程将作为一个<defunct>进程一直存在直到它被收割(因为父进程要么wait在它上面,或退出)。由于您的父代码在队列完成之前不会等待子代码,这正是发生的事情。

所以,有一个简单的胶带修复:

def do_work(self):
    self.log(str(os.getpid()))
    while True:
        try:
            # the rest of your code
        except Exception as e:
            self.log("something appropriate {}".format(e))
            # you may also want to post a reply back to the parent

您可能还想将大量内容分解try为不同的部分,以便区分可能出错的所有不同阶段(特别是如果其中一些意味着您需要回复,而有些则意味着您不需要)。


但是,看起来您尝试做的是完全复制 的行为multiprocessing.Pool,但在几个地方错过了该栏。这就提出了一个问题:为什么不Pool首先使用呢?map然后,您可以使用其中一种家庭方法进一步简化/优化事情。例如,您的整体Master.run可以简化为:

self.init()
pool = multiprocessing.Pool(Master.SLAVE_COUNT, initializer=slave_setup)
pool.map(slave_job, tables)
pool.join()

这将为您处理异常,并允许您在以后需要时返回值/异常,并让您使用内置logging库而不是尝试构建自己的库,等等。并且只需要对Slave.


如果您想从作业中提交新作业,最简单的方法可能是使用Future基于 - 的 API(它可以扭转局面,使未来的结果成为焦点,而池/执行器成为提供它们的愚蠢事物,而不是使池成为焦点,结果成为它回馈的愚蠢事物),但也有多种方法可以做到这Pool一点。例如,现在,您不会从每个作业中返回任何内容,因此,您可以只返回tables要执行的列表。这是一个简单的示例,展示了如何做到这一点:

import multiprocessing

def foo(x):
    print(x, x**2)
    return list(range(x))

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    jobs = [5]
    while jobs:
        jobs, oldjobs = [], jobs
        for job in oldjobs:
            jobs.extend(pool.apply(foo, [job]))
    pool.close()
    pool.join()

显然,您可以通过将整个循环替换为例如馈入的列表理解来稍微压缩这一点itertools.chain,并且您可以通过将“提交者”对象传递给每个作业并添加到该对象而不是返回一个新工作列表等。但我想尽可能明确地表明它是多么少。


无论如何,如果您认为显式队列更容易理解和管理,那就去吧。只需查看来源multiprocessing.worker和/或concurrent.futures.ProcessPoolExecutor查看您需要自己做什么。这并不难,但有足够多的事情你可能会出错(就我个人而言,当我自己尝试做这样的事情时,我总是会忘记至少一个边缘情况),这是在寻找正确的代码的工作。


或者,您不能在这里使用的唯一原因似乎concurrent.futures.ProcessPoolExecutor是您需要初始化一些每个进程的状态(boto.s3.key.KeyMySqlWrap等),这可能是非常好的缓存原因。(如果这涉及到 Web 服务查询、数据库连接等,您当然不希望每个任务都这样做一次!)但是有几种不同的方法可以解决这个问题。

但是您可以继承ProcessPoolExecutor并覆盖未记录的函数_adjust_process_count(请参阅源代码了解它的简单程度)以传递您的 setup 函数,并且……这就是您所要做的。

或者你可以混合搭配。将from包裹在Futurefromconcurrent.futures周围。AsyncResultmultiprocessing

于 2013-03-26T21:38:04.177 回答