1. tf.data (官方推荐):方便地构建复杂的输入管道
2. Feeding:通过input_fn来yield数据
3. QueueRunner:基于队列的输入管道
4. 预加载数据。用constant或variable在内存中存储所有的数据
1 2 3 4 | with tf.Session(): input = tf.placeholder(tf.float32) classifier = ... print(classifier.eval(feed_dict={input: my_python_preprocessing_fn()})) |
简单教程中的数据加载,都是一行伪代码:(x, y) = load_data() 就搞定了
– Dataset用来表示一组数据,是一个序列,序列的每个元素包含一个或多个Tensor对象。
– Iterator用户从数据集中抽取元素,Iterator.get_next()可以返回数据集的下一个元素,可以看作是输入管道与模型之间的接口。
– one-shot
– initializable
– reinitializable
– feedable
1 2 3 4 5 6 | max_value = tf.placeholder(tf.int64, shape=[]) dataset = tf.data.Dataset.range(max_value) iterator = dataset.make_initializable_iterator() next_element = iterator.get_next() # Initialize an iterator over a dataset with 10 elements. sess.run(iterator.initializer, feed_dict={max_value: 10}) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | # initializable iterator to switch between dataset EPOCHS = 10 x, y = tf.placeholder(tf.float32, shape=[None,2]), tf.placeholder(tf.float32, shape=[None,1]) dataset = tf.data.Dataset.from_tensor_slices((x, y)) train_data = (np.random.sample((100,2)), np.random.sample((100,1))) test_data = (np.array([[1,2]]), np.array([[0]])) iter = dataset.make_initializable_iterator() features, labels = iter.get_next() with tf.Session() as sess: # initialise iterator with train data sess.run(iter.initializer, feed_dict={ x: train_data[0], y: train_data[1]}) for _ in range(EPOCHS): sess.run([features, labels]) # switch to test data sess.run(iter.initializer, feed_dict={ x: test_data[0], y: test_data[1]}) print(sess.run([features, labels])) |
但如果我们不想在运行期使用feed_dict怎么办呢,用reinitializable迭代器支持多个数据集对象的初始化,创建的迭代器不再是dataset.make_xxx_iterator这种形式,而是使用结构直接使用Iterator的静态方法初始化tf.data.Iterator.from_structure(train_dataset.output_types, train_dataset.output_shapes)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | # Reinitializable iterator to switch between Datasets EPOCHS = 10 # making fake data using numpy train_data = (np.random.sample((100,2)), np.random.sample((100,1))) test_data = (np.random.sample((10,2)), np.random.sample((10,1))) # create two datasets, one for training and one for test train_dataset = tf.data.Dataset.from_tensor_slices(train_data) test_dataset = tf.data.Dataset.from_tensor_slices(test_data) # create a iterator of the correct shape and type iter = tf.data.Iterator.from_structure(train_dataset.output_types, train_dataset.output_shapes) features, labels = iter.get_next() # create the initialisation operations train_init_op = iter.make_initializer(train_dataset) test_init_op = iter.make_initializer(test_dataset) with tf.Session() as sess: sess.run(train_init_op) # switch to train dataset for _ in range(EPOCHS): sess.run([features, labels]) sess.run(test_init_op) # switch to val dataset print(sess.run([features, labels])) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | # Define training and validation datasets with the same structure. training_dataset = tf.data.Dataset.range(100).map( lambda x: x + tf.random_uniform([], -10, 10, tf.int64)).repeat() validation_dataset = tf.data.Dataset.range(50) # # A feedable iterator is defined by a handle placeholder and its structure. We # could use the `output_types` and `output_shapes` properties of either # `training_dataset` or `validation_dataset` here, because they have # identical structure. handle = tf.placeholder(tf.string, shape=[]) iterator = tf.data.Iterator.from_string_handle( handle, training_dataset.output_types, training_dataset.output_shapes) # next_element = iterator.get_next() # # You can use feedable iterators with a variety of different kinds of iterator # (such as one-shot and initializable iterators). training_iterator = training_dataset.make_one_shot_iterator() validation_iterator = validation_dataset.make_initializable_iterator() # # The `Iterator.string_handle()` method returns a tensor that can be evaluated # and used to feed the `handle` placeholder. training_handle = sess.run(training_iterator.string_handle()) validation_handle = sess.run(validation_iterator.string_handle()) # # Loop forever, alternating between training and validation. while True: # Run 200 steps using the training dataset. Note that the training dataset is # infinite, and we resume from where we left off in the previous `while` loop # iteration. for _ in range(200): sess.run(next_element, feed_dict={handle: training_handle}) # # Run one pass over the validation dataset. sess.run(validation_iterator.initializer) for _ in range(50): sess.run(next_element, feed_dict={handle: validation_handle}) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | EPOCHS = 10 BATCH_SIZE = 16 # using two numpy arrays features, labels = (np.array([np.random.sample((100,2))]), np.array([np.random.sample((100,1))])) dataset = tf.data.Dataset.from_tensor_slices((features,labels)).repeat().batch(BATCH_SIZE) iter = dataset.make_one_shot_iterator() x, y = iter.get_next() # make a simple model net = tf.layers.dense(x, 8, activation=tf.tanh) # pass the first value from iter.get_next() as input net = tf.layers.dense(net, 8, activation=tf.tanh) prediction = tf.layers.dense(net, 1, activation=tf.tanh) loss = tf.losses.mean_squared_error(prediction, y) # pass the second value from iter.get_net() as label train_op = tf.train.AdamOptimizer().minimize(loss) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) for i in range(EPOCHS): _, loss_value = sess.run([train_op, loss]) print("Iter: {}, Loss: {:.4f}".format(i, loss_value)) |
其中x, y = iter.get_next()表示x,y都是一个获取数据的操作,每次sess.run就是执行一次x(), y()这样就获得一个样本数据。
1 2 3 4 5 6 7 8 9 | dataset = tf.data.Dataset.from_tensor_slices((data, labels)) dataset = dataset.batch(32).repeat() # val_dataset = tf.data.Dataset.from_tensor_slices((val_data, val_labels)) val_dataset = val_dataset.batch(32).repeat() # model.fit(dataset, epochs=10, steps_per_epoch=30, validation_data=val_dataset, validation_steps=3) |
1 2 3 4 5 6 7 8 9 10 11 | def input_parser(img_path, label): # convert the label to one-hot encoding one_hot = tf.one_hot(label, NUM_CLASSES) # # read the img from file img_file = tf.read_file(img_path) img_decoded = tf.image.decode_image(img_file, channels=3) # return img_decoded, one_hot tr_data = tr_data.map(input_parser) |
对于图像的转换处理,我们常常用到 tf.image下的decode_bmp decode_jpeg resize_images transpose_image等等函数,具体的参考[2]。
1. 获取文件名列表
2. [Optional]shuffle文件名列表
3. [Optional]根据epoch扔掉一些文件
4. 建立文件名的queue
5. 使用对应文件的Reader,不同的文件格式用不同的reader,比如TFRecordReader解析TFRecord格式,普通文件的话,就用FileReader,但读取普通文件也比较复杂,比如读单个文件可以使用tf.gfile.FastGfile(path, “r”).read(),对于filename_queue,则使用WholeFileReader(),如下面的语句
1 2 3 4 | image_reader = tf.WholeFileReader() data_queue = tf.train.string_input_producer([image_dir], shuffle=False) image_key, image_value = image_reader.read(data_queue) img = tf.image.decode_jpeg(image_value, channels=3) |
1 2 | image_value = tf.read_file(image_dir) img = tf.image.decode_jpeg(image_value, channels=3) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"]) # reader = tf.TextLineReader() key, value = reader.read(filename_queue) # # Default values, in case of empty columns. Also specifies the type of the # decoded result. record_defaults = [[1], [1], [1], [1], [1]] col1, col2, col3, col4, col5 = tf.decode_csv( value, record_defaults=record_defaults) features = tf.stack([col1, col2, col3, col4]) # with tf.Session() as sess: # Start populating the filename queue. coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(coord=coord) # for i in range(1200): # Retrieve a single instance: example, label = sess.run([features, col5]) # coord.request_stop() coord.join(threads) |
1 2 3 4 5 | dataset = tf.data.TFRecordDataset(filename) dataset = dataset.repeat(num_epochs) # # map takes a python function and applies it to every sample dataset = dataset.map(decode) |
6. 使用Decoder从Reader中反解成Tensor
7. [Optional] preprocessing预处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | def read_my_file_format(filename_queue): reader = tf.SomeReader() key, record_string = reader.read(filename_queue) example, label = tf.some_decoder(record_string) processed_example = some_processing(example) return processed_example, label # def input_pipeline(filenames, batch_size, num_epochs=None): filename_queue = tf.train.string_input_producer( filenames, num_epochs=num_epochs, shuffle=True) example, label = read_my_file_format(filename_queue) # min_after_dequeue defines how big a buffer we will randomly sample # from -- bigger means better shuffling but slower start up and more # memory used. # capacity must be larger than min_after_dequeue and the amount larger # determines the maximum we will prefetch. Recommendation: # min_after_dequeue + (num_threads + a small safety margin) * batch_size min_after_dequeue = 10000 capacity = min_after_dequeue + 3 * batch_size example_batch, label_batch = tf.train.shuffle_batch( [example, label], batch_size=batch_size, capacity=capacity, min_after_dequeue=min_after_dequeue) return example_batch, label_batch |
