0

我在 Nextflow 中有一个 DSL2 工作流程,如下所示:


nextflow.enable.dsl=2

// process 1, mutually exclusive with process 2 below
process bcl {

    tag "bcl2fastq"
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/**fastq.gz'
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/Stats/*'
    publishDir params.outdir, mode: 'copy', pattern: 'InterOp/*'
    publishDir params.outdir, mode: 'copy', pattern: 'Run*.xml'
    beforeScript 'export PATH=/opt/tools/bcl2fastq/bin:$PATH'

    input:
        path runfolder
        path samplesheet

    output:
        path 'fastq/Stats/', emit: bcl_ch
        path 'fastq/**fastq.gz', emit: fastqc_ch
        path 'InterOp/*', emit: interop_ch
        path 'Run*.xml'
    script: 
        // processing omitted
    }

// Process 2, note the slightly different outputs
process bcl_convert {
tag "bcl-convert"
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/**fastq.gz'
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/Reports/*'
    publishDir params.outdir, mode: 'copy', pattern: 'InterOp/*'
    publishDir params.outdir, mode: 'copy', pattern: 'Run*.xml'
    beforeScript 'export PATH=/opt/tools/bcl-convert/:$PATH'

    input:
        path runfolder
        path samplesheet

    output:
        path 'fastq/Reports/', emit: bcl_ch
        path 'fastq/**fastq.gz', emit: fastqc_ch
        path 'InterOp/', emit: interop_ch
        path 'Run*.xml'

    script:
        // processing omitted
}

// downstream process that needs either the first or the second to work, agnostic
process fastqc {
    cpus 12

    publishDir "${params.outdir}/", mode: "copy"

    module 'conda//anaconda3'
    conda '/opt/anaconda3/envs/tools/'

    input:
        path fastq_input
    output:
        path "fastqc", emit: fastqc_output

    script:
    """
    mkdir -p fastqc
    fastqc -t ${task.cpus} $fastq_input -o fastqc
    """

}

现在我有一个变量,params.bcl_convert可用于从一个流程切换到另一个流程,我将工作流程设置如下:

workflow {
    runfolder_repaired = "${params.runfolder}".replaceFirst(/$/, "/")

    runfolder = Channel.fromPath(runfolder_repaired, type: 'dir')
    sample_data = Channel.fromPath(params.samplesheet, type: 'file')

    if (!params.bcl_convert) {
       bcl(runfolder, sample_data)
    } else {
        bcl_convert(runfolder, sample_data)
    }

    fastqc(bcl.out.mix(bcl_convert.out)) // Problematic line
}

问题在于有问题的行:无论生成它的过程如何,我都不确定如何(以及是否可能)fastqc获得bcl2fastqor bcl_convert(但只有,而不是其余的)的输入。 fastq_ch

我尝试过的一些事情包括(受https://github.com/nextflow-io/nextflow/issues/1646启发,但它使用了进程的输出):

    if (!params.bcl_convert) {
       def bcl_out = bcl(runfolder, sample_data).out
    } else {
        def bcl_out = bcl_convert(runfolder, sample_data).out
    }

    fastqc(bcl_out.fastq_ch)

Variable "runfolder" already defined in the process scope但是,即使以与帖子类似的方式使用该方法,此编译也会失败:

def result_bcl2fastq = !params.bclconvert ? bcl(runfolder, sample_data): Channel.empty()
def result_bclconvert = params.bclconvert ? bcl_convert(runfolder, sample_data): Channel.empty()

我考虑过在单个脚本中使用条件,但是两个进程的输出不同,所以这实际上是不可能的。我让它工作的唯一方法是复制所有输出,例如:

if (!params.bcl_convert) {
   bcl(runfolder, sample_data)
   fastqc(bcl.out.fastqc_ch)
} else {
   bcl_convert(runfolder, sample_data)
   fastqc(bcl_convert.out.fastqc_ch
}

然而,这在我看来是不必要的复杂化。我想做的事真的可能吗?

4

1 回答 1

0

经过大量的试验和错误,我能够弄清楚这一点。

将变量分配给流程输出的行为类似于.out所述流程的属性。所以我为两个独占进程设置了相同的变量,设置了相同的输出(如问题所示),然后直接访问它们而不使用.out

workflow {

    runfolder_repaired = "${params.runfolder}".replaceFirst(/$/, "/")

    runfolder = Channel.fromPath(
        runfolder_repaired, type: 'dir')

    sample_data = Channel.fromPath(
        params.samplesheet, type: 'file')

    if (!params.bcl_convert) {
       bcl_out = bcl2fastq(runfolder, sample_data)
    } else {
       bcl_out = bcl_convert(runfolder, sample_data)
    }
    fastqc(bcl_out.fastqc_ch)
}
于 2021-08-09T12:48:47.950 回答