1

我正在使用Python 3.5、低镜头MicroSoft Celeb1M数据集、Tensorflow 1.4,并且我想在图像分类任务上使用新的 Dataset API。

我需要构建一个具有这种形式的数据集(一):它包含(N*k + 1)图像、N不同类k的数量以及每个类的样本数。目标是在类别中对正确类别中的最后一张图像进行分类N,每个类别都由k样本表示。

为此,我在硬盘上有 16 000 条 tfrecord,每个大约 20 MB。每个 TFRecord 包含一个类的图像,大约 50-100 个图像。

我想随机选择N文件,然后随机选择k每个文件中的图像,混合它们,然后选择最终图像在N类中进行分类,与样本不同。为此,我在“本机”Python 代码和 Tensorflow Dataset API 方法之间进行了混合。

问题是我写的解决方案需要很长时间才能完成。这是我为创建这样的数据集而编写的工作代码。对于这个例子,我只从硬盘驱动器中取出 20 个文件。

import tensorflow as tf
import os
import time
import numpy.random as rng

#Creating a few variables
data_dir = '/fastdata/Celeb1M/'
test_data = [data_dir + 'test/'+ elt for elt in os.listdir(data_dir + '/test/')]

# Function to decode TFRecords
def read_and_decode(example_proto):
    features = tf.parse_single_example(
            example_proto,
            features = {
                'image': tf.FixedLenFeature([], tf.string),
                'label': tf.FixedLenFeature([], tf.int64),
                'height': tf.FixedLenFeature([], tf.int64),
                'width': tf.FixedLenFeature([], tf.int64),
                'channels': tf.FixedLenFeature([], tf.int64)
            })

    image = tf.decode_raw(features['image'], tf.uint8)
    image = tf.cast(image, tf.float32) * (1. / 255)
    height = tf.cast(features['height'], tf.int32)
    width = tf.cast(features['width'], tf.int32)
    channels = tf.cast(features['channels'], tf.int32)
    image = tf.reshape(image, [height, width, channels])
    label = tf.cast(features['label'], tf.int32)

    return image, label

def get_episode(classes_per_set, samples_per_class, list_files):
    """
    :param data_pack : train, val or test
    :param classes_per_set : N-way classification
    :param samples_per_class : k-shot classification
    :param list_files : list of length classes_per_set of files containing examples
    :return : an episode containing classes_per_set * samples_per_class + 1 image to classify among the N*k other
    """
    assert classes_per_set == len(list_files)

    dataset = tf.data.TFRecordDataset(list_files[-1]).map(read_and_decode) \
              .shuffle(100)
    elt_to_classify = dataset.take(1)
    rng.shuffle(list_files)
    episode = tf.data.TFRecordDataset([list_files[-1]]) \
              .map(read_and_decode) \
              .shuffle(100) \
              .take(1)

    _ = list_files.pop()

    for class_file in list_files:
        element = tf.data.TFRecordDataset([class_file]) \
                  .map(read_and_decode) \
                  .shuffle(150) \
                  .take(1)
        episode = episode.concatenate(element)

    episode = episode.concatenate(elt_to_classify)
    return episode

#Testing the code
episode = get_episode(20, 1, test_data)
start = time.time()
iterator = episode.make_one_shot_iterator()
end = time.time()

print("time elapsed: ", end - start)

"""
Result :
starting to build one_shot_iterator
time elapsed:  188.75095319747925
"""

耗时太长的步骤是迭代器初始化。在我的完整代码中,它由批处理情节组成,大约需要 15 分钟。我注意到这个问题很可能是由于评估episode.output_shapes:只在最后做 aprint(episode.output_shapes)也需要很长时间(但比初始化迭代器要少)。

此外,我在 Docker 中工作,当迭代器正在初始化时,我可以看到 CPU 在100 %整个步骤中都处于运行状态。

我想知道造成这种情况的原因是否是本机 Python 代码和 Tensorflow 操作之间的混合,这可能会导致 CPU 出现瓶颈。

我认为处理数据集 API 包括创建在 Tensorflow Graph 上创建Operation 节点tf.Session().run(),并且仅在执行.

有关更多信息,我尝试了:

episode = dataset.get_episode(50, 1, test_data[:50])
iterator = episode.make_one_shot_iterator()

