
Because feed_dict is inefficient, I want to use a FIFOQueue and a StagingArea to speed up my input pipeline. The idea is that a Python thread feeds numpy data into a tf.FIFOQueue while the main thread dequeues tensors for training; as long as the queue is never empty, the latency of copying data from the Python runtime into the TensorFlow runtime should be hidden.
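For context, this is roughly the feeder pattern I have in mind. It is only a minimal sketch: the shapes, the queue capacity, and the make_numpy_batch helper are made up for illustration and are not the exact code I benchmark below.

import threading
import numpy as np
import tensorflow as tf

def make_numpy_batch(batch_size):
    # hypothetical helper: random data standing in for a real loader
    images = np.random.random([batch_size, 227, 227, 3]).astype(np.float32)
    labels = np.random.randint(0, 1000, size=[batch_size]).astype(np.int32)
    return images, labels

images_plr = tf.placeholder(tf.float32, [None, 227, 227, 3])
labels_plr = tf.placeholder(tf.int32, [None])
queue = tf.FIFOQueue(capacity=8, dtypes=[tf.float32, tf.int32])
enqueue_op = queue.enqueue([images_plr, labels_plr])
images, labels = queue.dequeue()  # the training graph is built on these tensors

def feeder(sess, batch_size, num_batches):
    # background thread: keeps the queue filled while the main thread trains
    for _ in range(num_batches):
        imgs, lbls = make_numpy_batch(batch_size)
        sess.run(enqueue_op, feed_dict={images_plr: imgs, labels_plr: lbls})

sess = tf.Session()
feed_thread = threading.Thread(target=feeder, args=(sess, 256, 1000))
feed_thread.daemon = True
feed_thread.start()
# main thread: sess.run(train_op) dequeues batches without a feed_dict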

My test model is AlexNet; here is the model code:

import tensorflow as tf

class Model(object):

    def __init__(self, num_classes=1000):
        self.num_classes = num_classes

    def inference(self, images):
        network = tf.layers.conv2d(
                images, 64, [11, 11], 4, padding='VALID')
        network = tf.layers.max_pooling2d(network, [3, 3], 2)
        network = tf.layers.conv2d(network, 192, [5, 5])
        network = tf.layers.max_pooling2d(network, [3, 3], 2)
        network = tf.layers.conv2d(network, 384, [3, 3])
        network = tf.layers.conv2d(network, 384, [3, 3])
        network = tf.layers.conv2d(network, 256, [3, 3])
        network = tf.layers.max_pooling2d(network, [3, 3], 2)

        network = tf.layers.flatten(network)
        network = tf.layers.dense(
                network, 4096, tf.nn.relu,
                kernel_initializer=tf.truncated_normal_initializer(stddev=1.0))
        network = tf.layers.dropout(network, rate=0.5)
        network = tf.layers.dense(
                network, 4096, tf.nn.relu,
                kernel_initializer=tf.truncated_normal_initializer(stddev=1.0))
        network = tf.layers.dropout(network, rate=0.5)
        network = tf.layers.dense(network, self.num_classes)

        return network

    def compute_loss(self, logits, labels):
        cross_entropy = tf.losses.sparse_softmax_cross_entropy(
                logits=logits, labels=labels)
        return tf.reduce_mean(cross_entropy)

    def build_model(self, images, labels):
        logits = self.inference(images)
        return self.compute_loss(logits, labels)

    def get_optimizer(self):
        return tf.train.GradientDescentOptimizer(0.01)

First, I build a baseline benchmark with fake data: the images and labels are generated by TensorFlow random ops directly on the CPU or the GPU, so these numbers should represent the performance upper bound.

import os
import sys
import time
from datetime import datetime
import argparse
import numpy as np
import tensorflow as tf
from model import Model
import pdb

num_classes = 1000
height = 227
width = 227
channel = 3

def input_pipeline(batch_size, device="/cpu:0", use_feed_dict=False):
    with tf.device(device):
        shape = [batch_size, height, width, channel]
        if use_feed_dict:
            images = tf.placeholder(shape=shape, dtype=tf.float32)
            labels = tf.placeholder(shape=[batch_size], dtype=tf.int32)
        else:
            images = tf.truncated_normal(
                    shape, dtype=tf.float32,
                    mean=127, stddev=60)
            images = tf.contrib.framework.local_variable(images)
            labels = tf.random_uniform(
                    [batch_size], minval=0, maxval=num_classes, dtype=tf.int32)

        return images, labels

def build_fetches(images, labels):
    with tf.device("/gpu:0"):
        model = Model(num_classes)
        loss = model.build_model(images, labels)
        optimizer = model.get_optimizer()
        train_op = optimizer.minimize(loss)

    return train_op

def get_numpy_batch(batch_size):
    shape = [batch_size, height, width, channel]
    batch_images = np.random.random(shape).astype(np.float32)
    batch_labels = np.random.rand(batch_size) * 1000
    batch_labels = batch_labels.astype(np.int32)

    return batch_images, batch_labels

def main(args, warmup_steps=10):
    images, labels = input_pipeline(args.batch_size, args.device, 
        args.use_feed_dict)
    fetches = build_fetches(images, labels)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())

        if args.use_feed_dict:
            batch_images, batch_labels = get_numpy_batch(args.batch_size)
            feed_dict = {
                    images: batch_images,
                    labels: batch_labels,
            }
        else:
            feed_dict = None

        for _ in xrange(warmup_steps):
            sess.run(fetches, feed_dict=feed_dict)

        begin = time.time()
        for _ in xrange(args.steps):
            sess.run(fetches, feed_dict=feed_dict)
        time_cost = time.time() - begin

        print "%s: after %d steps for test, result is %.2f ms (%.3f ms/step)" % (
            datetime.now(), args.steps, time_cost * 1000, time_cost * 1000 / args.steps)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--device", default="cpu")
    parser.add_argument("--steps", type=int, default=100)
    parser.add_argument("--batch_size", type=int, default=256)
    parser.add_argument("--use_feed_dict", action="store_true")
    args = parser.parse_args()

    if args.use_feed_dict:
        args.device = "cpu"185.489
    args.device = "/%s:0" % args.device

    os.environ["CUDA_VISIBLE_DEVICES"]="0"
    main(args)
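Assuming the script above is saved as benchmark.py (a filename I'm choosing just for this example), the three baseline runs are simply:

python benchmark.py --device cpu --steps 100 --batch_size 256
python benchmark.py --device gpu --steps 100 --batch_size 256
python benchmark.py --use_feed_dict --steps 100 --batch_size 256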

On a P40 GPU with TensorFlow version 1.4.1, I set batch_size to 256, which works out to roughly 150 MB of batch data per step.
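As a rough sanity check on that figure: 256 × 227 × 227 × 3 float32 values × 4 bytes ≈ 151 MiB for the images alone (the int32 labels add only 1 KB), so ~150 MB per batch is about right. The measured results: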

| device / feed_dict | ms/step |
|--------------------|---------|
| cpu                | 106.367 |
| gpu                |  92.650 |
| feed_dict          | 115.710 |

It seems that copying the 150 MB batch from the Python runtime into the TF runtime costs about 11 ms, while moving the data from the CPU to the GPU costs about 14 ms.

Then I use a FIFOQueue and a StagingArea to prefetch the data:

import time
import numpy as np
import tensorflow as tf
from model import Model
# StagingArea is not exported at the TF top level; import it from data_flow_ops
# (tf.contrib.staging.StagingArea is the same class in TF 1.x)
from tensorflow.python.ops.data_flow_ops import StagingArea

batch_images = np.random.random(
        [256, 227, 227, 3]).astype(np.float32)
batch_labels = np.random.rand(256) * 1000
batch_labels = batch_labels.astype(np.int32)

with tf.device("/cpu:0"):
    images_plr = tf.placeholder(shape=[None, 227, 227, 3], dtype=tf.float32)
    labels_plr = tf.placeholder(shape=[None], dtype=tf.int32)

    queue = tf.FIFOQueue(106, 
            dtypes=[images_plr.dtype.base_dtype, labels_plr.dtype.base_dtype])
    enqueue_op = queue.enqueue([images_plr, labels_plr])
    images, labels = queue.dequeue()
    images.set_shape(images_plr.get_shape())
    labels.set_shape(labels_plr.get_shape())

with tf.device("/gpu:0"):
    stage = StagingArea(
            dtypes=[images.dtype.base_dtype, labels.dtype.base_dtype], shapes=None)
    put_op = stage.put([images, labels])
    images, labels = stage.get()
    images.set_shape(images_plr.shape)
    labels.set_shape(labels_plr.shape)

with tf.device("/gpu:0"):
    model = Model(1000)
    loss = model.build_model(images, labels)
    opt = model.get_optimizer()
    fetches = opt.minimize(loss)

    fetches = tf.group(fetches, put_op)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    print "prefetch 106 steps data to fifoqueue"
    for _ in range(106):
        sess.run(enqueue_op, feed_dict={images_plr : batch_images, labels_plr: batch_labels})

    sess.run(put_op)

    for _ in range(5):
        sess.run(fetches, feed_dict=None)

    begin = time.time()
    for _ in range(100):
         sess.run(fetches, feed_dict=None)
    time_cost = time.time() - begin
    print "after 100 steps for testing, result is %.2f ms/step" % (
         time_cost * 1000 / 100)

The result is 113.669 ms/step, only slightly better than feed_dict, but my goal was to hide the latency of moving data from the Python runtime all the way to the GPU. What is wrong with my code?
