0

问题:我需要控制由 foreach 循环并行处理任务的执行顺序。不幸的是,foreach 不支持这一点。

解决方案:使用 doRedis 使用数据库来保存在 foreach 循环中执行的所有任务。为了控制顺序,我想通过 setGetTask 覆盖 getTask 以根据预先指定的顺序获取任务。虽然我找不到太多关于如何做到这一点的文档。

附加信息:

  1. 在redis 文档中有一个关于 setGetTask 的小段落和一个示例 。

    getTask <- function ( queue , job_id , ...)
    {
    
      key <- sprintf("
      redisEval("local x=redis.call('hkeys',KEYS[1])[1];
                   if x==nil then return nil end;
                   local ans=redis.call('hget',KEYS[1],x);
                   redis.call('hdel',KEYS[1],x);i
                   return ans",key)
    }
    
    setGetTask(getTask)
    

    虽然我认为文档中的代码在语法上不正确(缺少恕我直言“和右括号”)“)。我认为这在 CRAN 上是不可能的,因为文档的代码是在提交时执行的。

  2. 更改 getTask 函数不会改变有关工人获取任务的任何内容(即使在 redisEval 中引入明显的废话,例如将其更改为 redisEval("dddddddddd(((")

  3. 从源代码安装软件包后,我只能访问 setGetTask 函数(我从1.1.1 版的官方 CRAN 软件包页面下载了该软件包(恕我直言,这与直接从 CRAN 安装它没有区别)

数据:要执行的任务的数据框如下所示:

taskName;taskQueuePosition;parameter1;paramterN
taskT;1;val1;10
taskK;2;val2;8
taskP;3;val3;7
taskA;4;val4;7

我想使用'taskQueuePosition'来控制顺序,应该先执行编号较小的任务。

问题:

  1. 有人知道我可以从哪里获得有关使用 doRedis 或 setGetTask 执行此操作的更多信息的任何来源吗?
  2. 有人知道我需要如何更改 getTask 以实现上述目的吗?
  3. 还有其他聪明的想法来控制 foreach 循环中的执行顺序吗?最好是这样在某些时候我可以使用 doRedis 作为并行后端(更改这将意味着由于复杂的技术基础设施原因而对处理进行重大更改)。

代码(便于复制):

下面假设redis-server在本地机器上启动。

Redis 数据库填充:

library(doRedis)
library(foreach)

options('redis:num'=TRUE) # needed for proper execution

REDIS_JOB_QUEUE = "jobs"
registerDoRedis(REDIS_JOB_QUEUE)

# filling up the data frame
taskDF = data.frame(taskName=c("taskT","taskK","taskP","taskA"),
           taskQueuePosition=c(1,2,3,4),
           parameter1=c("val1","val2","val3","val4"),
           parameterN=c(10,8,7,7))

foreach(currTask=iter(taskDF, by='row'), 
        .verbose = T
) %dopar% {
  print(paste("Executing task: ",currTask$taskName))
  Sys.sleep(currTask$parameterN)
}

removeQueue(REDIS_JOB_QUEUE)

工人:

library(doRedis)
REDIS_JOB_QUEUE = "jobs"

startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE)
4

1 回答 1

0

我可以解决问题,现在可以控制任务执行的顺序。

附加信息:

1.文档中似乎有一个错字,导致 getTask 示例无法正常工作。通过考虑包中文件 task.R 中 default_getTask 函数的形式,它应该看起来可能类似于:

getTaskDefault <- function ( queue , job_id , ...)
{
  key <- sprintf("%s:%s",queue, job_id)
  return(redisEval("local x=redis.call('hkeys',KEYS[1])[1];
                   if x==nil then return nil end;
                   local ans=redis.call('hget',KEYS[1],x);
                   redis.call('set', KEYS[1] .. '.start.' .. x, x);
                   redis.call('hdel',KEYS[1],x);
                   return ans",key))
}

函数第一行中第一个百分号后面的字母似乎丢失了。这可以解释括号和引号的数量不均匀。

2) setGetTask 对我仍然没有任何影响。当我在填充数据库时通过 .option 设置 getTask 函数时(就像它在包的小插图中描述的那样),它被成功调用。

3) 2)的信息表示我不需要getTask函数,所以我可以使用CRAN的包。

- - - 问题 - - -

1) doRedis 小插图描述了如何成功设置自定义 getTask。

