
I have some floating-point numbers to store in a big (500K x 500K) matrix. I store them in chunks, using arrays of variable size (depending on certain specific conditions).

I have a parallel piece of code (Python 3.3 and h5py) that generates the arrays and puts them in a shared queue, plus one dedicated process that pops from the queue and writes them one by one into the HDF5 matrix. It works as expected about 90% of the time.

Occasionally, I get a write error on a particular array. If I run the code multiple times, the faulty array changes every time.

Here is the code:

import numpy
import h5py
from queue import Empty
from multiprocessing import Process, JoinableQueue

def writer(in_q):
    # Open the HDF5 archive inside the writer process
    hdf5_file = h5py.File("./google_matrix_test.hdf5")
    hdf5_scores = hdf5_file['scores']
    while True:
        # Get some data; a 5-second timeout signals the end of the stream
        try:
            data = in_q.get(timeout=5)
        except Empty:
            hdf5_file.flush()
            print('HDF5 archive updated.')
            break
        # Write the chunk: data = (row, col_start, col_end, values...)
        try:
            hdf5_scores[data[0], data[1]:data[2]+1] = numpy.matrix(data[3:])
        except OSError:
            # Print faulty chunk's info
            print('E: ' + str(data[0:3]))
            in_q.put(data)  # <- doesn't solve
        in_q.task_done()

def compute():
    jobs_queue = JoinableQueue()
    scores_queue = JoinableQueue()

    processes = []
    # 'producer', 'consumer', and 'data' are defined elsewhere
    processes.append(Process(target=producer, args=(jobs_queue, data,)))
    processes.append(Process(target=writer, args=(scores_queue,)))
    for i in range(10):
        processes.append(Process(target=consumer, args=(jobs_queue, scores_queue,)))

    for p in processes:
        p.start()

    processes[1].join()  # wait for the writer to finish
    scores_queue.join()

Here is the error:

Process Process-2:
Traceback (most recent call last):
    File "/local/software/python3.3/lib/python3.3/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
    File "/local/software/python3.3/lib/python3.3/multiprocessing/process.py", line 95, in run
        self._target(*self._args, **self._kwargs)
    File "./compute_scores_multiprocess.py", line 104, in writer
        hdf5_scores[data[0], data[1]:data[2]+1] = numpy.matrix(data[3:])
    File "/local/software/python3.3/lib/python3.3/site-packages/h5py/_hl/dataset.py", line 551, in __setitem__
        self.id.write(mspace, fspace, val, mtype)
    File "h5d.pyx", line 217, in h5py.h5d.DatasetID.write (h5py/h5d.c:2925)
    File "_proxy.pyx", line 120, in h5py._proxy.dset_rw (h5py/_proxy.c:1491)
    File "_proxy.pyx", line 93, in h5py._proxy.H5PY_H5Dwrite (h5py/_proxy.c:1301)
OSError: can't write data (Dataset: Write failed)

If I insert a two-second pause (time.sleep(2)) into the write task, the problem seems to go away (although I can't afford to waste 2 seconds per write, since I need to write more than 250,000 times). If I catch the write exception and put the faulty array back into the queue, the script will (presumably) never stop.

I'm running CentOS (2.6.32-279.11.1.el6.x86_64). Any insights?

Thanks a lot.


1 Answer


When using the multiprocessing module with HDF5, the only big restriction is that you can't have any files open (even read-only) when fork() is called. In other words, if you open a file in the master process to write, and then Python spins off a subprocess for computation, there may be problems. It has to do with how fork() works and the choices HDF5 itself makes about how to handle file descriptors.

My advice is to double-check your application to make sure you're creating any Pools, etc. before opening the master file for writing.
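A minimal sketch of that ordering, reusing the question's writer and assuming the master process only needs to create the 'scores' dataset up front (the shape and dtype here are illustrative, not taken from the question):

import h5py
from multiprocessing import Process, JoinableQueue

def compute():
    scores_queue = JoinableQueue()

    # Create the dataset, then close the file BEFORE any fork() happens
    with h5py.File("./google_matrix_test.hdf5") as f:
        f.require_dataset('scores', shape=(500000, 500000),
                          dtype='float64', chunks=True)

    # No HDF5 file is open in the master now, so forking is safe.
    # The writer re-opens the file itself, inside the child process.
    writer_proc = Process(target=writer, args=(scores_queue,))
    writer_proc.start()
    # ... start producer/consumer processes and fill scores_queue here ...
    writer_proc.join()

The key point is simply that every h5py.File handle held by the master is closed before Process.start() is called; each child process then opens its own handle.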

answered 2013-12-12T23:38:29.080