I wrote a generator for Keras that uses PyTables to fetch images from an HDF5 file (see the code below). It works fine when I call it like this:
self._model.fit_generator(self.training_generator,
                          epochs=epochs,
                          validation_data=self.validation_generator,
                          verbose=1,
                          callbacks=[model_checkpoint, tensorboard_callback],
                          use_multiprocessing=True,
                          # workers=2  # uncommenting this and using more than 1 worker fails
                          )
However, if I use more than one worker (see the commented-out line above), I get the error shown below. I suspect this has to do with multiple worker processes trying to access the HDF5 file. However, I thought PyTables and HDF5 could handle this kind of read-only access. So what am I doing wrong?
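My current guess at a workaround, in case the problem is that the single handle opened in __init__ is inherited by all forked workers: open the file lazily, once per worker process. This is an untested sketch; LazyTablesMixin, _get_tables and _tables_pid are names I made up for illustration, not part of my actual code:

import os
import tables

class LazyTablesMixin:
    # Hypothetical mixin for DataGenerator (shown further below): reopen the
    # HDF5 file whenever the current process is not the one that opened it,
    # so each forked worker gets its own handle instead of an inherited one.
    def _get_tables(self):
        if getattr(self, '_tables_pid', None) != os.getpid():
            self._tables = tables.open_file(self.pytable_file_path, 'r')
            self._tables_pid = os.getpid()
        return self._tables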
Bonus question: does this code also ensure that, during training, the model sees each sample only once per epoch, as described under Notes:

"Sequence are a safer way to do multiprocessing. This structure guarantees that the network will only train once on each sample per epoch which is not the case with generators."
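My own reading of that guarantee, applied to the slicing scheme my generator uses (see __len__ and __getitem__ below): a shuffled index list cut into consecutive batch_size chunks contains each index at most once, and the floor in __len__ silently drops the last number_of_samples % batch_size indexes each epoch. A standalone check of that reading (the numbers here are placeholders):

import numpy as np

number_of_samples, batch_size = 100, 32
indexes = list(range(number_of_samples))
np.random.shuffle(indexes)
n_batches = number_of_samples // batch_size  # mirrors __len__
batches = [indexes[i * batch_size:(i + 1) * batch_size] for i in range(n_batches)]
seen = [i for batch in batches for i in batch]
assert len(seen) == len(set(seen))  # no index repeated within an epoch
print(len(seen), 'of', number_of_samples, 'indexes used')  # 96 of 100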
This is the error I get when using more than one worker:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/project/path/venv/lib/python3.7/site-packages/keras/utils/data_utils.py", line 401, in get_index
    return _SHARED_SEQUENCES[uid][i]
  File "/project/path/python_package/python_package/training_generators.py", line 41, in __getitem__
    images, masks, weights = self.__data_generation(indexes)
  File "/project/path/python_package/python_package/training_generators.py", line 52, in __data_generation
    images, labels = self.__get_images(indexes)
  File "/project/path/python_package/python_package/training_generators.py", line 79, in __get_images
    labels[counter] = self.tables.root['labels'][i, ...]
  File "/project/path/venv/lib/python3.7/site-packages/tables/array.py", line 662, in __getitem__
    arr = self._read_slice(startl, stopl, stepl, shape)
  File "/project/path/venv/lib/python3.7/site-packages/tables/array.py", line 766, in _read_slice
    self._g_read_slice(startl, stopl, stepl, nparr)
  File "tables/hdf5extension.pyx", line 1585, in tables.hdf5extension.Array._g_read_slice
tables.exceptions.HDF5ExtError: HDF5 error back trace

  File "H5Dio.c", line 216, in H5Dread
    can't read data
  File "H5Dio.c", line 587, in H5D__read
    can't read data
  File "H5Dchunk.c", line 2276, in H5D__chunk_read
    error looking up chunk address
  File "H5Dchunk.c", line 3022, in H5D__chunk_lookup
    can't query chunk address
  File "H5Dbtree.c", line 1047, in H5D__btree_idx_get_addr
    can't get chunk info
  File "H5B.c", line 341, in H5B_find
    unable to load B-tree node
  File "H5AC.c", line 1763, in H5AC_protect
    H5C_protect() failed
  File "H5C.c", line 2565, in H5C_protect
    can't load entry
  File "H5C.c", line 6890, in H5C_load_entry
    Can't deserialize image
  File "H5Bcache.c", line 181, in H5B__cache_deserialize
    wrong B-tree signature
End of HDF5 error back trace

Problems reading the array data.
"""
Here is the code of my generator:
import keras
import numpy as np
import scipy.ndimage
import tables

# ImageProcessor and data_augmentation are helpers from my own package (not shown here).

class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'

    def __init__(self, pytables_file_path=None, batch_size=32, shuffle=True, image_processor: ImageProcessor = None,
                 augment_params=None, image_type=None):
        'Initialization'
        self.batch_size = batch_size
        self.image_type = image_type
        self.pytable_file_path = pytables_file_path
        self.tables = tables.open_file(self.pytable_file_path, 'r')
        self.number_of_samples = self.tables.root[self.image_type].shape[0]
        self.image_size = self.tables.root[self.image_type].shape[1:]
        self.indexes = list(range(self.number_of_samples))
        self.shuffle = shuffle
        self.image_processor = image_processor
        self.on_epoch_end()
        self.augment_params = augment_params

    def __del__(self):
        self.tables.close()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(self.number_of_samples / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        # Generate data
        images, masks, weights = self.__data_generation(indexes)
        mask_wei_arr = np.concatenate((masks, weights[:, :, :, np.newaxis]), axis=-1)
        return (images, mask_wei_arr)

    def on_epoch_end(self):
        """Run after each epoch."""
        if self.shuffle:
            np.random.shuffle(self.indexes)  # Shuffle indexes after each epoch

    def __data_generation(self, indexes):
        'Generates data containing batch_size samples'  # X : (n_samples, *dim, n_channels)
        images, labels = self.__get_images(indexes)
        if self.image_processor:
            images = self.__process_images(images)
        masks, weights = self.generate_masks_and_weights_from_labels(labels)
        if self.augment_params:
            [images, masks, weights] = self.augment_data(images, masks, weights)
        images = images.astype('float32')
        masks_new = masks.astype('float32')
        weights_new = weights.astype('float32')
        weights_new = weights_new[:, :, :, 0]
        return images, masks_new, weights_new

    def __process_images(self, images):
        for ind, image in enumerate(images):
            images[ind, ...] = self.image_processor.process(image)
        return images

    def __get_images(self, indexes):
        images = np.empty((self.batch_size, *self.image_size))
        labels = np.empty((self.batch_size, *self.image_size))
        for counter, i in enumerate(indexes):
            current_image = self.tables.root[self.image_type][i, ...]
            images[counter] = current_image
            labels[counter] = self.tables.root['labels'][i, ...]
        return images, labels

    def generate_masks_and_weights_from_labels(self, labels):
        max_lbl_val = int(np.max(labels))
        edges = np.zeros_like(labels).astype(bool)
        masks = np.asarray(labels > 0).astype(float)
        weights = np.ones_like(labels)
        se_size = 3  # use '3' to get 1 pixel dilation; use '5' to get 2 pixel dilation
        structure = np.ones((1, se_size, se_size, 1))
        for lbl_ind in range(1, max_lbl_val + 1):  # iterate over labels
            label_mask = labels == lbl_ind
            label_dilated_edges = scipy.ndimage.morphology.binary_dilation(label_mask, structure) & ~label_mask
            label_eroded_edges = ~scipy.ndimage.morphology.binary_erosion(label_mask, structure) & label_mask
            label_edges = np.bitwise_or(label_eroded_edges, label_dilated_edges)
            edges = np.bitwise_or(edges, label_edges)
        weights[edges] *= 10  # weight the edges more by factor 10
        return masks, weights

    def augment_data(self, images, masks, weights):
        # for index, _ in enumerate(images):
        #     [images[index, :, :, 0], masks[index, :, :, 0], weights[index, :, :, 0]] = data_augmentation(
        #         [images[index, :, :, 0], masks[index, :, :, 0], weights[index, :, :, 0]], self.augment_params,
        #         order=[1, 0, 0])
        for index, image in enumerate(images):
            image = images[index, ...]
            mask = masks[index, ...]
            weight = weights[index, ...]
            [image, mask, weight] = data_augmentation([image, mask, weight], self.augment_params, order=[1, 0, 0])
            # fix, ax = plt.subplots(1, 3, figsize=(5, 15))
            # ax[0].imshow(image[:, :, 0])
            # ax[1].imshow(mask[:, :, 0])
            # ax[2].imshow(weight[:, :, 0])
            # plt.show()
            images[index, ...] = image
            masks[index, ...] = mask
            weights[index, ...] = weight
        return images, masks, weights
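
For completeness, I construct the generator roughly like this; the file name and node name here are placeholders, not my real configuration:

training_generator = DataGenerator(pytables_file_path='train.h5',  # placeholder path
                                   batch_size=32,
                                   shuffle=True,
                                   image_type='images')  # placeholder node name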