我在 python 中构建了一个回测器,它在 70 毫秒内完成了完整的运行。在单个线程上(使用 for 循环)我可以运行这个回测器,它显示出正常的性能(每次迭代大约 70 毫秒):
for q in queue:
print(backtest(alldata1h, alldata, q['strats'], q['filts']))
我的问题如下:每当我尝试使用多处理运行此功能时,性能要差得多(每次回测约 800 毫秒)。
我尝试过使用Process
和Queue
对象的数组来做到这一点:
def do_workload(q, wk, a1h, ad):
for w in wk:
c = backtest(a1h, ad, w['strats'], w['filts'])
q.put({"Strategy": w['sname'], "Filter": w['fname'], "c": c})
q.put('DONE')
#Please ignore unnecessary indentation
for i in range(thread_nr):
thread_pool.append({"Process": "", "Queue": "", "workload": workloads[i], "workindex": 0, "finished": False})
thread_pool[i]['Queue'] = Queue()
thread_pool[i]['Process'] = Process(target=do_workload, args=(thread_pool[i]['Queue'], workloads[i], alldata1h, alldata))
thread_pool[i]['Process'].start()
print("Total workload: {} backtests".format(len(queue)))
while queue_index < len(queue):
for t in range(len(thread_pool)):
time.sleep(0.1)
if thread_pool[t]['finished'] == False:
while not thread_pool[t]['Queue'].empty():
res = thread_pool[t]['Queue'].get()
if res == "DONE":
thread_pool[t]['finished'] = True
else:
final_results = final_results.append(res, ignore_index=True)
queue_index += 1
print("Read from threads: {}/{}".format(queue_index, len(queue)))
time.sleep(10)
print("DONE")
我也尝试过使用一个Pool
对象:
print("Total workload: {} backtests".format(len(queue)))
from functools import partial
target = partial(do_workload, a1h=alldata1h, ad=alldata)
pool = Pool(processes=thread_nr)
print("Starting pool...")
print(len(pool.map(target, workloads, len(workloads[0]))))
我的处理器有 64 个内核和 128 个线程,所以我给它很高thread_nr
(大约 100-120),但性能仍然很糟糕。
我的问题如下:有没有办法足够改进 python 多处理以实现每个回测(每个进程)70 毫秒?或者我应该用 C++ 重写整个项目(回测器和进程管理器)以实现最佳性能(使用所有可能的线程/整个 CPU)。