
I wrote a generator for Keras that fetches images from an HDF5 file using PyTables (see the code below). It works fine when I call it like this:

self._model.fit_generator(self.training_generator,
                          epochs=epochs,
                          validation_data=self.validation_generator,
                          verbose=1,
                          callbacks=[model_checkpoint, tensorboard_callback],
                          use_multiprocessing=True,
                          # workers=2  # uncommenting this and using more than 1 worker fails
                          )

However, if I use more than one worker (see the commented-out line above), I get the error shown below. I suspect it is related to multiple workers trying to access the HDF5 file at the same time. However, I thought PyTables and HDF5 could handle this for read-only access. So what am I doing wrong?
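
If shared file handles across worker processes are in fact the problem, the workaround I can think of would be to open the file lazily inside each worker instead of once in __init__. This is only a sketch of the idea, untested; the node_name parameter and the placeholder targets are made up for brevity:

import numpy as np
import tables
import keras

class LazyOpenGenerator(keras.utils.Sequence):
    'Sketch only: every worker process opens its own read-only handle on first access'

    def __init__(self, pytables_file_path, node_name, batch_size=32):
        self.pytable_file_path = pytables_file_path
        self.node_name = node_name  # hypothetical; my real class uses image_type
        self.batch_size = batch_size
        self.tables = None  # deliberately NOT opened in the parent process
        with tables.open_file(pytables_file_path, 'r') as f:
            self.number_of_samples = f.root[node_name].shape[0]

    def _handle(self):
        # the first call inside a worker opens a handle private to that process
        if self.tables is None:
            self.tables = tables.open_file(self.pytable_file_path, 'r')
        return self.tables

    def __len__(self):
        return int(np.floor(self.number_of_samples / self.batch_size))

    def __getitem__(self, index):
        batch = self._handle().root[self.node_name][index * self.batch_size:(index + 1) * self.batch_size, ...]
        return batch, batch  # placeholder targets, just to keep the sketch short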

Bonus question: does this code also guarantee that, during training, the model sees each sample only once per epoch, as stated in the Notes:

Sequence are a safer way to do multiprocessing. This structure guarantees that the network will only train once on each sample per epoch which is not the case with generators.
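
To make the bonus question concrete, this is how I picture one epoch being driven (a simplified single-process model, not actual Keras internals): if the index list is a permutation of all samples and __getitem__ slices it into disjoint batches, every sample should appear at most once:

# Simplified single-process model of one epoch over a Sequence, NOT Keras internals:
# __len__ gives the batch count, __getitem__ slices disjoint chunks of the index list.
indexes = list(range(10))  # toy dataset of 10 samples
batch_size = 3
batches = [indexes[i * batch_size:(i + 1) * batch_size]
           for i in range(len(indexes) // batch_size)]
seen = [i for batch in batches for i in batch]
assert len(seen) == len(set(seen))  # no sample repeated within the epoch
print(batches)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]] -- sample 9 is dropped by the floor()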

This is the error I get when using more than one worker:

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/project/path/venv/lib/python3.7/site-packages/keras/utils/data_utils.py", line 401, in get_index
    return _SHARED_SEQUENCES[uid][i]
  File "/project/path/python_package/python_package/training_generators.py", line 41, in __getitem__
    images, masks, weights = self.__data_generation(indexes)
  File "/project/path/python_package/python_package/training_generators.py", line 52, in __data_generation
    images, labels = self.__get_images(indexes)
  File "/project/path/python_package/python_package/training_generators.py", line 79, in __get_images
    labels[counter] = self.tables.root['labels'][i, ...]
  File "/project/path/venv/lib/python3.7/site-packages/tables/array.py", line 662, in __getitem__
    arr = self._read_slice(startl, stopl, stepl, shape)
  File "/project/path/venv/lib/python3.7/site-packages/tables/array.py", line 766, in _read_slice
    self._g_read_slice(startl, stopl, stepl, nparr)
  File "tables/hdf5extension.pyx", line 1585, in tables.hdf5extension.Array._g_read_slice
tables.exceptions.HDF5ExtError: HDF5 error back trace

  File "H5Dio.c", line 216, in H5Dread
    can't read data
  File "H5Dio.c", line 587, in H5D__read
    can't read data
  File "H5Dchunk.c", line 2276, in H5D__chunk_read
    error looking up chunk address
  File "H5Dchunk.c", line 3022, in H5D__chunk_lookup
    can't query chunk address
  File "H5Dbtree.c", line 1047, in H5D__btree_idx_get_addr
    can't get chunk info
  File "H5B.c", line 341, in H5B_find
    unable to load B-tree node
  File "H5AC.c", line 1763, in H5AC_protect
    H5C_protect() failed
  File "H5C.c", line 2565, in H5C_protect
    can't load entry
  File "H5C.c", line 6890, in H5C_load_entry
    Can't deserialize image
  File "H5Bcache.c", line 181, in H5B__cache_deserialize
    wrong B-tree signature

End of HDF5 error back trace

Problems reading the array data.
"""

And this is the code of my generator:

import numpy as np
import scipy.ndimage
import tables
import keras

# ImageProcessor and data_augmentation come from this project's own modules
class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'

    def __init__(self, pytables_file_path=None, batch_size=32, shuffle=True, image_processor: ImageProcessor = None,
                 augment_params=None, image_type=None):
        'Initialization'
        self.batch_size = batch_size
        self.image_type = image_type
        self.pytable_file_path = pytables_file_path
        # the HDF5 file handle is opened once, in the process that creates the generator
        self.tables = tables.open_file(self.pytable_file_path, 'r')
        self.number_of_samples = self.tables.root[self.image_type].shape[0]
        self.image_size = self.tables.root[self.image_type].shape[1:]
        self.indexes = list(range(self.number_of_samples))
        self.shuffle = shuffle
        self.image_processor = image_processor
        self.on_epoch_end()
        self.augment_params = augment_params

    def __del__(self):
        # close the HDF5 file once the generator is garbage-collected
        self.tables.close()

    def __len__(self):
        'Denotes the number of batches per epoch'
        # floor() drops the last partial batch, so up to batch_size - 1 samples are skipped each epoch
        return int(np.floor(self.number_of_samples / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        # Generate data
        images, masks, weights = self.__data_generation(indexes)
        # stack masks and weights into one target array along the channel axis
        mask_wei_arr = np.concatenate((masks, weights[:, :, :, np.newaxis]), axis=-1)
        return (images, mask_wei_arr)

    def on_epoch_end(self):
        """Run after each epoch."""
        if self.shuffle:
            np.random.shuffle(self.indexes)  # Shuffle indexes after each epoch

    def __data_generation(self, indexes):
        'Generates data containing batch_size samples'  # X : (n_samples, *dim, n_channels)
        images, labels = self.__get_images(indexes)

        if self.image_processor:
            images = self.__process_images(images)

        masks, weights = self.generate_masks_and_weights_from_labels(labels)

        if self.augment_params:
            [images, masks, weights] = self.augment_data(images, masks, weights)

        images = images.astype('float32')
        masks_new = masks.astype('float32')
        weights_new = weights.astype('float32')
        weights_new = weights_new[:, :, :, 0]
        return images, masks_new, weights_new

    def __process_images(self, images):
        for ind, image in enumerate(images):
            images[ind, ...] = self.image_processor.process(image)
        return images

    def __get_images(self, indexes):
        images = np.empty((self.batch_size, *self.image_size))
        labels = np.empty((self.batch_size, *self.image_size))
        for counter, i in enumerate(indexes):
            current_image = self.tables.root[self.image_type][i, ...]
            images[counter] = current_image
            labels[counter] = self.tables.root['labels'][i, ...]
        return images, labels

    def generate_masks_and_weights_from_labels(self, labels):
        'Builds binary masks and edge-emphasizing weight maps from integer labels'
        max_lbl_val = int(np.max(labels))
        edges = np.zeros_like(labels).astype(bool)
        masks = np.asarray(labels > 0).astype(float)
        weights = np.ones_like(labels)
        se_size = 3  # use '3': to get 1 pixel dilation; use '5': to get 2 pixel dilation
        structure = np.ones((1, se_size, se_size, 1))
        for lbl_ind in range(1, max_lbl_val+1):  # iterate over labels
            label_mask = labels == lbl_ind
            label_dilated_edges = scipy.ndimage.morphology.binary_dilation(label_mask, structure) & ~label_mask
            label_eroded_edges = ~scipy.ndimage.morphology.binary_erosion(label_mask, structure) & label_mask
            label_edges = np.bitwise_or(label_eroded_edges, label_dilated_edges)
            edges = np.bitwise_or(edges, label_edges)
        weights[edges] *= 10  # weight the edges more by factor 10
        return masks, weights

    def augment_data(self, images, masks, weights):
        for index in range(len(images)):
            image = images[index, ...]
            mask = masks[index, ...]
            weight = weights[index, ...]
            [image, mask, weight] = data_augmentation([image, mask, weight], self.augment_params, order=[1, 0, 0])
            images[index, ...] = image
            masks[index, ...] = mask
            weights[index, ...] = weight

        return images, masks, weights
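
As a side note on the weighting logic: generate_masks_and_weights_from_labels never touches self, so it can be called unbound on a toy 4D label array as a quick sanity check (run in the same module as the class above):

# Toy check of the edge weighting, assuming the (batch, H, W, channels) layout used above
toy_labels = np.zeros((1, 7, 7, 1))
toy_labels[0, 2:5, 2:5, 0] = 1  # a single 3x3 object with label id 1
masks, weights = DataGenerator.generate_masks_and_weights_from_labels(None, toy_labels)
print(np.unique(weights))  # -> [ 1. 10.]: the 1-pixel edge band gets weight 10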
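
And for completeness, roughly how I construct the generators; the paths and the image_type value here are placeholders:

self.training_generator = DataGenerator(pytables_file_path='/data/train.h5',  # placeholder path
                                        batch_size=32,
                                        shuffle=True,
                                        image_type='images')  # placeholder node name
self.validation_generator = DataGenerator(pytables_file_path='/data/val.h5',  # placeholder path
                                          batch_size=32,
                                          shuffle=False,
                                          image_type='images')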