0

我有一个问题,我需要将消费者生成的值写入磁盘。我不想每次都打开要写入的文件的新实例,所以我想使用第二个队列和另一个消费者从单个 Greenlet 写入磁盘。我的代码的问题是第二个队列没有从第一个队列异步消耗。第一个队列首先完成,然后第二个队列被消耗。我想同时将值写入磁盘,然后生成其他值。感谢帮助!

#!/usr/bin/python
#- * -coding: utf-8 - * -
import gevent #pip install gevent
from gevent.queue import *
import gevent.monkey
from timeit import default_timer as timer
from time import sleep
import cPickle as pickle

gevent.monkey.patch_all()

def save_lineCount(count):
    with open("count.p", "wb") as f:
        pickle.dump(count, f)

def loader():
    for i in range(0,3):
        q.put(i)

def writer():
    while True:
        task = q_w.get()
        print "writing",task
        save_lineCount(task)

def worker():
    while not q.empty():
        task = q.get()
        if task%2:
            q_w.put(task)
            print "put",task
            sleep(10)

def asynchronous():
    threads = []
    threads.append(gevent.spawn(writer))
    for i in range(0, 1):
        threads.append(gevent.spawn(worker))
    start = timer()
    gevent.joinall(threads,raise_error=True)
    end = timer()
    #pbar.close()
    print "\n\nTime passed: " + str(end - start)[:6]


q = gevent.queue.Queue()
q_w = gevent.queue.Queue()
gevent.spawn(loader).join()
asynchronous()
4

1 回答 1

1

一般来说,这种方法应该可以正常工作。但是,此特定代码存在一些问题:

  • 调用time.sleep将导致所有greenlet 阻塞。您需要调用gevent.sleep或猴子修补该过程才能只有一个 greenlet 块(我看到已gevent.monkey导入,但未patch_all调用)。我怀疑这是这里的主要问题。

  • 写入文件也是同步的,并导致所有 greenlet 阻塞。FileObjectThread如果这是一个主要瓶颈,您可以使用。

于 2016-10-13T11:56:01.803 回答