0

请查看此基本示例

def queue_add():
    #this will exit when all lines from the 
    #file are added in the Queue
    with open('get_from.txt') as f:
        for line in f:
            q.add(line.strip())

def queue_save():
    #when to trigger save_to.close()?!?!
    save_to = open('save_to.txt')
    while True:
        data = q.get() #this functions blocks if `q` is empty, so how to know
                       #when to close the file handler `save_to`??
        save_to.write(data)

def worker():
    #this is daemon process 
    while True:
        #work with file from 
        get = q.get()
        #work with data
        q_done.put('processed data')
        q.task_done()

q = Queue()
q_done = Queue()
#starts the processes here..

所以我的问题是如何知道queue_save()已经处理并保存了 done_q 中的所有数据并关闭它的 file_handler?

4

1 回答 1

1

如何使用哨兵值。

import Queue
import threading

END_OF_DATA = object() # sentinel

def queue_add(q):
    with open('get_from.txt') as f:
        for line in f:
            q.put(line.strip())

def process(x):
    # dummy
    return str(len(x)) + '\n'

def worker(q_in, q_out):
    while True:
        data = q_in.get()
        if data is END_OF_DATA:
            break
        q_out.put(process(data))
        q_in.task_done()

def queue_save(q):
    save_to = open('save_to.txt', 'w')
    while True:
        data = q.get()
        if data is END_OF_DATA:
            break
        save_to.write(data)
        q.task_done()
    save_to.close()


q1 = Queue.Queue()
q2 = Queue.Queue()

n_workers = 4
t1 = threading.Thread(target=queue_add, args=(q1,))
workers = [
    threading.Thread(target=worker, args=(q1, q2,))
    for i in range(n_workers)
]
t3 = threading.Thread(target=queue_save, args=(q2,))

t1.start()
for worker in workers: worker.start()
t3.start()

t1.join()

q1.join()
for worker in workers: q1.put(END_OF_DATA)

for worker in workers: worker.join()

q2.join()
q2.put(END_OF_DATA)
t3.join()

编辑:同步多个工人。

于 2013-06-10T10:17:23.223 回答