I am using concurrent.futures.ThreadPoolExecutor for multithreading: I am hitting a number of HTTP services, and I want enough control over the threads that execution pauses when the server goes down and resumes once the server is started again.
The trigger for the server being down is a check for whether a certain file exists at a specific location; when it does, I have to pause execution.
Now, concurrent.futures.Executor.shutdown() signals the executor that it should free any resources it is using once the currently pending futures are done executing.
But when I call the executor's shutdown() method, it does not shut the threads down immediately; the shutdown only takes effect after the whole run has already completed.
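For reference, here is a small standalone sketch of that behaviour (the sleep-based task is just an illustration, not part of my actual code): shutdown() never interrupts a task that has already started; at best, futures that have not started yet can be cancelled.

import concurrent.futures
import time

def task(n):
    time.sleep(1)               # stands in for a slow HTTP request
    return n

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(task, i) for i in range(10)]

time.sleep(0.5)
# returns immediately, but the two tasks already running keep going;
# on Python 3.9+ shutdown(wait=False, cancel_futures=True) would also drop the queued ones
executor.shutdown(wait=False)

# only futures that have not started yet can still be cancelled
cancelled = [f.cancel() for f in futures]
print('%d futures cancelled, %d already running or finished'
      % (cancelled.count(True), cancelled.count(False)))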
In fact, I am only calling shutdown() because I could not find a pause/resume facility in concurrent.futures. So, as a workaround, I remove a url from the list as soon as its thread has finished executing; that way I can take the remaining list and call the same method again.
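To make that workaround concrete, here is a minimal sketch of the idea in isolation (fetch, run_pass and the url names are placeholders, not my real code; the actual code follows below):

import concurrent.futures
import time

def fetch(url):
    time.sleep(0.1)   # hypothetical stand-in for the real HTTP request
    return url

remaining = ["page1.html", "page2.html", "page3.html"]   # hypothetical url list

def run_pass(urls):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(fetch, u): u for u in urls}
        for fut in concurrent.futures.as_completed(futures):
            url = futures[fut]
            try:
                fut.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            # this url's thread has finished, so drop it from the list;
            # a later call with the leftover list only re-runs what is left
            urls.remove(url)

run_pass(remaining)
# anything still in `remaining` after an interrupted pass could be
# passed to run_pass() again once the server is back up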
Here is the code:
import concurrent.futures
import urllib.request
import os.path
import datetime
import sys
import pathlib
from errno import ENOENT, EACCES, EPERM
import time
import threading
import settings  # project configuration module referenced below (assumed to be available)
listOfFilesFromDirectory = []
webroot = settings.configuration.WEBSERVER_WEBROOT
WEBSERVER_PORT = settings.configuration.WEBSERVER_PORT
shutdown = False
# populating the list with the urls from a file
def triggerMethod(path):
    try:
        with open(path) as f:
            for line in f:
                # strip the trailing newline so the url can be appended to the request path
                listOfFilesFromDirectory.append(line.strip())
    except IOError as err:
        if err.errno == ENOENT:
            #logging.critical("document.txt file is missing")
            print("document.txt file is missing")
        elif err.errno in (EACCES, EPERM):
            #logging.critical("You are not allowed to read document.txt")
            print("You are not allowed to read document.txt")
        else:
            raise

# calling this method to stop the threads and restart after a sleep of 100 secs,
# as the list will always hold the urls that have not been executed yet
def stopExecutor(executor):
    filePath = r"C:\logs\serverStopLog.txt"
    while not shutdown:
        time.sleep(5)
        if os.path.isfile(filePath):
            executor.shutdown()
            time.sleep(100)
            runRegressionInMultipleThreads()
            break

def load_url(url, timeout):
    conn = urllib.request.urlopen('http://localhost:' + str(WEBSERVER_PORT) + "/" + url, timeout=timeout)
    return conn.info()

def trigegerFunc():
    global shutdown
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in listOfFilesFromDirectory}
        t = threading.Thread(target=stopExecutor, args=(executor,))
        t.start()
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
                listOfFilesFromDirectory.remove(url)
            else:
                if data:
                    if "200" in data:
                        listOfFilesFromDirectory.remove(url)
                    else:
                        listOfFilesFromDirectory.remove(url)
                else:
                    listOfFilesFromDirectory.remove(url)
        shutdown = True
        t.join()

triggerMethod(r"C:\inetpub\wwwroot")
trigegerFunc()