Using Nextflow (DSL2), I would like to use each line of a file as a streaming input to my workflow.

nextflow.enable.dsl=2

process bar {
    input: val data
    output: val result
    exec:
    result = data.toUpperCase()
}

workflow {
    myFile = file('input_test.txt')
    myReader = myFile.newReader()
    myFile.withInputStream {
        String line
        while( line = myReader.readLine() ) {
            channel.from(line) | bar | view
        }
    }
}

The problem is that I can only use the "bar" process once. Nextflow reports: Process 'bar' has been already used -- If you need to reuse the same component include it with a different name or include in a different workflow context

I have also tried creating a subworkflow that takes the channel created from the line and calls bar.

Is there a way to use streamed data as an input using Nextflow?

Note: my final goal is not just to apply an upper case function. I would like to link several complex processes on the data stream.

Thank you!

1 Answer

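Your example code looks like an anti-pattern: it tries to create a new channel, and to invoke bar again, for every line in your input file. DSL2 allows a process to be invoked only once per workflow context, hence the error. Instead, create a single channel and have a look at the splitting operators, especially the splitText operator: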

workflow {
    // One channel for the whole file; splitText emits one item per line
    Channel.fromPath('input_test.txt') \
        | splitText { it.trim() } \
        | bar \
        | view()
}
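
Since the stated goal is to link several processes on the stream, the same splitText pipeline can be extended by piping one process into the next. The following is only a minimal sketch: the process names toUpper and reverseLine are hypothetical placeholders for your real steps, written in the same exec style as bar in the question.

```groovy
nextflow.enable.dsl=2

// Hypothetical first step: upper-case each line (stands in for a real process)
process toUpper {
    input: val line
    output: val result
    exec:
    result = line.toUpperCase()
}

// Hypothetical second step: reverse each line (stands in for a real process)
process reverseLine {
    input: val line
    output: val result
    exec:
    result = line.reverse()
}

workflow {
    // Each process is invoked exactly once; the channel feeds it one line at a time
    Channel.fromPath('input_test.txt') \
        | splitText { it.trim() } \
        | toUpper \
        | reverseLine \
        | view
}
```

Each process appears only once in the workflow body, so the "has been already used" error should not occur, and every line still flows through both steps as its own task.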

If the above doesn't help, please describe exactly what you want to do with each line in your input file.

Answered 2021-03-03T02:38:42.267