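I want to use subprocess to run 20 instances of a script I wrote in parallel. Say I have a big list of URLs with about 100,000 entries, and my program should make sure that 20 instances of my script are working on that list at all times. I wanted to code it like this: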

urllist = [url1, url2, url3, .. , url100000]
i=0
while number_of_subprocesses < 20 and i<100000:
    subprocess.Popen(['python', 'script.py', urllist[i]])
    i = i+1

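My script just writes something to a database or a text file. It produces no output and needs no input other than the URL.

My problem is that I can't find out how to get the number of active subprocesses. I'm a novice programmer, so every hint and suggestion is welcome. I'm also wondering how to make the while loop check its condition again once 20 subprocesses have been started. I thought about wrapping it in another while loop, something like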

while i<100000:
   while number_of_subprocesses < 20:
       subprocess.Popen(['python', 'script.py', urllist[i]])
       i = i+1
       if number_of_subprocesses == 20:
           sleep() # wait some time before checking again

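Or is there a better way for the while loop to keep checking the number of subprocesses?

I also considered using the multiprocessing module, but I found it really convenient to just call script.py with subprocess rather than calling a function with multiprocessing.

Maybe someone can help me and point me in the right direction. Thanks a lot!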

3 Answers

6

Taking a different approach from the above, since it seems that a callback can't be passed as an argument:

import subprocess
import time

# urllist is the list of URLs from the question
NextURLNo = 0
MaxProcesses = 20
MaxUrls = 100000  # Note this would be better to be len(urllist)
Processes = []

def StartNew():
   """ Start a new subprocess if there is work to do """
   global NextURLNo
   global Processes

   if NextURLNo < MaxUrls:
      proc = subprocess.Popen(['python', 'script.py', urllist[NextURLNo]])
      print("Started to process %s" % urllist[NextURLNo])
      NextURLNo += 1
      Processes.append(proc)

def CheckRunning():
   """ Check any running processes and start new ones if there are spare slots."""
   global Processes
   global NextURLNo

   for p in range(len(Processes) - 1, -1, -1): # Check the processes in reverse order
      if Processes[p].poll() is not None: # poll() returns None while the process is still running
         del Processes[p] # Remove from list - this is why we needed reverse order

   while (len(Processes) < MaxProcesses) and (NextURLNo < MaxUrls): # More to do and some spare slots
      StartNew()

if __name__ == "__main__":
   CheckRunning() # This will start the max processes running
   while (len(Processes) > 0): # Some thing still going on.
      time.sleep(0.1) # You may wish to change the time for this
      CheckRunning()

   print ("Done!")
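
The same polling idea can also be written without globals. What follows is only a minimal sketch under some assumptions: `run_limited`, `max_procs` and `poll_interval` are hypothetical names that do not appear in the question, and the demo launches a child that exits immediately as a stand-in for `['python', 'script.py', url]`.

```python
import subprocess
import sys
import time
from collections import deque


def run_limited(commands, max_procs=20, poll_interval=0.1):
    """Run each command as a subprocess, keeping at most max_procs alive."""
    pending = deque(commands)
    running = []
    exit_codes = []
    while pending or running:
        # Keep the processes that are still alive; record the others.
        still_running = []
        for proc in running:
            code = proc.poll()  # None means the process has not finished
            if code is None:
                still_running.append(proc)
            else:
                exit_codes.append(code)
        running = still_running

        # Fill any free slots with new work.
        while pending and len(running) < max_procs:
            running.append(subprocess.Popen(pending.popleft()))

        if running:
            time.sleep(poll_interval)
    return exit_codes


if __name__ == "__main__":
    # Stand-in commands (assumption): each child simply exits.
    demo = [[sys.executable, "-c", "pass"] for _ in range(5)]
    print(run_limited(demo, max_procs=2))
```

Building a fresh list of live processes on every pass avoids deleting from a list while iterating over it, which is why no reverse-order loop is needed here.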
answered 2013-08-08T15:51:28.290
2

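Just keep a count as you start them, and use a callback from each subprocess to start a new one if there are any URL list entries left to process.

For example, assuming your subprocess calls the OnExit method passed to it when it finishes: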

import subprocess
import time

NextURLNo = 0
MaxProcesses = 20
NoSubProcess = 0
MaxUrls = 100000

def StartNew():
   """ Start a new subprocess if there is work to do """
   global NextURLNo
   global NoSubProcess

   if NextURLNo < MaxUrls:
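      # Illustrative only: Popen arguments must be strings, so a Python
      # function such as OnExit cannot really be passed to the child this way.
      subprocess.Popen(['python', 'script.py', urllist[NextURLNo], OnExit])
      print("Started to Process", urllist[NextURLNo])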
      NextURLNo += 1
      NoSubProcess += 1

def OnExit():
   global NoSubProcess  # needed because the function assigns to the counter
   NoSubProcess -= 1

if __name__ == "__main__":
   for n in range(MaxProcesses):
      StartNew()
   while (NoSubProcess > 0):
      time.sleep(1)
      if (NextURLNo < MaxUrls):
         for n in range(NoSubProcess,MaxProcesses):
             StartNew()
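
A Python function cannot be handed to a child process through the Popen argument list, but the same counting scheme can be driven from the parent. Here is a minimal sketch, assuming one watcher thread per child that blocks in `wait()` and then fires the callback. The names `run_with_callbacks`, `start_next`, `watch` and `on_exit` are hypothetical, and the commands used to try it out would be stand-ins for `['python', 'script.py', url]`.

```python
import subprocess
import sys
import threading


def run_with_callbacks(commands, max_procs=20):
    """Run commands with at most max_procs alive, refilling slots via a callback."""
    lock = threading.Lock()
    all_done = threading.Event()
    todo = iter(commands)
    state = {"active": 0, "finished": 0}

    def start_next():
        # Caller must hold the lock. Returns False when no work is left.
        cmd = next(todo, None)
        if cmd is None:
            return False
        proc = subprocess.Popen(cmd)
        state["active"] += 1
        # The watcher thread waits for the child, then calls on_exit().
        threading.Thread(target=watch, args=(proc,)).start()
        return True

    def watch(proc):
        proc.wait()
        on_exit()

    def on_exit():
        with lock:
            state["active"] -= 1
            state["finished"] += 1
            start_next()  # reuse the freed slot if work remains
            if state["active"] == 0:
                all_done.set()

    with lock:
        for _ in range(max_procs):
            if not start_next():
                break
        if state["active"] == 0:  # nothing was started (empty input)
            all_done.set()
    all_done.wait()
    return state["finished"]
```

For example, `run_with_callbacks([[sys.executable, "-c", "pass"]] * 5, max_procs=2)` runs five trivial children, two at a time, and returns the number that finished. The lock keeps the counter consistent because the callbacks run on different threads.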
于 2013-08-08T10:48:42.013 回答
2

To keep a constant number of concurrent requests, you can use a thread pool:

#!/usr/bin/env python
from multiprocessing.dummy import Pool

def process_url(url):
    # ... handle a single url
    pass

urllist = [url1, url2, url3, .. , url100000]
for _ in Pool(20).imap_unordered(process_url, urllist):
    pass

To run processes instead of threads, remove .dummy from the import.
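
Since the question prefers to keep calling script.py, the pool's worker function could simply launch it as a child process. A minimal sketch under that assumption: `run_script` is a hypothetical name, the URLs are made up, and the child here is a stand-in for `['python', 'script.py', url]` that ignores its argument and exits.

```python
import subprocess
import sys
from multiprocessing.dummy import Pool  # thread pool with the Pool API


def run_script(url):
    """Run one child process for one URL and return its exit code."""
    # Stand-in command (assumption): replace with ['python', 'script.py', url].
    return subprocess.call([sys.executable, "-c", "pass", url])


if __name__ == "__main__":
    urls = ["http://example.com/%d" % n for n in range(10)]  # hypothetical URLs
    pool = Pool(4)  # at most 4 children run at any time
    exit_codes = list(pool.imap_unordered(run_script, urls))
    pool.close()
    pool.join()
    print(exit_codes)
```

Threads are likely sufficient in this setup, because each worker spends its time blocked in `subprocess.call` while the real work happens in the child process.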

answered 2015-05-31T16:27:31.313