2016-08-23 78 views
0

我有一个队列(称为queue_A)并填充100个元素。如果我想执行以下两两件事:从queue_ATensorflow:出队然后入队

  1. 出列1个元件,做在其上的一些处理,并将结果排队到另一个队列(queue_B)。排队运行称为op_B
  2. 将此元素(在处理之前)排入queue_A,此排队操作称为op_A

为了实现1,我可以这样写:

anElement = queue_A.dequeue() 
result = proc(anElement) 
op_B = queue_B.enqueue(result) 
queue_runner = tf.train.QueueRunner(queue_B, 
            [op_B] * 4) 

为了实现2,我可以这样写:

anElement = queue_A.dequeue() 
op_A = queue_A.enqueue(anElement) 
queue_runner = tf.train.QueueRunner(queue_A, 
            [op_A] * 4) 

不过,我不知道我该怎么做这两件事情立刻。 现在,我使用下面的代码:

anElement = queue_A.dequeue() 
op_A = queue_A.enqueue(anElement) 
result = proc(anElement) 
op_B = queue_B.enqueue(result) 
queue_runner = tf.train.QueueRunner(queue_B, 
            [op_A, op_B] * 4) 

我期待queue_A的大小是恒定的,但是当我使用session.run(queue_A.size())检查它的尺寸逐渐减小。 该代码有什么问题?以及如何实现我想要的?

回答

3

的代码在你的例子有两种类型的“队列运行”的:

  1. 一个运行op_A:从queue_A从队列中的元素,并且入列回queue_B
  2. 另一个运行op_B:它从queue_A中取出元素,通过proc()处理它,并将结果排入queue_B

的问题是,当op_Aop_B单独运行(例如,在不同的队列跑步,或在不同的调用sess.run()),他们将出列从queue_A不同元件。通过运行op_B出列的元素将永不会重新排列为queue_A,这就解释了为什么它的大小逐渐减小。

要解决此问题,作为Andrei suggests,您需要创建一个运行单个TensorFlow子图的操作,该子图执行op_Aop_B。下面的例子应该工作:

anElement = queue_A.dequeue() 
op_A = queue_A.enqueue(anElement) 
result = proc(anElement) 
op_B = queue_B.enqueue(result) 

# Creates a single op that enqueues the original element back to queue_A and the 
# processed element to queue_B. 
op = tf.group(op_A, op_B) 
queue_runner = tf.train.QueueRunner(queue_B, [op] * 4) 
+0

谢谢你们这么多。我有两个后续问题。 1.'op = tf.group(op_A,op_B)queue_runner = tf.train.QueueRunner(queue_B,[op] * 4)'和'queue_runner = tf.train.QueueRunner(queue_B,[op_A, op_B] * 4)'? 2。因为op_A和op_B涉及两个不同的队列,我应该使用哪个队列作为QueueRunner的第一个参数? queue_A或queue_B?谢谢! – denru

+0

在第一个片段('[op] * 4')中,'queue_runner'将创建四个线程,每个线程调用'sess.run(op)',它将两个op运行在一起。在第二个片段('[op_A,op_B] * 4')中,'queue_runner'将创建八个调用'sess.run(op_A)'或'sess.run(op_B)'的线程,因此运行两个ops分别为'op_A'和'op_B'出队单独的元素。 – mrry

+0

'QueueRunner()'的'queue'参数并不特别重要,但它确实控制了如何执行干净关闭(例如,如果使用'tf.train.Coordinator')。您可以传递任一队列作为参数,但是您还应该传递'close_op = tf.group(queue_A.close(),queue_B.close())',以便两个队列在关闭时关闭。 – mrry

1

不幸的是我无法解释为什么你的代码不能正常工作,但它看起来像op_A不执行,因为它不依赖于queue_B,我建议你使用控制流运算(例如tf.group)实现你想要的东西。

op = tf.group(op_A, op_B) 
queue_runner = tf.train.QueueRunner(queue_B, 
            [op] * 4)