1

我想以与输入相同的顺序从 Nextflow 过程中收集结果。

我知道我可以简单地通过所有流程传递来自所有渠道的值。这将确保对一起传递给所有进程。但是,当您开始添加多个进程时,该解决方案效果不佳,因为它破坏了这些进程并行运行的能力。例如,在提供的示例代码中,如果您要添加一个 add_twenty 进程,然后从 add_ten、add_twenty 和 vals2 收集输出。

我玩过的另一种可能的解决方案是为原始通道中的每个值添加一个键,这实际上将原始通道转换为字典(即哈希)。但我无法让它发挥作用。如有必要,我可以提供一个例子。

我创建了一个玩具示例,其中创建了两个通道,将一个发送到一个进程,然后将处理后的输出和一个原始通道发送到一个新进程。

vals1 = Channel.from(1,2,3,4,5)
vals2 = Channel.from(1,2,3,4,5)


process add_ten {
    input:
    val(vals1)

    output:
    val(new_int) into new_vals1

    exec:
    new_int = vals1 + 10
}

process pair {
    echo true

    input:
    val(new_vals1)
    val(vals2)

    script:
    """
    echo "${new_vals1}, ${vals2}"
    """
}

我希望看到的是这样的数字匹配:

11, 1
12, 2
13, 3
14, 4
15, 5

即使这些线是混乱的,只要对持续存在就可以了。例如,

14, 4
11, 1
13, 3
15, 5
12, 2

但是,我看到的是这样的:

15, 1
13, 2
11, 3
12, 4
14, 5
4

1 回答 1

2

您可以通过使用元组和 nextflow 组合运算符来做到这一点:

https://www.nextflow.io/docs/latest/operator.html#combine

这是一个例子:

vals1 = Channel.from([1, 'the'], [2, 'brown'], [3, 'jumps'], [4, 'a'], [5, 'fox'])
vals2 = Channel.from([5,'.'], [4, 'lazy'], [3, 'over'], [2, 'fox'], [1, 'quick'])

vals1
  .combine(vals2, by: 0)
  .println()

当你运行它时,使用选项-ansi-log false. 您的示例经过一些更改,如下所示:

vals1 = Channel.from(1,2,3,4,5)
vals2 = Channel.from(1,2,3,4,5)

i=0; vals1.map{[i++, it]}.view().set{keyed_vals1}
j=0; vals2.map{[j++, it]}.view().set{keyed_vals2}

process add_ten {

  input: set val(key), val(vals1) from keyed_vals1
  output: set val(key), val(new_vals1) into new_vals1

  exec: new_vals1 = vals1 + 10
}

process pair {
  echo true
  tag "$key $one $two"

  input: set val(key), val(one), val(two) from new_vals1.combine(keyed_vals2, by: 0).view()

  script: "echo '${key} ${one} ${two}'"
}
于 2019-07-26T16:33:37.860 回答