3个小时后,它甚至没有结束。我停止了代码,这是 TraceBack(我编辑了一些重复的块,例如return self._as_variant_tensor()

KeyboardInterrupt              Traceback (most recent call last)
<ipython-input-8-550523c179b3> in <module>()
    2 print("there")
    3 start = time.time()
        ----> 4 iterator = episode.make_one_shot_iterator()
    5 end = time.time()
    6 print("time elapsed: ", end - start)

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/data/ops/dataset_ops.py in make_one_shot_iterator(self)
    110       return self._as_variant_tensor()  # pylint: disable=protected-access
    111 
--> 112     _make_dataset.add_to_graph(ops.get_default_graph())
    113 
    114     return iterator_ops.Iterator(

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/framework/function.py in add_to_graph(self, g)
    484   def add_to_graph(self, g):
    485     """Adds this function into the graph g."""
--> 486     self._create_definition_if_needed()
    487 
    488     # Adds this function into 'g'.

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/framework/function.py in _create_definition_if_needed(self)
    319     """Creates the function definition if it's not created yet."""
    320     with context.graph_mode():
--> 321       self._create_definition_if_needed_impl()
    322 
    323   def _create_definition_if_needed_impl(self):

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/framework/function.py in _create_definition_if_needed_impl(self)
    336       # Call func and gather the output tensors.
    337       with vs.variable_scope("", custom_getter=temp_graph.getvar):
--> 338         outputs = self._func(*inputs)
    339 
    340       # There is no way of distinguishing between a function not returning

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/data/ops/dataset_ops.py in _make_dataset()
    108     @function.Defun(capture_by_value=True)
    109     def _make_dataset():
--> 110       return self._as_variant_tensor()  # pylint: disable=protected-access
    111 
    112     _make_dataset.add_to_graph(ops.get_default_graph())

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/data/ops/dataset_ops.py in _as_variant_tensor(self)
    998     # pylint: disable=protected-access
    999     return gen_dataset_ops.concatenate_dataset(
-> 1000         self._input_dataset._as_variant_tensor(),
   1001         self._dataset_to_concatenate._as_variant_tensor(),
   1002         output_shapes=nest.flatten(self.output_shapes),


~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/data/ops/dataset_ops.py in output_shapes(self)
   1006   @property
   1007   def output_shapes(self):
-> 1008     return nest.pack_sequence_as(self._input_dataset.output_shapes, [
   1009         ts1.most_specific_compatible_shape(ts2)
   1010         for (ts1, ts2) in zip(

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/data/ops/dataset_ops.py in output_shapes(self)
   1009         ts1.most_specific_compatible_shape(ts2)
   1010         for (ts1, ts2) in zip(
-> 1011             nest.flatten(self._input_dataset.output_shapes),
   1012             nest.flatten(self._dataset_to_concatenate.output_shapes))
   1013     ])

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/data/ops/dataset_ops.py in output_shapes(self)
   1009         ts1.most_specific_compatible_shape(ts2)
   1010         for (ts1, ts2) in zip(
-> 1011             nest.flatten(self._input_dataset.output_shapes),
   1012             nest.flatten(self._dataset_to_concatenate.output_shapes))
   1013     ])


~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/data/util/nest.py in pack_sequence_as(structure, flat_sequence)
    239     return flat_sequence[0]
    240 
--> 241   flat_structure = flatten(structure)
    242   if len(flat_structure) != len(flat_sequence):
    243     raise ValueError(

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/data/util/nest.py in flatten(nest)
    133     A Python list, the flattened version of the input.
    134   """
--> 135   return list(_yield_flat_nest(nest)) if is_sequence(nest) else [nest]
    136 
    137 

~/miniconda2/envs/dljupyter/lib/python3.5/site-packages/tensorflow/python/data/util/nest.py in is_sequence(seq)
    118   """
    119   return (isinstance(seq, (_collections.Sequence, dict))
--> 120           and not isinstance(seq, (list, _six.string_types)))
    121 
    122 

KeyboardInterrupt: 

所以我想知道为什么要初始化迭代器需要这么长时间:我无法找到关于初始化如何工作的太多信息,以及在创建 Graph 时究竟评估了什么。

我无法通过纯粹的方法来实现我想要的tf.data.Dataset,但我还没有尝试过这些tf.data.Dataset.flat_map()/interleave()方法(在这个线程中使用过)。

4

1 回答 1

1

该代码非常昂贵,因为它在 Python 中循环了 16000 个文件,在图中创建了 O(16000) 个节点。但是,您可以通过Dataset.flat_map()将循环移动图中来避免这种情况:

def get_episode(classes_per_set, samples_per_class, list_files):
    """
    :param data_pack : train, val or test
    :param classes_per_set : N-way classification
    :param samples_per_class : k-shot classification
    :param list_files : list of length classes_per_set of files containing examples
    :return : an episode containing classes_per_set * samples_per_class + 1 image to classify among the N*k other
    """
    assert classes_per_set == len(list_files)

    elt_to_classify = tf.data.TFRecordDataset(list_files[-1]).map(read_and_decode) \
                      .shuffle(100) \
                      .take(1)

    rng.shuffle(list_files)

    # Special handling for the first file (smaller shuffle buffer).
    first_file = tf.data.TFRecordDataset([list_files[-1]]) \
                 .map(read_and_decode) \
                 .shuffle(100) \
                 .take(1)

    _ = list_files.pop()

    # Creates a nested dataset for each file in `list_files`, and 
    # concatenates them together.
    other_files = tf.data.Dataset.from_tensor_slices(list_files).flat_map(
        lambda filename: tf.data.TFRecordDataset(filename)
                         .map(read_and_decode)
                         .shuffle(150)
                         .take(1))

    episode = first_file.concatenate(other_files).concatenate(elt_to_classify)
    return episode
于 2017-12-13T22:47:21.570 回答