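This is a construct that joins each process, with a timeout, on a separate thread. The main program therefore never gets stuck, and a process that does hang is given up on once its timeout expires. The technique combines the threading and multiprocessing modules.
This is my way of keeping a minimum of x threads in memory. Compared with the other techniques that respected members have explained above, it may be unusual, but it may well be worth it. For the sake of explanation, I assume that at least 5 websites are crawled at a time.
So here it is: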
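# Importing dependencies.
from multiprocessing import Process
from threading import Thread
import threading

# Crawler function
def crawler(domain):
    # Define the crawling technique here. scrapeddata is a placeholder for its result.
    scrapeddata = domain
    # Open the file inside the child process: a file object inherited from the
    # parent is not flushed when a multiprocessing child exits.
    with open("test.txt", "a") as output:
        output.write(scrapeddata + "\n")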
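Next is the threadController function. It controls the flow of threads into main memory: it keeps starting threads to maintain the threadNum "minimum" limit, i.e. 5, and it does not exit until all active threads (as counted by threading.activeCount()) have finished.
It keeps up to threadNum (5) startProcess threads alive; each of these threads starts a process from processList and joins it with a 60-second timeout. Once threadController has started, two threads are not counted against the limit of 5: the main thread and the threadController thread itself. That is why the loop tests threading.activeCount() != 2.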
def threadController():
    print("Thread count before child thread starts is:-", threading.activeCount(), len(processList))
    # Starting the first thread. This makes activeCount() == 3.
    Thread(target = startProcess).start()
    # Loop while the process list is not empty OR active threads have not finished.
    while len(processList) != 0 or threading.activeCount() != 2:
        if (threading.activeCount() < (threadNum + 2) and  # fewer active threads than the minimum AND
                len(processList) != 0):                     # processList is not empty
            Thread(target = startProcess).start()          # start startProcess as a separate thread **
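The baseline of 2 in the loop condition can be checked with a small sketch. It only counts threads and is not part of the crawler; the helper name count_seen_by_helper is hypothetical. It uses threading.active_count(), the newer spelling of activeCount(). In a plain script with no other threads running, the count seen from inside one helper thread is the main thread plus that helper.

```python
import threading

def count_seen_by_helper():
    """Return threading.active_count() as observed from inside a helper thread."""
    seen = []
    helper = threading.Thread(target=lambda: seen.append(threading.active_count()))
    helper.start()
    helper.join()
    return seen[0]

if __name__ == "__main__":
    print("from the main thread:", threading.active_count())
    print("from a helper thread:", count_seen_by_helper())
```

Every startProcess thread started on top of that baseline raises the count by one, which is what the threadNum + 2 comparison relies on.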
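The startProcess function, running as a separate thread, starts a process from the process list. The point of running it in its own thread (the line marked ** above) is that this thread becomes the parent thread of the process. When it joins the process with a 60-second timeout, only the startProcess thread is blocked; threadController keeps executing. That way, threadController works as required.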
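def startProcess():
    try:
        pr = processList.pop(0)
    except IndexError:  # another thread already took the last process
        return
    pr.start()
    pr.join(60.00)  # joining the process with a timeout of 60 seconds as a float
    if pr.is_alive():
        pr.terminate()  # join() only stops waiting; end a process that outlived the timeout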
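Process.join(timeout) only stops waiting once the timeout expires; it does not stop the child by itself. The following is a minimal sketch of the join-with-timeout step in isolation, assuming a hypothetical helper named run_with_timeout and using time.sleep as a stand-in for the crawler. It checks is_alive() after the join and terminates a child that is still running.

```python
import time
from multiprocessing import Process

def run_with_timeout(target, args, timeout):
    """Run target(*args) in a child process; return True if it finished in time."""
    proc = Process(target=target, args=args)
    proc.start()
    proc.join(timeout)    # returns after `timeout` seconds even if the child still runs
    if proc.is_alive():
        proc.terminate()  # stop the child that outlived the timeout
        proc.join()       # reap it so it does not linger
        return False
    return True

if __name__ == "__main__":
    print(run_with_timeout(time.sleep, (0.1,), 5.0))  # short job, generous timeout
    print(run_with_timeout(time.sleep, (30,), 0.5))   # long job, short timeout
```

Because the blocking join happens inside whichever thread calls the helper, running it from a worker thread leaves the controlling thread free, which is the idea behind startProcess.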
if __name__ == '__main__':
    # a file holding a list of domains
    with open("Domains.txt", "r") as f:
        domains = f.read().split("\n")
    output = open("test.txt", "a")
    processList = []  # process list
    threadNum = 5     # number of thread-initiated processes to be run at one time
    # making the process list
    for line in domains:
        domain = line.strip()
        if not domain:  # skip blank lines, e.g. after a trailing newline
            continue
        p = Process(target = crawler, args = (domain,))
        processList.append(p)  # making a list of performer processes
    # starting the threadController as a separate thread
    mt = Thread(target = threadController)
    mt.start()
    mt.join()  # won't move on until the threadController thread finishes
    output.close()
    print("Done")
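For comparison, here is a hedged sketch of a variant, not the construct above: a fixed number of worker threads pull jobs from a queue.Queue, so no polling loop over the thread count is needed. The name run_all and the (target, args) job format are assumptions made for illustration, and time.sleep stands in for the crawler.

```python
import queue
import threading
import time
from multiprocessing import Process

def run_all(jobs, worker_count, timeout):
    """Run each (target, args) job in its own process, at most worker_count at once.

    Returns one boolean per job: True if it finished within `timeout` seconds.
    """
    pending = queue.Queue()
    for index, job in enumerate(jobs):
        pending.put((index, job))
    results = [None] * len(jobs)

    def worker():
        while True:
            try:
                index, (target, args) = pending.get_nowait()
            except queue.Empty:
                return  # nothing left to start
            proc = Process(target=target, args=args)
            proc.start()
            proc.join(timeout)    # blocks only this worker thread
            if proc.is_alive():
                proc.terminate()  # give up on a process that outlived the timeout
                proc.join()
                results[index] = False
            else:
                results[index] = True

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

if __name__ == "__main__":
    demo_jobs = [(time.sleep, (0.1,)), (time.sleep, (30,)), (time.sleep, (0.1,))]
    print(run_all(demo_jobs, worker_count=2, timeout=0.5))
```

Each worker thread plays the role of startProcess, and the queue replaces both the shared list and the activeCount() check.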
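Besides keeping a minimum number of threads in memory, my aim was also to have something that avoids threads or processes getting stuck in memory. I did this using the timeout on join. My apologies for any typos.
I hope this construct helps someone out there.
Regards,
Vikas Gautam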