2

我正在对 NextFlow 执行分散收集操作。

它如下所示:

reads = PATH+"test_1.fq"
outdir = "results"

split_read_ch = channel.fromFilePairs(reads, checkIfExists: true, flat:true ).splitFastq( by: 10, file:"test_split" )

process Scatter_fastP {
    tag 'Scatter_fastP'
    publishDir outdir

    input:
    tuple val(name), path(reads) from split_read_ch
    
    output:
    file "${reads}.trimmed.fastq" into gather_fatsp_ch

    script:
    """
    fastp -i ${reads} -o ${reads}.trimmed.fastq
    """
}


gather_fatsp_ch.collectFile().view().println{ it.text }

我使用 Nextflow ( https://www.nextflow.io/docs/latest/tracing.html )提出的所有基准测试选项运行此代码: nextflow run main.nf -with-report nextflow_report -with-trace nextflow_trace -with-timeline nextflow_timeline -with-dag nextflow_dag.html

在这些跟踪文件中,我可以找到 10 个 Scatter_fastP 进程的资源和速度。但我还想衡量创建渠道的资源split_read_ch和速度。gather_fastp_ch

我试图将频道的创建包含在流程中,但我找不到使其工作的解决方案。有没有办法将通道创建包含到跟踪文件中?还是我还没有找到将这些渠道创建到流程中的方法?

预先感谢您的帮助。

4

1 回答 1

2

尽管 Nextflow 可以解析 FASTQ 文件并将它们拆分为较小的文件等,但通常最好将这些操作传递给另一个进程或一组进程,尤其是当您的输入 FASTQ 文件很大时。这在两个方面是有益的:(1) 您的主要 nextflow 流程不需要努力工作,以及 (2) 您可以在 nextflow 报告中获得精细的任务流程统计信息。

以下示例使用 GNU split 来拆分输入 FASTQ 文件,并使用groupTuple()运算符收集输出,并groupKey()尽快将收集的值流式传输。您需要适应您的非压缩输入:

nextflow.enable.dsl=2

params.num_lines = 40000
params.suffix_length = 5

process split_fastq {

    input:
    tuple val(name), path(fastq)

    output:
    tuple val(name), path("${name}-${/[0-9]/*params.suffix_length}.fastq.gz")

    shell:
    '''
    zcat "!{fastq}" | split \\
        -a "!{params.suffix_length}" \\
        -d \\
        -l "!{params.num_lines}" \\
        --filter='gzip > ${FILE}.fastq.gz' \\
        - \\
        "!{name}-"
    '''
}

process fastp {

    input:
    tuple val(name), path(fastq)

    output:
    tuple val(name), path("${fastq.getBaseName(2)}.trimmed.fastq.gz")

    """
    fastp -i "${fastq}" -o "${fastq.getBaseName(2)}.trimmed.fastq.gz"
    """
}

workflow {

    Channel.fromFilePairs( './data/*.fastq.gz', size: 1 ) \
        | split_fastq \
        | map { name, fastq -> tuple( groupKey(name, fastq.size()), fastq ) } \
        | transpose() \
        | fastp \
        | groupTuple() \
        | map { key, fastqs -> tuple( key.toString(), fastqs ) } \
        | view()
}
于 2021-02-10T04:12:39.740 回答