需要一些帮助。查看 Gpars 数据流/管道,但有些我不明白
如果您查看下面的示例(我已经使用运算符、管道线、chainWith 完成了此操作并遇到了同样的问题)。
在此示例中,我使用了任务,但也很容易没有任务,并且出现相同的问题。在此示例中,我设置了两个 DataflowQueue,一个用于初始条件,一个用于根据谓词评估的结果。然后我布局一个管道,根据谓词(偶数测试)评估输入并将结果存储在输出结果队列中
设置了管道并将一些条目发布到第一个队列中,我相信这些条目将在数据可用时被处理(这也不适用于操作员版本),如您所见,我测试了 resultQ 的大小为零(如果我在我将条目写入 sessionQ 之后,删除仍然正确的任务)。因此,写入数据不会“触发”处理。
第一个任务将一些条目保存到队列中。
import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.Promise
/**
* Created by will on 13/01/2017.
*/
def iValues = [1,2,3,4,5]
DataflowQueue sessionQ = new DataflowQueue()
DataflowQueue resultQ = new DataflowQueue()
Dataflow.task {
println "setup task: set initial conditions list for rule predicate "
iValues.each {sessionQ << it}
}
Closure evenPredicate = {it %2 == 0}
//layout pipeline
sessionQ | evenPredicate | resultQ
assert resultQ.iterator().size() == 0
Promise ans = Dataflow.task {
println "result task : get three values from result q "
def outlist = []
3.times {
def res = resultQ.val
println "got result $res"
outlist << res
}
assert sessionQ.iterator().size() == 0
assert resultQ.iterator().size() == 2
outlist
}
println "ans list is $ans.val"
assert resultQ.iterator().size() == 2
它仅在第二个任务/chainWith 等中 - 您在引擎开始运行的第二个队列上调用 .val (或 get() ),所有条目都从第一个队列处理,结果绑定到 resultQ。
您可以从断言中看到这一点,因为一旦进行了第一个触发器 (.val) 同步调用,引擎就会运行并处理起始 sessionQ 中的所有绑定条目。
这是一个问题,因为直到您运行第一个 .val 调用 - 如果您执行 poll() 或 resultQ.interator.size() 例如它是空的且未绑定,size()=0。所以你不能写
for (dfRes in resultQ) {//do something with dfRes}
因为它总是空的,直到您使用 sessionQ 中的第一个项目。我不明白为什么?在条目绑定到第一个 dataflowQueue 后,我认为这些项目将在它们可用时被消耗(已绑定) - 但它们不是。
现在这很棘手,因为您无法通过,检查结果的大小,在 resultQ 上执行 poll(),因为它会失败,直到读取 sessionQ 的第一个 DF。
我最终不得不使用初始值数组的大小(告诉我保存到队列中的条目),因为唯一意味着从 resultQ 中读取相同的数字以清空它(在上面我只消耗了 3 resultsQ 中的记录和断言表明 resultQ 中还剩下 2 条记录(但只有在第一次 .val 调用之后,如果您注释掉所有断言开始失败)
我用 Dataflow.operator、Pipeline 等尝试了这个并得到了同样的问题。为什么每个输入都绑定到 SessionQ 时,工作没有得到处理?
最后,在 Pipeline 的情况下,有一个 .complete() 方法,如果你在管道中处理闭包 {},它会保持打开状态 (!complete()),但是当你运行像 .binaryChoice() 这样的方法时,它会标记管道已完成,无法添加进一步的操作。为什么这样做?
当然,我不明白那个状态在说什么(不再进行处理),如果您尝试在这种方法之后再执行另一个步骤,则会引发异常。
无论哪种方式 - 我尝试过这样的管道线
Pipeline pipeLine = new Pipeline(Q)
pipeLine.tap(log).binaryChoice(evenPathQ, oddPathQ) {println "$it %2 is ${it%2 ==0}"; (it%2 == 0) }
但是,当您将值绑定到 Q 时,什么都不会发生 - 直到您使用类似的输出
odd.val
当管道突然“运行”并处理存储在 Q 中的所有 DF 项目时。
除了第一个 .val 消耗之外,我没有尝试过启动工作的安排
任何人都可以解释为什么会这样,我必须在这里忽略这一点,但是在读取第一个条目之前“什么都不做”不是我所期望的,并且会使任何大小评估(.iterator.size(),poll()等无效) 类型调用 DataflowWriteChannel 目标。
我很感激对此的任何帮助-我已经为此苦苦挣扎了两天,但一无所获。我也查看了所有 Gpars 测试,它们只是调用 .val 的次数与输入绑定的次数相同 - 所以不要显示我描述的问题。
Vaclav Pech 或任何其他观看这些问题的 Gpars 大师,我将不胜感激任何有关这方面的帮助见解,以帮助我度过这个难关
提前问候