1

问题 - ProcessPoolExecutor 没有提高速度。tqdm 确认

对 python 有足够的了解,可以复制和/或编写一个有效的程序。每个文件需要大约 40 秒来加载->过滤->写入。我有大约 6,800 个文件要处理,并且想要一个更好的版本,它可以使用我所有的处理能力(6 核),我尝试编写该版本(如下)。所述版本产生,但比我原来的功能稍慢:

from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer

decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY+'*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers

def load_decode_filter(file):
    '''`
     Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open(file, 'rb') as ins:
        for bufr_message in generate_bufr_message(
                decoder,ins.read()):
            input_list = FlatJsonRenderer().render(bufr_message)[3][2] #necessary for [mask] to function
            mask = [obj for obj in input_list if ((PHI_MAX > obj[
                12] > PHI_MIN) & (LAMBDA_MAX > obj[13] > LAMBDA_MIN))]
            output_message.extend(mask)
        return output_message

def main(files_in):
    '''
    attempt to intiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor(max_workers=6) as executor:
        with tqdm(range(len(files_in)), desc='files loaded',
                  position=0) as progress:
            futures = []
            for file in files_in:
                future = executor.submit(load_decode_filter(file), file)
                future.add_done_callback(lambda p: progress.update())
                futures.append(future)
            results = []
            for future in futures:
                result = future.result()
                results.append(result)
    with open(DIRECTORY+'bufrout.json', 'w', encoding='utf-8') as f_o:
        dump(results, f_o)

if __name__ == '__main__':
    main(files)

我希望至少减少每个文件的处理时间。


更新,结束:
首先,我要感谢所有评论的人和回答者(我太新了,无法投票)。似乎有意义地提高效率的唯一方法是从不解码并从原位缓冲区数据中获取我想要的东西,这完全超出了我目前的能力(这是我第一次接触任何类型的代码)。


我计划(目前)尽可能地运行我的初始版本(f.bufr in,f.bufr_.txt out),我将在每次“运行”后将处理过的文件移动到子目录。一线希望是我在这方面学到了足够多的知识,我将能够制作一个程序来将所有文本输出合并到一个文件中。再次感谢。

4

1 回答 1

0

问:
“问题 - ProcessPoolExecutor 没有提高速度。由 tqdm 确认”

答:
不,从各方面来说

您的主要问题不是ProcessPoolExecutor()-instance 的效率,而是
您的主要问题是选择性能/效率(几乎)反模式,其中 Python,Windows O 领域中的 Python 子进程越多/S 会让您等待大约 75 小时以收集所有结果(如果处理管道确实按照您期望的那样做,我无法判断,但猜它不会......原因如下所列)

SUSPECT #1:
最好避免 75 小时产生无意义的输出:

鉴于记录的标准 Py3 concurrent.futures.Executor()-instance .submit()-method 的调用签名,您的代码不符合此规范。

