1

我在 python 中构建了一个回测器,它在 70 毫秒内完成了完整的运行。在单个线程上(使用 for 循环)我可以运行这个回测器,它显示出正常的性能(每次迭代大约 70 毫秒):

for q in queue:
    print(backtest(alldata1h, alldata, q['strats'], q['filts']))

我的问题如下:每当我尝试使用多处理运行此功能时,性能要差得多(每次回测约 800 毫秒)。

我尝试过使用ProcessQueue对象的数组来做到这一点:

def do_workload(q, wk, a1h, ad):
    for w in wk:
        c = backtest(a1h, ad, w['strats'], w['filts'])
        q.put({"Strategy": w['sname'], "Filter": w['fname'], "c": c})
    q.put('DONE')

#Please ignore unnecessary indentation

    for i in range(thread_nr):
        thread_pool.append({"Process": "", "Queue": "", "workload": workloads[i], "workindex": 0, "finished": False})
        thread_pool[i]['Queue'] = Queue()
        thread_pool[i]['Process'] = Process(target=do_workload, args=(thread_pool[i]['Queue'], workloads[i], alldata1h, alldata))
        thread_pool[i]['Process'].start()
    print("Total workload: {} backtests".format(len(queue)))

    while queue_index < len(queue):
        for t in range(len(thread_pool)):
            time.sleep(0.1)
            if thread_pool[t]['finished'] == False:
                while not thread_pool[t]['Queue'].empty():
                    res = thread_pool[t]['Queue'].get()
                    if res == "DONE":
                        thread_pool[t]['finished'] = True
                    else:
                        final_results = final_results.append(res, ignore_index=True)
                        queue_index += 1
        print("Read from threads: {}/{}".format(queue_index, len(queue)))
        time.sleep(10)
    print("DONE")

我也尝试过使用一个Pool对象:

print("Total workload: {} backtests".format(len(queue)))
from functools import partial
target = partial(do_workload, a1h=alldata1h, ad=alldata)
pool = Pool(processes=thread_nr)
print("Starting pool...")
print(len(pool.map(target, workloads, len(workloads[0]))))

我的处理器有 64 个内核和 128 个线程,所以我给它很高thread_nr(大约 100-120),但性能仍然很糟糕。

我的问题如下:有没有办法足够改进 python 多处理以实现每个回测(每个进程)70 毫秒?或者我应该用 C++ 重写整个项目(回测器和进程管理器)以实现最佳性能(使用所有可能的线程/整个 CPU)。

4

0 回答 0