2

我想使用 joblib 并行化将 numpy 数组转换为 bcolz carray,但是在写入 carray 时,发生了死锁。

简化代码在这里。

import cv2
import pandas as pd
import numpy as np
from tqdm import tqdm
import bcolz
from tqdm import tqdm
from pathlib import Path
from joblib import Parallel, delayed

class BcolzConverter():
    """
    Save the output from a iterator without loading all images into memory.
    Does not return anything, instead writes data to disk.
    :it: iterator returns image and label
    :data_dir: The folder name to store the bcolz array representing the features in.
    :labels_dir: The folder name to store the bcolz array representing the labels in.
    """    
    def __init__(self, it, data_dir, labels_dir):
        self.it = it
        self.data_dir = data_dir
        self.labels_dir = labels_dir

    def convert(self):
        for directory in [self.data_dir, self.labels_dir]:
            if not Path(directory).exists():
                Path(directory).mkdir(parents=True)

        d, l = next(self.it)
        data = bcolz.carray(d, rootdir=self.data_dir, mode=self.mode)
        labels = bcolz.carray(l, rootdir=self.labels_dir, mode=self.mode)

        Parallel(n_jobs=-1)(delayed(self._process)(
            element, data, labels) for element in it)

        data.flush()
        labels.flush()

    @staticmethod
    def _process(element, data, labels):
        d, l = element
        data.append(d)
        labels.append(l)

converter = BcolzConverter(it, '../input/bcolz/data', '../input/bcolz/label', num_workers=4)
converter.convert()

除非用户暂停,否则上述程序永远不会停止。输出在这里。由于 waiter.aquire() 导致的程序堆栈

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-117-92dbce12edff> in <module>()
      3 it = DataIterator(df, label_onehot=True)
      4 converter = BcolzConverter(it, '../input/bcolz/data', '../input/bcolz/label', num_workers=4)
----> 5 converter.convert()

<ipython-input-115-fba84effdabc> in convert(self)
     66 
     67         Parallel(n_jobs=self.num_workers, verbose=10000)(delayed(self._process)(
---> 68             element, data, labels) for element in it)
     69 
     70         data.flush()

~/.pyenv/versions/anaconda3-5.0.1/envs/py36/lib/python3.6/site-packages/joblib/parallel.py in __call__(self, iterable)
    787                 # consumption.
    788                 self._iterating = False
--> 789             self.retrieve()
    790             # Make sure that we get a last message telling us we are done
    791             elapsed_time = time.time() - self._start_time

~/.pyenv/versions/anaconda3-5.0.1/envs/py36/lib/python3.6/site-packages/joblib/parallel.py in retrieve(self)
    697             try:
    698                 if getattr(self._backend, 'supports_timeout', False):
--> 699                     self._output.extend(job.get(timeout=self.timeout))
    700                 else:
    701                     self._output.extend(job.get())

~/.pyenv/versions/anaconda3-5.0.1/envs/py36/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    636 
    637     def get(self, timeout=None):
--> 638         self.wait(timeout)
    639         if not self.ready():
    640             raise TimeoutError

~/.pyenv/versions/anaconda3-5.0.1/envs/py36/lib/python3.6/multiprocessing/pool.py in wait(self, timeout)
    633 
    634     def wait(self, timeout=None):
--> 635         self._event.wait(timeout)
    636 
    637     def get(self, timeout=None):

~/.pyenv/versions/anaconda3-5.0.1/envs/py36/lib/python3.6/threading.py in wait(self, timeout)
    549             signaled = self._flag
    550             if not signaled:
--> 551                 signaled = self._cond.wait(timeout)
    552             return signaled
    553 

~/.pyenv/versions/anaconda3-5.0.1/envs/py36/lib/python3.6/threading.py in wait(self, timeout)
    293         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    294             if timeout is None:
--> 295                 waiter.acquire()
    296                 gotit = True
    297             else:

KeyboardInterrupt: 

这似乎是由于多个进程试图写入 carray 对象。如何在没有任何死锁的情况下实现这一点?

4

0 回答 0