3

我正在并行运行一个函数。为了获得有关工作状态的进度更新,我希望一名但只有一名工作人员定期报告其进度。我对如何做到这一点的自然想法是让工作人员执行的功能检查工作人员的名称,并且仅在名称与特定值匹配时才提供状态更新。但是,我找不到可靠的方法来提前确定这一点。例如,在 Julia 中,有一个简单的myid()函数可以提供工人的 ID(即 1、2 等)。我正在寻找 R 中的等价物。到目前为止,我发现的最好的方法是让每个工人打电话Sys.getpid(). 但是,我不知道编写脚本的可靠方法,以便我提前知道分配给工作人员的 pid 是什么。我要编写的基本功能脚本如下所示,除了我正在寻找的 R 等效于该myid()函数:

library(parallel)

Test_Fun = function(a){
    for (idx in 1:10){
        Sys.sleep(1)
        if (myid() == 1){
            print(idx)
        }
    }
}

mclapply(1:4, Test_Fun, mc.cores = 4)
4

1 回答 1

1

从 R 3.3.2 开始,该parallel包不提供工作人员 ID 功能。也没有提供一种机制来在工作人员开始执行任务之前对其进行初始化。

我建议您通过使用该函数将额外的任务 ID 参数传递给工作mcmapply函数。如果任务数与worker数相等,则task ID可以作为worker ID。例如:

library(parallel)
Test_Fun = function(a, taskid){
    for (idx in 1:10){
        Sys.sleep(1)
        if (taskid == 1){
            print(idx)
        }
    }
}
mcmapply(Test_Fun, 1:4, 1:4, mc.cores = 4)

但如果任务多于工作人员,您将只能看到第一个任务的进度消息。您可以通过在执行第一个任务时初始化每个工作人员来解决这个问题:

WORKERID <- NA  # indicates worker is uninitialized
Test_Fun = function(a, taskid){
    if (is.na(WORKERID)) WORKERID <<- taskid
    for (idx in 1:10){
        Sys.sleep(1)
        if (WORKERID == 1){
            print(idx)
        }
    }
}
cores <- 4
mcmapply(Test_Fun, 1:8, 1:cores, mc.cores = cores)

请注意,这假定mc.prescheduleTRUE,这是默认值。如果mc.prescheduleFALSE并且任务的数量大于工作人员的数量,情况就会更加动态,因为每个任务都由不同的工作进程执行,并且工作人员不会同时执行。

于 2017-02-06T15:25:33.497 回答