0

我有一个进程iterate_list。进程 iterate_list 获取一个列表并对列表中的每个项目执行一些操作。运行脚本时,它需要两个输入。它需要处理的列表和项目(它作为消费者从 rabbitmq 队列中获取)

目前,我给了一个 python 脚本整个列表,它遍历每个执行处理(作为一个大块)并在完成后返回。这很好,但是,如果系统重新启动,它会重新启动。

我想知道,我怎样才能使它每次我的 python 脚本处理一个项目时,它返回该项目,我将它从列表中删除,然后将新列表传递给进程。因此,在系统重新启动/崩溃的情况下,nextflow 知道它在哪里停止并可以从那里继续。

import groovy.json.JsonSlurper

def jsonSlurper = new JsonSlurper()
def cfg_file = new File('/config.json')
def analysis_config = jsonSlurper.parse(cfg_file)
def cfg_json = cfg_file.getText()
def list_of_items_to_process = [] 

items = Channel.from(analysis_config.items.keySet())

for (String item : items) {
    list_of_items_to_process << item
    } 

process iterate_list{
    echo true

    input:
    list_of_items_to_process

    output:
    val 1 into typing_cur

    script:
    """
    python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
    """ 
}

process signal_completion{

    echo true

    input:
    val typing_cur

    script:
    """
    echo "all done!"
    """
}

基本上,进程“iterate_list”从消息代理的队列中获取一个“项目”。进程 iterate_list 应该类似于:

process iterate_list{
    echo true

    input:
    list_of_items_to_process

    output:
    val 1 into typing_cur

    script:
    """
    python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
    list_of_items_to_process.remove(<output from python script>)
    """
}

因此,对于每一个,它都会运行,删除它刚刚处理的项目,然后重新启动一个新列表。

initial_list = [1,2,3,4]
after_first_process_completes = [2,3,4]
and_eventually = [] <- This is when it should move on to the next process.
4

1 回答 1

1

看起来您真正想做的是ArrayList从 Nextflow 流程中操作全局。AFAIK,没有办法完全做到这一点。这就是渠道的用途。

目前尚不清楚您是否真的需要从要处理的项目列表中删除任何项目。Nextflow 已经可以使用该-resume选项使用缓存的结果。那么为什么不只传递完整列表和单个项目进行处理呢?

items = Channel.from(['foo', 'bar', 'baz'])

items.into {
    items_ch1
    items_ch2
}

process iterate_list{

    input:
    val item from items_ch1
    val list_of_items_to_process from items_ch2.collect()

    """
    python3.7 process_list_items.py "${item}" '${list_of_items_to_process}'
    """
}

我只能猜测您的 Python 脚本如何使用其参数,但如果您要处理的项目列表只是一个占位符,那么您甚至可以输入要处理的项目的单个元素列表:

items = Channel.from(['foo', 'bar', 'baz'])

process iterate_list{

    input:
    val item from items

    """
    python3.7 process_list_items.py "${item}" '[${item}]'
    """
}
于 2019-10-03T13:13:19.620 回答