My routine below takes a list of urllib2.Request objects, spawns a new process for each request, and fires them off. The goal is asynchronous speed, so it's all fire-and-forget (no response is needed). The problem is that the processes spawned by the code below never terminate, so after a few of these the box OOMs. Context: Django web app. Any help?

import logging
import multiprocessing
import traceback
import urllib2

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)


def request_manager(req_list):
    try:
        # put the request list in the queue
        for req in req_list:
            MPQ.put(req)

            # start a worker process per queued request
            worker = multiprocessing.Process(target=process_request, args=(MPQ,))
            worker.daemon = True
            worker.start()

        # move on after the queue is empty
        MPQ.join()

    except Exception:
        logging.error(traceback.format_exc())


# process requests in the queue
def process_request(MPQ):
    try:
        while True:
            req = MPQ.get()
            dr = urllib2.urlopen(req)
            MPQ.task_done()

    except Exception:
        logging.error(traceback.format_exc())

4 Answers


Maybe I'm wrong, but your workers sit in their while True loop forever and never exit on their own. Keep a reference to each one and terminate them once the queue has been drained:

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)


def request_manager(req_list):
    try:
        # put the request list in the queue
        pool = []
        for req in req_list:
            MPQ.put(req)

            # start a worker process per queued request
            worker = multiprocessing.Process(target=process_request, args=(MPQ,))
            worker.daemon = True
            worker.start()
            pool.append(worker)

        # move on after the queue is empty
        MPQ.join()
        # kill the workers that are no longer needed
        for p in pool: p.terminate()

    except Exception:
        logging.error(traceback.format_exc())


# process requests in the queue
def process_request(MPQ):
    try:
        while True:
            req = MPQ.get()
            dr = urllib2.urlopen(req)
            MPQ.task_done()

    except Exception:
        logging.error(traceback.format_exc())
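
An alternative to terminate(), offered only as a sketch along the same lines (the sentinel handling is mine, not part of this answer): put one "poison pill" per worker on the queue so each process exits on its own once the real requests are done.

def process_request(MPQ):
    while True:
        req = MPQ.get()
        if req is None:        # sentinel: no more work, exit cleanly
            MPQ.task_done()
            break
        try:
            urllib2.urlopen(req)
        except Exception:
            logging.error(traceback.format_exc())
        finally:
            MPQ.task_done()    # pair every get() with a task_done()

# in request_manager, after starting the workers:
#     for _ in pool: MPQ.put(None)   # one sentinel per worker
#     MPQ.join()                     # wait for the queue to drain
#     for p in pool: p.join()        # each worker exits on its own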
answered 2013-09-05T19:32:40.653

OK, after some fiddling (and a good night's sleep) I believe I've found the problem (and thank you Eri, you were the inspiration I needed). The main issue with the zombie processes was that I was never signaling back that a process had finished (and killing it), both of which I naively assumed multiprocessing did automatically.

The code that worked:

import logging
import multiprocessing
import sys
import traceback
import urllib2

# function that will be run through the pool
def process_request(req):
    try:
        dr = urllib2.urlopen(req, timeout=30)

    except Exception:
        logging.error(traceback.format_exc())

# process killer
def sig_end(r):
    sys.exit()

# globals
MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
CHUNK_SIZE = 20
POOL = multiprocessing.Pool(MP_CONCURRENT)

# pool initiator
def request_manager(req_list):
    try:
        resp = POOL.map_async(process_request, req_list, CHUNK_SIZE, callback=sig_end)

    except Exception:
        logging.error(traceback.format_exc())

A few notes:

1) The function that 'map_async' will hit ('process_request' in this example) must be defined first (and before the globals are declared).

2) There is probably a more elegant way to exit the process (suggestions welcome; one possibility is sketched after these notes).

3) Using a pool really was the best bet here (thanks again, Eri), because the 'callback' feature lets me signal completion right away.
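
On note 2, one arguably cleaner option, sketched here under the assumption that the pool should live for the whole worker process rather than die after each batch: drop the sys.exit() callback and tear the pool down at interpreter exit instead.

import atexit
import multiprocessing

MP_CONCURRENT = max(multiprocessing.cpu_count() * 2, 2)
POOL = multiprocessing.Pool(MP_CONCURRENT)

def _shutdown_pool():
    POOL.close()   # stop accepting new work
    POOL.join()    # wait for the worker processes to exit

# run the cleanup when the interpreter exits, instead of
# calling sys.exit() from inside the map_async callback
atexit.register(_shutdown_pool)

With this, request_manager can call POOL.map_async(process_request, req_list, CHUNK_SIZE) with no callback at all; the pool's workers are reused across batches instead of being killed and respawned.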

answered 2013-09-06T13:13:26.410
MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
CHUNK_SIZE = 20  # number of requests sent to one process
pool = multiprocessing.Pool(MP_CONCURRENT)

def request_manager(req_list):
    try:
        # hand the request list to the pool
        response = pool.map(process_request, req_list, CHUNK_SIZE)  # returns after every request has run and the pool's work has ended
    # OR
        response = pool.map_async(process_request, req_list, CHUNK_SIZE)  # request_manager returns as soon as all requests are passed to the pool

    except Exception:
        logging.error(traceback.format_exc())


# process requests in the pool
def process_request(req):
    dr = urllib2.urlopen(req)

This is 5-10x faster than your code.

answered 2013-09-05T19:17:08.883

Integrate a side "broker" into Django (e.g. RabbitMQ or something similar).
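
A minimal sketch of what that could look like with Celery on top of RabbitMQ; Celery, the task name, and the broker URL are illustrative assumptions, not something this answer specified. Fire-and-forget becomes a .delay() call, and the broker's worker processes manage their own lifecycle.

# tasks.py -- hypothetical Celery task; Celery and the broker
# URL are assumptions for illustration, not part of the answer
import urllib2

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task(ignore_result=True)
def fetch(url):
    # fire-and-forget: the response body is never used
    urllib2.urlopen(url, timeout=30)

The Django view then just calls fetch.delay(url) for each URL and returns immediately; nothing is forked inside the web process, so there is nothing to leak.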

answered 2013-09-05T20:52:55.790