2017-06-14 61 views
0

当我想用tf.train.string_input_producer加载数据2个时代,我用如何使用TensorFlow tf.train.string_input_producer生成多个时代数据?

filename_queue = tf.train.string_input_producer(filenames=['data.csv'], num_epochs=2, shuffle=True) 

col1_batch, col2_batch, col3_batch = tf.train.shuffle_batch([col1, col2, col3], batch_size=batch_size, capacity=capacity,\min_after_dequeue=min_after_dequeue, allow_smaller_final_batch=True) 

但后来我发现,这种运算没有产生我想要的。

它只能生成data.csv中的每个样品2次,但生成的顺序不明确。例如,3个data.csv

[[1] 
[2] 
[3]] 

线数据就会产生(其中每个样品只出现2次,但该命令是可选的)

[1] 
[1] 
[3] 
[2] 
[2] 
[3] 

但我想是(每个历元是分开,洗牌在每个时间段)

​​

此外,如何知道什么时候1个时代做?有一些标志变量吗?谢谢!

我的代码在这里。

import tensorflow as tf 

def read_my_file_format(filename_queue): 
    reader = tf.TextLineReader() 
    key, value = reader.read(filename_queue) 
    record_defaults = [['1'], ['1'], ['1']] 
    col1, col2, col3 = tf.decode_csv(value, record_defaults=record_defaults, field_delim='-') 
    # col1 = list(map(int, col1.split(','))) 
    # col2 = list(map(int, col2.split(','))) 
    return col1, col2, col3 

def input_pipeline(filenames, batch_size, num_epochs=1): 
    filename_queue = tf.train.string_input_producer(
    filenames, num_epochs=num_epochs, shuffle=True) 
    col1,col2,col3 = read_my_file_format(filename_queue) 

    min_after_dequeue = 10 
    capacity = min_after_dequeue + 3 * batch_size 
    col1_batch, col2_batch, col3_batch = tf.train.shuffle_batch(
    [col1, col2, col3], batch_size=batch_size, capacity=capacity, 
    min_after_dequeue=min_after_dequeue, allow_smaller_final_batch=True) 
    return col1_batch, col2_batch, col3_batch 

filenames=['1.txt'] 
batch_size = 3 
num_epochs = 1 
a1,a2,a3=input_pipeline(filenames, batch_size, num_epochs) 

with tf.Session() as sess: 
    sess.run(tf.local_variables_initializer()) 
    # start populating filename queue 
    coord = tf.train.Coordinator() 
    threads = tf.train.start_queue_runners(coord=coord) 
    try: 
    while not coord.should_stop(): 
     a, b, c = sess.run([a1, a2, a3]) 
     print(a, b, c) 
    except tf.errors.OutOfRangeError: 
    print('Done training, epoch reached') 
    finally: 
    coord.request_stop() 

    coord.join(threads) 

我的数据是一样

1,2-3,4-A 
7,8-9,10-B 
12,13-14,15-C 
17,18-19,20-D 
22,23-24,25-E 
27,28-29,30-F 
32,33-34,35-G 
37,38-39,40-H 
+0

您可以添加生成张量'col1','col2','col3'的代码?代码被写入的方式表明你在流水线结束时洗牌,因此它将全部混在一起 – MZHm

+1

我添加了我的代码和数据。@ MZHm – danche

+0

你可能想看看这个答案,看看是否有类似的问题: https://stackoverflow.com/a/44526962/4282745 – npf

回答

6

由于Nicolas observestf.train.string_input_producer() API不给你检测达到一个时代的结束时的能力;相反,它将所有时代连接成一个长批次。为此,我们最近添加了(在TensorFlow 1.2中)tf.contrib.data API,这使得可以表达更复杂的流水线,包括您的用例。

下面的代码片段显示了如何使用tf.contrib.data编写程序:

import tensorflow as tf 

def input_pipeline(filenames, batch_size): 
    # Define a `tf.contrib.data.Dataset` for iterating over one epoch of the data. 
    dataset = (tf.contrib.data.TextLineDataset(filenames) 
       .map(lambda line: tf.decode_csv(
        line, record_defaults=[['1'], ['1'], ['1']], field_delim='-')) 
       .shuffle(buffer_size=10) # Equivalent to min_after_dequeue=10. 
       .batch(batch_size)) 

    # Return an *initializable* iterator over the dataset, which will allow us to 
    # re-initialize it at the beginning of each epoch. 
    return dataset.make_initializable_iterator() 

filenames=['1.txt'] 
batch_size = 3 
num_epochs = 10 
iterator = input_pipeline(filenames, batch_size) 

# `a1`, `a2`, and `a3` represent the next element to be retrieved from the iterator.  
a1, a2, a3 = iterator.get_next() 

with tf.Session() as sess: 
    for _ in range(num_epochs): 
     # Resets the iterator at the beginning of an epoch. 
     sess.run(iterator.initializer) 

     try: 
      while True: 
       a, b, c = sess.run([a1, a2, a3]) 
       print(a, b, c) 
     except tf.errors.OutOfRangeError: 
      # This will be raised when you reach the end of an epoch (i.e. the 
      # iterator has no more elements). 
      pass     

     # Perform any end-of-epoch computation here. 
     print('Done training, epoch reached') 
+1

为什么我们使用控制流的异常? (即'tf.errors.OutOfRangeError'除外) – MZHm

+2

异常是TensorFlow当前有信号表明所请求的值尚未计算的唯一机制。 (它类似于Python如何使用StopIteration异常来在自己的迭代器协议中指示迭代器的结束)。当然可以将它包装在某些库代码中,并且我提出了一种在[this GitHub评论](https://github.com/tensorflow/tensorflow/issues/7951#issuecomment-303546037)。 – mrry

+2

为什么不简单'而不是sess.run(epoch_done):...'? 'epoch_done'是一个由队列设置的变量,由'iterator.initializer'重置。 – MZHm

2

你可能想看看这个answer到类似的问题。

的短篇小说是:

  • 如果num_epochs> 1,所有的数据都在同一时间排队和独立suffled的时代,

  • ,所以你没有监视哪个时代正在出列的能力。

你可以做的是在所列出的答案,这是在每次运行与num_epochs == 1的工作,并重新初始化本地队列变量(和显然不是模型变量)的第一个建议。

init_queue = tf.variables_initializer(tf.get_collection(tf.GraphKeys.LOCAL_VARIABLES, scope='input_producer')) 
with tf.Session() as sess: 
    sess.run(tf.global_variables_initializer()) 
    sess.run(tf.local_variables_initializer()) 
for e in range(num_epochs): 
    with tf.Session() as sess: 
     sess.run(init_queue) # reinitialize the local variables in the input_producer scope 
     # start populating filename queue 
     coord = tf.train.Coordinator() 
     threads = tf.train.start_queue_runners(coord=coord) 
     try: 
      while not coord.should_stop(): 
       a, b, c = sess.run([a1, a2, a3]) 
       print(a, b, c) 
     except tf.errors.OutOfRangeError: 
      print('Done training, epoch reached') 
     finally: 
      coord.request_stop() 

     coord.join(threads) 
+0

再次感谢。我之前尝试过这个解决方案,但我认为它还不够优雅:P。也许这是最实际的方法,我认为应该增加一些参数来解决这个问题。 – danche

+0

以这种方式,我需要初始变量的每个时期,但是这个运算会产生一些其他问题给模型,对吧? – danche

+0

我同意。无论如何根据这个评论:https://github.com/tensorflow/tensorflow/issues/4535#issuecomment-283181862 队列不是我们将来处理数据的方式。 – npf