2和3)当getTask函数中的LUA脚本如下修改时,任务以提交的方式从数据库中提取。这不正是我所要求的,但由于时间限制以及我(或更好地)不是关于 LUA 脚本的第一个想法的事实,它是一个令人满意的解决方案来控制 taskQueuePosition 列的提交顺序。

getTaskInOrder <- function ( queue , job_id , ...)
{

  key <- sprintf("%s:%s",queue, job_id)
  return(redisEval("

        local tasks=redis.call('hkeys',KEYS[1]); -- get all tasks

        local x=tasks[1];           -- get first task available task
        if x==nil then              -- if there are no tasks left, stop processing
          return nil 
        end;  

        local xMin = 65535;         -- if we have more tasks than 65535, getting the 
        -- task with the lowest taskID is not guaranteed to be the first one
        local i = 1;
        -- local iMinFound = -1;
        while (x ~= nil) do         -- search the array until there are no tasks left
        -- print('x: ',x)
        local xNum = tonumber(x);
        if(xNum<xMin) then
          xMin = xNum;
          -- iMinFound = i;
        end
        i=i+1;
        -- print('i is now: ',i);
        x=tasks[i];
        end
        -- print('Minimum is task number',xMin,' found at i ', iMinFound)
        x=tostring(xMin)            -- convert it back to a string (maybe it would 
                                    -- be better to keep the original string somewhere, 
                                    -- in case we loose some information whilst converting to number)

        -- print('x is now:',x);
        -- print(KEYS[1] .. '.start.' .. x, x);
        -- print('');
        local ans=redis.call('hget',KEYS[1],x);
        redis.call('set', KEYS[1] .. '.start.' .. x, x);
        redis.call('hdel',KEYS[1],x);
        return ans",key))
}

重要提示:我注意到如果一个任务被中止,顺序被搞砸了,重新提交的任务(即使任务号保持不变),将在最初提交的任务之后执行。这对我来说没问题。

------ 代码(便于复制):------

这导致以下代码示例(任务数据框中有 12 个条目,而不是原来的 4 个):

Redis 数据库填充:

library(doRedis)
library(foreach)

options('redis:num'=TRUE) # needed for proper execution

REDIS_JOB_QUEUE = "jobs"

getTaskInOrder <- function ( queue , job_id , ...)
{
  ...like above
}

registerDoRedis(REDIS_JOB_QUEUE)

# filling up the data frame already in order of tasks to be executed
# otherwise the dataframe has to be sorted by taskQueuePosition
taskDF = data.frame(taskName=c("taskA","taskB","taskC","taskD","taskE","taskF","taskG","taskH","taskI","taskJ","taskK","taskL"),
       taskQueuePosition=c(1,2,3,4,5,6,7,8,9,10,11,12),
       parameter1=c("val1","val2","val3","val4","val1","val2","val3","val4","val1","val2","val3","val4"),
       parameterN=c(5,5,5,4,4,4,4,3,3,3,2,2))

foreach(currTask=iter(taskDF, by='row'), 
        .verbose = T,
        .options.redis = list(getTask = getTaskInOrder
) %dopar% {
  print(paste("Executing task: ",currTask$taskName))
  Sys.sleep(currTask$parameterN)
}

removeQueue(REDIS_JOB_QUEUE)

工人:

library(doRedis)
REDIS_JOB_QUEUE = "jobs"

startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE)

另一个注意事项:以防万一您像我一样处理长时间的工作,请注意redis 1.1.1(CRAN 上的当前版本)中的一个错误,这会导致任务被重新提交(由于超时),尽管工作人员仍在工作在他们。

于 2017-01-18T11:33:47.303 回答