我有一个脚本(worker.py),它以表单打印无缓冲的输出......
1
2
3
.
.
.
n
其中 n 是此脚本中的循环将进行的某个恒定迭代次数。在另一个脚本(service_controller.py)中,我启动了多个线程,每个线程都使用 subprocess.Popen(stdout=subprocess.PIPE, ...); 现在,在我的主线程 (service_controller.py) 中,我想读取每个线程的 worker.py 子进程的输出,并使用它来计算完成前剩余时间的估计值。
我拥有从 worker.py 读取标准输出并确定最后打印的数字的所有逻辑。问题是我无法弄清楚如何以非阻塞方式执行此操作。如果我读取一个恒定的 bufsize,那么每次读取最终都会等待来自每个工作人员的相同数据。我尝试了很多方法,包括使用 fcntl、select + os.read 等。我最好的选择是什么?如果需要,我可以发布我的来源,但我认为解释很好地描述了问题。
感谢您在这里的任何帮助。
编辑
添加示例代码
我有一个启动子流程的工人。
class WorkerThread(threading.Thread):
def __init__(self):
self.completed = 0
self.process = None
self.lock = threading.RLock()
threading.Thread.__init__(self)
def run(self):
cmd = ["/path/to/script", "arg1", "arg2"]
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
#flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
#fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
def get_completed(self):
self.lock.acquire();
fd = select.select([self.process.stdout.fileno()], [], [], 5)[0]
if fd:
self.data += os.read(fd, 1)
try:
self.completed = int(self.data.split("\n")[-2])
except IndexError:
pass
self.lock.release()
return self.completed
然后我有一个线程管理器。
class ThreadManager():
def __init__(self):
self.pool = []
self.running = []
self.lock = threading.Lock()
def clean_pool(self, pool):
for worker in [x for x in pool is not x.isAlive()]:
worker.join()
pool.remove(worker)
del worker
return pool
def run(self, concurrent=5):
while len(self.running) + len(self.pool) > 0:
self.clean_pool(self.running)
n = min(max(concurrent - len(self.running), 0), len(self.pool))
if n > 0:
for worker in self.pool[0:n]:
worker.start()
self.running.extend(self.pool[0:n])
del self.pool[0:n]
time.sleep(.01)
for worker in self.running + self.pool:
worker.join()
和一些运行它的代码。
threadManager = ThreadManager()
for i in xrange(0, 5):
threadManager.pool.append(WorkerThread())
threadManager.run()
我已经删除了其他代码的日志,希望能找出问题所在。