如果我有大约 10+ 百万个小任务要在 python 中处理(转换图像左右),我如何创建队列并在处理崩溃的情况下保存进度。需要明确的是,我怎样才能保存进度或停止任何我想要的进程并从最后一点继续处理。
在这种情况下如何处理多个线程?
一般的问题是如何将处理数据的进度保存到文件中。如果它有大量非常小的文件,则每次迭代后保存文件将比处理本身更长...
谢谢!
(对不起,如果我的英语不清楚)
如果我有大约 10+ 百万个小任务要在 python 中处理(转换图像左右),我如何创建队列并在处理崩溃的情况下保存进度。需要明确的是,我怎样才能保存进度或停止任何我想要的进程并从最后一点继续处理。
在这种情况下如何处理多个线程?
一般的问题是如何将处理数据的进度保存到文件中。如果它有大量非常小的文件,则每次迭代后保存文件将比处理本身更长...
谢谢!
(对不起,如果我的英语不清楚)
首先,我建议不要使用多线程。请改用多处理。当涉及到计算密集型任务时,由于 GIL,多个线程在 python 中不能同步工作。
要解决保存结果的问题,请使用以下顺序
您可以为每个进程设置 monitor_1.txt、monitor_2.txt ...,这样您就不必为每个进程读取整个文件。
以下要点可能会对您有所帮助。您只需要为第 4 点添加代码。 https://gist.github.com/rishibarve/ccab04b9d53c0106c6c3f690089d0229
保存文件等 I/O 操作总是相对较慢。如果您必须处理大量文件,那么无论您使用多少线程,您都会遇到很长的 I/O 时间。
最简单的是使用多线程而不是多处理,让操作系统的调度程序解决所有问题。文档对如何设置线程有很好的解释。一个简单的例子是
from threading import Thread
def process_data(file_name):
# does the processing
print(f'processed {file_name}')
if __name__ == '__main__':
file_names = ['file_1', 'file_2']
processes = [Thread(target=process_data, args=(file_name,)) for file_name in file_names]
# here you start all the processes
for proc in processes:
proc.start()
# here you wait for all processes to finish
for proc in processes:
proc.join()
一种可能更快的解决方案是创建一个单独的进程来执行 I/O。然后您使用 amultiprocessing.Queue将来自“数据处理线程”的文件排队,并让 I/O 线程拾取这些文件并一个接一个地处理它们。
这样 I/O 就不必休息了,这将接近最佳状态。我不知道这是否会比基于线程的解决方案产生很大的优势,但与并发的情况一样,最好的找出方法是用你自己的应用程序做一些基准测试。
需要注意的一个问题是,如果数据处理速度更快,那么它Queue会变得非常大。这可能会对性能产生影响,具体取决于您的系统。如果队列变大,一个快速的解决方法是暂停数据处理。
请记住在脚本中使用 Python 编写所有多处理代码
if __name__ == '__main__':
# mp code
请注意,并注意某些 IDE 不能很好地处理并发 Python 代码。安全的选择是通过从终端执行代码来测试您的代码。