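I need to run HyperOpt in a highly parallel way (200+ workers). With fewer than 200 workers everything works fine, but once more workers are running I no longer get any output from the program called in the objective function, and I need that output.

I first tried `subprocess.Popen()`, which works well except with more than 200 workers. So I switched to `os.system(program + " > " + stdout + " 2> " + stderr)` and then read the stdout and stderr log files afterwards. That also works fine, except with more than 200 workers.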
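For reference, here is a minimal sketch of the same redirection without building a shell string. It assumes the command is given as a list of arguments. `run_with_files` is a hypothetical helper that returns the same dictionary shape as `run_program`. It behaves like `program > stdout 2> stderr`, but Python opens the files and `/bin/sh` is not involved.

```python
import subprocess
import sys


def run_with_files(cmd, stdout_path, stderr_path):
    """Run cmd (a list of arguments) with stdout/stderr redirected to files.

    Roughly equivalent to the shell form `cmd > stdout_path 2> stderr_path`.
    """
    with open(stdout_path, "w") as out_f, open(stderr_path, "w") as err_f:
        # The child writes directly into the files; Python only waits for it
        completed = subprocess.run(cmd, stdout=out_f, stderr=err_f)
    # Read the files back only after the child has exited
    with open(stdout_path) as out_f, open(stderr_path) as err_f:
        return {
            "stdout": out_f.read(),
            "stderr": err_f.read(),
            "retcode": completed.returncode,
        }
```

Usage, for example: `run_with_files([sys.executable, "-c", "print('hello world')"], "a.stdout", "a.stderr")`.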
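The stderr log file is always fine. The stdout log, however, is empty most of the time or contains only a few random lines. The files exist; they are just incomplete. The program does not seem to abort, and nothing in the logs indicates a problem, except that HyperOpt never finds a result. If I run only `echo "hello world"` (very short output) as the program, it works.

I am running this in a SLURM environment on an HPC cluster, but there are no SLURM errors either.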
helper.py:

```python
import os
from pathlib import Path


def run_program(program, logfiles):
    stdout = logfiles["stdout"]
    stderr = logfiles["stderr"]
    # Let the shell redirect stdout and stderr into the two log files
    code = program + " > " + stdout + " 2> " + stderr
    retcode = os.system(code)
    # On POSIX, os.system returns a wait status; the high byte holds the exit code
    retcode = retcode >> 8
    out, err = '', ''
    if os.path.exists(stdout):
        out = Path(stdout).read_text()
    if os.path.exists(stderr):
        err = Path(stderr).read_text()
    array = {
        "stdout": out,
        "stderr": err,
        "retcode": retcode
    }
    return array
```
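A side note on `retcode >> 8`: on POSIX, `os.system` returns a wait status, and shifting it by 8 yields 0 if the shell itself was killed by a signal. The following minimal sketch uses `os.waitstatus_to_exitcode` (Python 3.9+), which returns `-N` for a process killed by signal `N`. `exit_code_from_system` is a hypothetical name.

```python
import os


def exit_code_from_system(command):
    """Run command via os.system and return the child's exit code (POSIX only).

    A normal exit gives the exit status; death by signal N gives -N,
    whereas `status >> 8` would give 0 in that case.
    """
    status = os.system(command)
    return os.waitstatus_to_exitcode(status)
```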
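objective_function_mongodb.py:

```python
import re
import uuid

import helper


def objective_function_mongodb(parameter):
    # uuid4() returns a UUID object, so convert it before concatenating
    specific_log_file = str(uuid.uuid4())
    log_files = {
        "stderr": specific_log_file + ".stderr",
        "stdout": specific_log_file + ".stdout"
    }
    program = "programs/" + parameter[0] + "/run.sh"
    res = helper.run_program(program, log_files)
    re_search = r'RESULT: ([+-]?\d+(?:\.\d+)?)\n'
    m = re.search(re_search, res["stdout"])
    if m is None:
        # No RESULT line in stdout: fail this evaluation explicitly
        raise ValueError("No RESULT line in stdout of " + program)
    # HyperOpt expects a numeric loss
    return float(m.group(1))
```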
```python
from hyperopt import fmin, hp, tpe
from hyperopt.mongoexp import MongoTrials

import objective_function_mongodb

serverip = ...
serverport = ...
projectname = 'test'

space = hp.choice('a', [hp.randint("axis1", 10000)])

mongourl = 'mongo://' + str(serverip) + ":" + str(serverport) + '/' + projectname + '/jobs'

trials = MongoTrials(mongourl, exp_key=projectname)

# fmin takes keyword arguments; algo must be a search algorithm such as tpe.suggest
best = fmin(
    fn=objective_function_mongodb.objective_function_mongodb,
    trials=trials,
    space=[projectname, space],
    algo=tpe.suggest,
    max_evals=100000,
    catch_eval_exceptions=True
)
```
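`programs/test/run.sh` produces several thousand lines of output (it calls another Python script itself). I need to analyze that output further, so simply searching it for the RESULT line is not enough.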
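Since the full output is read anyway, the RESULT value can be extracted from it after the run while the rest is kept for analysis. The following minimal sketch assumes the program prints lines like `RESULT: 1.5`; `parse_result` is a hypothetical helper. Unlike the regex in the objective function, it does not require a trailing `\n`, so a final line without a newline still matches.

```python
import re

# Same number pattern as in the objective function, without the trailing newline
RESULT_RE = re.compile(r"RESULT: ([+-]?\d+(?:\.\d+)?)")


def parse_result(stdout_text):
    """Return the value of the last RESULT line in stdout_text, or None if there is none."""
    matches = RESULT_RE.findall(stdout_text)
    if not matches:
        return None
    return float(matches[-1])
```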
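I also tried the following as `run_program` (with an additional debug log file). According to a website I can no longer find, this is supposed to fix the problem with `Popen` when the output is larger than the 64K buffer: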
```python
import os
import pathlib
import socket
import subprocess
import time


def print_to_log(string, logfile):
    folder = os.path.dirname(logfile)
    pathlib.Path(folder).mkdir(parents=True, exist_ok=True)
    # Always append so earlier debug lines are kept ('w' would truncate the file on every call)
    with open(logfile, 'a') as logfilehandler:
        print(string, file=logfilehandler)


def read_available(sock):
    # Read everything currently buffered on the socket; stop on timeout (no data yet) or EOF
    chunks = []
    while True:
        try:
            chunk = sock.recv(4096)
        except socket.timeout:
            break
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


def run_program(program, logfile):
    print_to_log("Before Conversion", logfile)
    programconverted = program.split()
    print_to_log("In Conversion", logfile)
    print_to_log("Definition of MAX_OUTPUT_SIZE", logfile)
    MAX_OUTPUT_SIZE = 2 ** 64
    print_to_log("Starting Timer", logfile)
    start = time.time()
    print_to_log("Intializing Sockets", logfile)
    stdout = socket.socketpair()
    stderr = socket.socketpair()
    # nonblocking and timeout is not the same, timeout is easier to handle via socket.timeout exception
    print_to_log("Setting Timeouts", logfile)
    stdout[0].settimeout(0.01)
    stderr[0].settimeout(0.01)
    print_to_log("Begin popen", logfile)
    p = subprocess.Popen(programconverted, stdout=stdout[1], stderr=stderr[1], close_fds=True)
    # The child holds its own copies of the write ends; close ours so recv() sees EOF after it exits
    stdout[1].close()
    stderr[1].close()
    print_to_log("Create empty out und err", logfile)
    out, err = b"", b""
    print_to_log("Initizializing returncode", logfile)
    returncode = None
    print_to_log("Begin Loop", logfile)
    loopnr = 0
    while True:
        print_to_log("p.poll()", logfile)
        p.poll()
        print_to_log("Get stdout/stderr", logfile)
        # Reading after poll() means the last iteration drains everything written before the exit
        outtmp = read_available(stdout[0])
        errtmp = read_available(stderr[0])
        print_to_log("Adding (out/err)tmp onto (out/err), loopnr: " + str(loopnr), logfile)
        out += outtmp
        err += errtmp
        if len(out) > MAX_OUTPUT_SIZE or \
                len(err) > MAX_OUTPUT_SIZE:
            print_to_log("Killing process because its output is bigger than MAX_OUTPUT_SIZE", logfile)
            p.kill()
            p.wait()
            out = out[:MAX_OUTPUT_SIZE]
            err = err[:MAX_OUTPUT_SIZE]
            out += b"Output was truncated to " + str(MAX_OUTPUT_SIZE).encode()
        if p.returncode is not None:
            print_to_log("Returncode: " + str(p.returncode), logfile)
            returncode = p.returncode
            break
        time.sleep(0.1)
        loopnr = loopnr + 1
    end = time.time()
    # we now have: returncode, out, err, start, end
    stdout[0].close()
    stderr[0].close()
    # Decode once at the end so UTF-8 characters split across chunks stay intact
    array = {
        "stdout": out.decode('utf-8', errors='replace'),
        "stderr": err.decode('utf-8', errors='replace'),
        "retcode": returncode
    }
    return array
```
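For comparison, the Python documentation recommends `Popen.communicate()` for avoiding the pipe-buffer deadlock, where the child blocks on a full `PIPE` while the parent waits for it. `communicate()` keeps reading stdout and stderr until EOF and then waits for the process. This is a minimal sketch; `run_and_capture` is a hypothetical name, and it returns the same dictionary shape as `run_program`.

```python
import subprocess
import sys


def run_and_capture(cmd):
    """Run cmd (a list of arguments) and capture all of stdout and stderr.

    communicate() drains both pipes while waiting, so the child never
    blocks on a full pipe buffer, however much it writes.
    """
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    return {
        "stdout": out.decode("utf-8", errors="replace"),
        "stderr": err.decode("utf-8", errors="replace"),
        "retcode": p.returncode,
    }
```

Usage, for example: `run_and_capture([sys.executable, "-c", "print('x' * 10**6)"])` returns well over 64K of stdout without hanging.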
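In the debug log file (written by `print_to_log`), only a single line `Adding (out/err)tmp onto (out/err), loopnr: <n>` appears, with a random loop number, and nothing else: nothing before it and nothing after it. According to the log, the process is never killed.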
Again, the stderr output is complete, but with more than 200 parallel workers the stdout output is mostly empty or contains only random lines of the output.
Writing stdout itself normally works fine, in particular when using `>` and `2>`.
`ulimit -a` shows:

```
ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 256025
max locked memory       (kbytes, -l) unlimited
max memory size         (kbytes, -m) 307200
open files                      (-n) 1048576
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) unlimited
cpu time               (seconds, -t) unlimited
max user processes              (-u) 4096
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited
```
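The `pipe size` line above is 8 × 512 = 4096 bytes. As far as I know, on Linux bash reports PIPE_BUF here, which is the largest write guaranteed to be atomic. It is not the pipe's capacity, which defaults to 65536 bytes (see pipe(7)). The following small sketch prints both values and assumes Linux with Python 3.10+, because `fcntl.F_GETPIPE_SZ` is Linux-only. `pipe_limits` is a hypothetical helper.

```python
import fcntl
import os


def pipe_limits():
    """Return (capacity, atomic_write_limit) in bytes for a fresh pipe (Linux, Python 3.10+)."""
    r, w = os.pipe()
    try:
        capacity = fcntl.fcntl(w, fcntl.F_GETPIPE_SZ)  # how much the pipe can buffer
        atomic = os.fpathconf(w, "PC_PIPE_BUF")  # PIPE_BUF: max atomic write size
        return capacity, atomic
    finally:
        os.close(r)
        os.close(w)


if __name__ == "__main__":
    print(pipe_limits())
```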