代替传递对函数的引用main(),作为调用方,首先为 6800 个文件中的每一个文件执行完整的纯[SERIAL]METOP 工作包处理(这会产生一些昂贵的收集巨大的消息列表),这然后(与传递对函数/就地 lambda-operator 的引用的记录要求相反)再次以非常巨大的 RAM/CPU/TIME 开销,将 SER/sent/DES 转移到Executor-managed 池之一工作进程(我怀疑在收到列表而不是函数(计划在这样的远程进程中执行,通过传递给它的参数 - 根据调用签名指定)时,它能够做任何合理的事情。哎哟...

def main( files_in ):
    '''                                                                 __doc__
    attempt to intiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor( max_workers = 6
                              )  as executor: #---------------------------# eXe CONTEXT-mgr
        
        with tqdm( range( len( files_in ) ),
                   desc     = 'files loaded',
                   position = 0
                   ) as progress: #---------------------------------------# pro CONTEXT-in-CONTEXT-mgr
            
            futures = []
            for file in files_in: #---------------------------------------#     LUXURY of top-level iterator, commanding  6800x times a pool of workers
                future = executor.submit( load_decode_filter( file ), #---#     ??? WHY CALC THE RESULT BEFORE passing it to the .submit() ???
                                                              file    #---#         std PARA
                                                              )
                future.add_done_callback( lambda p: progress.update() )   #     LUXURY of tdqm() for showing 75-hours of work ???
                futures.append( future ) #--------------------------------#     LUXURY of no performance gain
            
            results = []
            for future in futures:
                result = future.result()
                results.append( result ) #--------------------------------#     LUXURY of adverse performance gain
    
    with open( DIRECTORY + 'bufrout.json', 'w',
               encoding = 'utf-8'
               ) as f_o: #------------------------------------------------# f_o CONTEXT-mgr
        dump( results, f_o )

SUSPECT #2:如果性能是要实现的真正目标,
最好避免任何和所有降低性能的语法构造函数:

避免键入一种看起来很“性感”,但却付出了巨大的附加开销成本的低挂水果 SLOC-s 的任何和所有罪恶。

设计流程,以便我们可以通过延迟屏蔽来改善端到端处理时间,在可能的情况下(文件 I/O 是一个经典案例)并完全避免任何可简化的步骤(创建命名变量(有时从不used ) 是类似的 sin )。

鉴于您在 Windows O/S 中运行,您的 (tho' hidden ) 子进程实例化成本是所有其他情况中最高的 - Windows O/S 将生成 Python 解释器进程的完整自上而下副本,其中所有数据结构等,因此,如果这导致您的物理 RAM “过度拥挤”,操作系统将开始(在剩下的 75 小时内......)一场激烈的虚拟内存管理战争从 RAM 到磁盘和从磁盘到 RAM 的文件 I/O 传输(大约 10.000 倍更大的延迟) 。这将有效地破坏任何其他 CPU-from/to-RAM I/O 操作,我们可能会直接忘记任何关于提高性能的梦想。

从promises 来看,如果您的“过滤器”可以使用-templatespybufrkit进行编译,那么还有一次机会 - 获得 10% ~ 30% 的性能提升:pybufrkit

"(...) BUFR 模板编译 模板编译
的主要目的是性能。但是,由于位操作是整个处理中最耗时的部分。性能增益有些有限。取决于要处理的描述符总数对于消息,模板编译提供了 10 - 30% 的性能提升。阅读文档

原样,熵减少代码:

def load_decode_filter( file ):
    '''`
    Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open( file, 'rb' ) as ins: #----------------------------------- # ins CONTEXT-mgr
        for idx,         \
            bufr_message  \
            in             \
            enumerate( generate_bufr_message( decoder,                   #     LUXURY of enumerate for no real use
                                              ins.read() # <-------------# ins.
                                              )
                       ):
            input_list = FlatJsonRenderer().render( bufr_message )[3][2] #     LUXURY of JSON-(re)-)decorations
            mask = [ obj for obj in input_list                           #
                                 if ( (    PHI_MAX > obj[12] >    PHI_MIN )
                                    & ( LAMBDA_MAX > obj[13] > LAMBDA_MIN )
                                      )
                     ]
            output_message.extend( mask )
        return output_message

性能提示,如果既没有设法使用本pybufrkit机编译模板也没有使用本机脚本 CLI 任务pybufrkit并求助于 Win/Py3 处理流程:

  • 考虑到主要 Python 解释器进程的完整上下副本的支付成本,你的工作人员应该“知道”所有文件的列表,所以这个令人尴尬的独立文件进程将最好:

  • gc.collect(); gc.disable()在产生任何工人池之前

  • 生成与max_workers主机硬件上存在的 CPU-RAM 物理内存 I/O 通道一样少的工作进程(任务受内存限制,而不是 CPU)

  • 拆分,在-sidemain()要处理的文件列表 - 使用max_workers-many,平衡长度,不重叠的元组( from_fileIDX, to_fileIDX )

  • executor.submit()块处理函数引用,具有单个元组( from_, to_ )并将所有其余部分安排在此类块处理函数中,包括延迟屏蔽的文件 I/O 存储结果(可能稍后合并,使用 O/S 文本/二进制文件合并)

  • 更喜欢延迟屏蔽流,使用 syntax-sugar(ed) 迭代器在教科书示例中可能很好,但这里这些是(不可屏蔽的)性能杀手 - 收集大量列表[ obj for obj in ... if ... ]永远不会改善类似流(可屏蔽延迟)处理流程,没有首先收集这样一个巨大的列表,只是为了下一次(重新)迭代这样一个巨大的列表,将这些列表的项目一个接一个地文件 I/O 到磁盘文件上。在一个单一的步骤流中更好地迭代/过滤/有条件地执行文件 I/O 操作(减少 RAM,避免附加开销以及所有可屏蔽的延迟)

有关更多详细信息,您可能想阅读this和 code from this和 theredirected examples。

于 2022-03-02T16:15:55.023 回答