0

需要一些帮助。查看 Gpars 数据流/管道,但有些我不明白

如果您查看下面的示例(我已经使用运算符、管道线、chainWith 完成了此操作并遇到了同样的问题)。

在此示例中,我使用了任务,但也很容易没有任务,并且出现相同的问题。在此示例中,我设置了两个 DataflowQueue,一个用于初始条件,一个用于根据谓词评估的结果。然后我布局一个管道,根据谓词(偶数测试)评估输入并将结果存储在输出结果队列中

设置了管道并将一些条目发布到第一个队列中,我相信这些条目将在数据可用时被处理(这也不适用于操作员版本),如您所见,我测试了 resultQ 的大小为零(如果我在我将条目写入 sessionQ 之后,删除仍然正确的任务)。因此,写入数据不会“触发”处理。

第一个任务将一些条目保存到队列中。

import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.Promise

/**
 * Created by will on 13/01/2017.
 */

def iValues = [1,2,3,4,5]

DataflowQueue sessionQ = new DataflowQueue()
DataflowQueue resultQ = new DataflowQueue()

Dataflow.task {
    println "setup task: set initial conditions list for rule predicate "
    iValues.each {sessionQ << it}
}

Closure evenPredicate = {it %2 == 0}

//layout pipeline 
sessionQ | evenPredicate   | resultQ

assert resultQ.iterator().size() == 0

Promise ans =  Dataflow.task {
    println "result task : get three values from result q "
    def outlist = []
    3.times {
        def res = resultQ.val
        println "got result $res"
        outlist << res
    }
    assert sessionQ.iterator().size() == 0
    assert resultQ.iterator().size() == 2
    outlist
}

println "ans list is $ans.val"
assert resultQ.iterator().size() == 2

它仅在第二个任务/chainWith 等中 - 您在引擎开始运行的第二个队列上调用 .val (或 get() ),所有条目都从第一个队列处理,结果绑定到 resultQ。

您可以从断言中看到这一点,因为一旦进行了第一个触发器 (.val) 同步调用,引擎就会运行并处理起始 sessionQ 中的所有绑定条目。

这是一个问题,因为直到您运行第一个 .val 调用 - 如果您执行 poll() 或 resultQ.interator.size() 例如它是空的且未绑定,size()=0。所以你不能写

for (dfRes in resultQ) {//do something with dfRes} 

因为它总是空的,直到您使用 sessionQ 中的第一个项目。我不明白为什么?在条目绑定到第一个 dataflowQueue 后,我认为这些项目将在它们可用时被消耗(已绑定) - 但它们不是。

现在这很棘手,因为您无法通过,检查结果的大小,在 resultQ 上执行 poll(),因为它会失败,直到读取 sessionQ 的第一个 DF。

我最终不得不使用初始值数组的大小(告诉我保存到队列中的条目),因为唯一意味着从 resultQ 中读取相同的数字以清空它(在上面我只消耗了 3 resultsQ 中的记录和断言表明 resultQ 中还剩下 2 条记录(但只有在第一次 .val 调用之后,如果您注释掉所有断言开始失败)

我用 Dataflow.operator、Pipeline 等尝试了这个并得到了同样的问题。为什么每个输入都绑定到 SessionQ 时,工作没有得到处理?

最后,在 Pipeline 的情况下,有一个 .complete() 方法,如果你在管道中处理闭包 {},它会保持打开状态 (!complete()),但是当你运行像 .binaryChoice() 这样的方法时,它会标记管道已完成,无法添加进一步的操作。为什么这样做?

当然,我不明白那个状态在说什么(不再进行处理),如果您尝试在这种方法之后再执行另一个步骤,则会引发异常。

无论哪种方式 - 我尝试过这样的管道线

Pipeline pipeLine = new Pipeline(Q)
pipeLine.tap(log).binaryChoice(evenPathQ, oddPathQ) {println "$it %2 is ${it%2 ==0}"; (it%2 == 0) }

但是,当您将值绑定到 Q 时,什么都不会发生 - 直到您使用类似的输出

odd.val

当管道突然“运行”并处理存储在 Q 中的所有 DF 项目时。

除了第一个 .val 消耗之外,我没有尝试过启动工作的安排

任何人都可以解释为什么会这样,我必须在这里忽略这一点,但是在读取第一个条目之前“什么都不做”不是我所期望的,并且会使任何大小评估(.iterator.size(),poll()等无效) 类型调用 DataflowWriteChannel 目标。

我很感激对此的任何帮助-我已经为此苦苦挣扎了两天,但一无所获。我也查看了所有 Gpars 测试,它们只是调用 .val 的次数与输入绑定的次数相同 - 所以不要显示我描述的问题。

Vaclav Pech 或任何其他观看这些问题的 Gpars 大师,我将不胜感激任何有关这方面的帮助见解,以帮助我度过这个难关

提前问候

4

1 回答 1

1

在断言大小为 0 之前进行小修改(添加延迟)将显示计算是由写入的数据触发的:

//layout pipeline
sessionQ | evenPredicate   | resultQ
sleep 5000
assert resultQ.iterator().size() == 0
于 2017-01-16T09:39:07.253 回答