2017-01-13 59 views
0

需要一些帮助。看看Gpars数据流/管道,但我不明白的东西gpars dataflowQueues处理或管道似乎只能在df.val请求上触发

如果你看看下面的例子(我已经完成了这与运营商,piplines,chainWith和同样的问题)。

在这个例子中,我已经使用过任务,但可能很容易就没有,同样的问题也体现出来。在这个例子中,我设置了两个DataflowQueue,一个用于初始条件,另一个用于对谓词进行评估的结果。然后我布置一个管道,用于评估反对对谓词的输入的输入(甚至测试),并存储结果的输出结果队列

具有设置管道和张贴的某些条目到第一队列我认为该条目将在数据可用的情况下处理(这不适用于操作员版本),如您所见,在将条目写入sessionQ之后,我将resultQ的大小测试为零(如果我删除了任务,它仍然为真)。所以写入数据不会'触发'处理。

第一个任务将一些条目保存到一个队列中。

import groovyx.gpars.dataflow.Dataflow 
import groovyx.gpars.dataflow.DataflowQueue 
import groovyx.gpars.dataflow.DataflowVariable 
import groovyx.gpars.dataflow.Promise 

/** 
* Created by will on 13/01/2017. 
*/ 

def iValues = [1,2,3,4,5] 

DataflowQueue sessionQ = new DataflowQueue() 
DataflowQueue resultQ = new DataflowQueue() 

Dataflow.task { 
    println "setup task: set initial conditions list for rule predicate " 
    iValues.each {sessionQ << it} 
} 

Closure evenPredicate = {it %2 == 0} 

//layout pipeline 
sessionQ | evenPredicate | resultQ 

assert resultQ.iterator().size() == 0 

Promise ans = Dataflow.task { 
    println "result task : get three values from result q " 
    def outlist = [] 
    3.times { 
     def res = resultQ.val 
     println "got result $res" 
     outlist << res 
    } 
    assert sessionQ.iterator().size() == 0 
    assert resultQ.iterator().size() == 2 
    outlist 
} 

println "ans list is $ans.val" 
assert resultQ.iterator().size() == 2 

其仅在第二任务/ chainWith等 - 在第二队列,你调用一个.VAL(或得到()),发动机开始运行,所有的条目从第一队列处理,结果绑定到resultQ。

您可以从断言中看到这一点,因为一旦第一个触发器(.val)同步调用使引擎运行并处理启动sessionQ中的所有绑定条目。

这是一个问题,直到你运行第一个.val调用 - 如果你做一个poll()或者resultQ.interator.size(),例如它是空的并且是unbound,size()= 0。所以你不能写

for (dfRes in resultQ) {//do something with dfRes} 

它始终为空,直到你消耗sessionQ中的第一项。我不明白为什么?在条目绑定到第一个数据流队列后,我认为这些项目会在它们变得可用(被绑定)时被使用 - 但它们不是。

现在这很棘手,因为您无法获取条目,检查结果的大小,对resultQ执行poll(),因为它将会失败,直到读取到sessionQ中的第一个DF。

我已经结束了不得不使用初始值数组的大小(告诉我保存到队列中的条目)作为唯一的方法来读取相同的数字从resultQ清空它(在上面我只从结果Q中消耗了3条记录,并且断言显示resultQ中仍有2条记录(但是只有在第一个.val调用完成之后,如果您注释该行,则所有断言都会失败)

我试图与Dataflow.operator,管道等,并得到同样的问题。为什么不为每个输入绑定到SessionQ工作得到处理?

最后在流水线的情况下,那里有一个.complete ( )方法,如果您在管道中处理闭包{},则保持打开状态(!complete()),但是当您运行像.binaryChoice()这样的方法时,它会将管道标记为完整,并且无法添加任何进一步的操作。为什么这样做?

我不明白这个状态是什么意思(当然不会有更多的处理会发生),如果你尝试在这种方法之后尝试另一个步骤,就会抛出一个异常。

无论哪种方式 - 我想这样

Pipeline pipeLine = new Pipeline(Q) 
pipeLine.tap(log).binaryChoice(evenPathQ, oddPathQ) {println "$it %2 is ${it%2 ==0}"; (it%2 == 0) } 

管道线路然而,当您将值绑定至Q什么也没有发生 - 直到你吃像

odd.val 

输出时,一下子把管道'运行'并处理存储在Q中的所有DF项目。

没有我试过kickstarts工作的调度 - 除了第一个.val消耗

可以解释为什么这是,我必须在这里忽略这一点,但是这个'什么也不做',直到第一个条目被读取,并不是我期望的,并且使任何大小的评估(.iterator.size(), poll()等)在DataflowWriteChannel目标上调用类型。

我会欣赏这方面的任何帮助 - 我已经为此挣扎了两天,并且无处可去。我也查看了所有Gpars测试,他们只是调用.val与输入绑定的次数相同 - 所以不要显示我描述的问题。

瓦茨拉夫·佩赫,或任何其他Gpars大师谁看的问题,我将不胜感激任何帮助的洞察力,让我在这个驼峰

问候提前

回答

1

一个小的修改(添加延迟)就在你声称大小为0之前,将显示计算是由写入的数据触发的:

//layout pipeline 
sessionQ | evenPredicate | resultQ 
sleep 5000 
assert resultQ.iterator().size() == 0 
+0

谢谢瓦茨拉夫,我早上在火车上,也许是时候让房地产'赶上'将清除我的问题。我尝试了几次攻击,看起来像没有按预期处理工作 - 回报各种'尝试'模式的调查结果我尝试过,看看是否有窍门 –

+0

我增加了睡眠差距500毫秒,然后开始看例子正确和在其他线程的工作有机会做背景的东西。尽管 –

+0

完整状态限制管道构建器只允许构建有效的管道,但我仍然不确定管道上的“完整”状态如何保护。当您不能再向其添加操作员时,管道将视为已完成。例如,通过调用binaryChoice(),将两个通道挂接到管道末端,并且所有其他管道操作都必须附加到这两个通道中的任一个,而不是原始管道。 –