2

我的工作是处理大量的 xml;为了获得更快的结果,我想使用 ipython 的并行处理;下面是我的示例代码。因为我只是在使用celementTree模块查找 xml/xsd 的元素数量。

>>> from IPython.parallel import Client
>>> import os
>>> c = Client()
>>> c.ids
>>> lview = c.load_balanced_view()
>>> lview.block =True
>>> def return_len(xml_filepath):
        import xml.etree.cElementTree as cElementTree
        tree = cElementTree.parse(xml_filepath)
        my_count=0
        file_result=[]
        cdict={}
        for elem in tree.getiterator():
            cdict[my_count]={}
            if elem.tag:
                cdict[my_count]['tag']=elem.tag
            if elem.text:
                cdict[my_count]['text']=(elem.text).strip()
            if elem.attrib.items():
                cdict[my_count]['xmlattb']={}
                for key, value in elem.attrib.items():
                    cdict[my_count]['xmlattb'][key]=value
            if list(elem):
                cdict[my_count]['xmlinfo']=len(list(elem))
            if elem.tail:
                cdict[my_count]['tail']=elem.tail.strip()
            my_count+=1
        output=xml_filepath.split('\\')[-1],len(cdict)
        return output
        ## return cdict
>>> def get_dir_list(target_dir, *extensions):
        """
        This function will filter out the files from given dir based on their extensions
        """
        my_paths=[]
        for top, dirs, files in os.walk(target_dir):
            for nm in files:
                fileStats = os.stat(os.path.join(top, nm))
                if nm.split('.')[-1] in extensions:
                    my_paths.append(top+'\\'+nm)
        return my_paths
>>> r=lview.map_async(return_len,get_dir_list('C:\\test_folder','xsd','xml'))

为了获得最终结果,我必须这样做 >>> r.get(),当过程完成时我会得到结果

我的问题是我是否能够在它们完成时获得中间结果;
例如,如果我将我的工作应用到包含 1000 个 xmls/xsds 文件的文件夹,那么在处理完特定文件后,我可以立即获得结果。不喜欢1st file is done--> show its result... 2nd file is done---> show its result........ 1000th file is done--> show its result上面的当前工作;wait till final file get finished然后它将显示所有这 1000 个文件的完整结果。
还要处理我在函数import内部定义的导入/命名空间错误return_len;有没有更好的方法来解决这个问题?

4

1 回答 1

4

当然。AsyncMapResult(map_async 返回的类型)是立即可迭代的,迭代产生的项目与最终产生的列表相同r.get()。所以在你这样做之后:

amr = lview.map_async(return_len, get_dir_list('C:\\test_folder','xsd','xml'))

你可以做:

for r in amr:
    print r

或使用 enumerate 保留索引

for i,r in enumerate(amr):
    print i, r 

reduce或使用内置执行缩减:

summary_result = reduce(myfunc, amr)

所有这些都将在您到达时遍历您的结果。如果您不关心顺序并且每个任务的时间差异很大,您可以通过map_async(...,ordered=False). 如果您这样做,当您遍历 AMR 时,您将在先到先得的基础上获得单独的结果,而不是保留提交顺序。

ipython 文档中有更多信息。

为了处理导入/命名空间错误,我在 return_len 函数中定义了导入;有没有更好的方法来解决这个问题?

是和不是。有几种方法可以设置引擎命名空间,例如使用模块、@parallel.require("module")装饰器或简单地使用 显式执行导入%px import xml.etree.cElementTree as cElementTree,每种方法在某些情况下都有好处。但我经常发现将导入放在函数中是最简单的方法,而且意外最少。

于 2012-11-28T02:52:52.937 回答