19

我需要设计一个 Redis 驱动的可伸缩任务调度系统。

要求:

  • 多个工作进程。
  • 许多任务,但长时间的空闲是可能的。
  • 计时精度合理。
  • 空闲时资源浪费最少。
  • 应该使用同步 Redis API。
  • 应该适用于 Redis 2.4(即即将推出的 2.6 中没有功能)。
  • 不应使用 Redis 以外的其他 RPC 方式。

伪 API schedule_task(timestamp, task_data):. 时间戳以整数秒为单位。

基本思路:

  • 收听列表​​中即将执行的任务。
  • 根据时间戳将任务放入存储桶。
  • 休眠直到最近的时间戳。
  • 如果出现一个新任务,其时间戳小于最接近的一个,请唤醒。
  • 分批处理所有即将到来的时间戳≤现在的任务(假设任务执行速度很快)。
  • 确保并发工作人员不会处理相同的任务。同时,请确保如果我们在处理它们时崩溃,不会丢失任何任务。

到目前为止,我无法弄清楚如何在 Redis 原语中适应它......

有什么线索吗?

请注意,有一个类似的老问题:Delayed execution / scheduling with Redis? 在这个新问题中,我介绍了更多细节(最重要的是,许多工人)。到目前为止,我无法弄清楚如何在这里应用旧答案——因此,这是一个新问题。

4

5 回答 5

12

这是另一种基于其他几个解决方案的解决方案 [1]。它使用 redis WATCH 命令来移除竞态条件,而不使用 redis 2.6 中的 lua。

基本方案是:

  • 将 redis zset 用于计划任务,将 redis 队列用于准备运行的任务。
  • 让调度程序轮询 zset 并将准备好运行的任务移动到 redis 队列中。您可能需要超过 1 个调度程序来实现冗余,但您可能不需要或不需要很多。
  • 拥有尽可能多的工作人员来阻止 redis 队列上的弹出。

我还没有测试过:-)

foo 工作创建者会做:

def schedule_task(queue, data, delay_secs):
    # This calculation for run_at isn't great- it won't deal well with daylight
    # savings changes, leap seconds, and other time anomalies. Improvements
    # welcome :-)
    run_at = time.time() + delay_secs

    # If you're using redis-py's Redis class and not StrictRedis, swap run_at &
    # the dict.
    redis.zadd(SCHEDULED_ZSET_KEY, run_at, {'queue': queue, 'data': data})

schedule_task('foo_queue', foo_data, 60)

调度员看起来像:

while working:
    redis.watch(SCHEDULED_ZSET_KEY)
    min_score = 0
    max_score = time.time()
    results = redis.zrangebyscore(
        SCHEDULED_ZSET_KEY, min_score, max_score, start=0, num=1, withscores=False)
    if results is None or len(results) == 0:
        redis.unwatch()
        sleep(1)
    else: # len(results) == 1
        redis.multi()
        redis.rpush(results[0]['queue'], results[0]['data'])
        redis.zrem(SCHEDULED_ZSET_KEY, results[0])
        redis.exec()

foo 工人看起来像:

while working:
    task_data = redis.blpop('foo_queue', POP_TIMEOUT)
    if task_data:
        foo(task_data)

[1] 此解决方案基于 not_a_golfer,http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/ 中的一个以及用于事务的 redis 文档。

于 2013-02-22T03:27:28.423 回答
7

您没有指定您使用的语言。您至少有 3 种替代方法可以做到这一点,而无需至少在 Python 中编写一行代码。

  1. Celery 有一个可选的 redis 代理。 http://celeryproject.org/

  2. resque 是一个非常流行的使用 redis 的 redis 任务队列。 https://github.com/defunkt/resque

  3. RQ 是一个简单而小型的基于 redis 的队列,旨在“从 celery 和 resque 中获取好东西”,并且使用起来更简单。 http://python-rq.org/

如果你不能使用它们,你至少可以看看它们的设计。

但是要回答你的问题 - 你想要的可以用 redis 来完成。实际上,我过去或多或少地写过。

编辑:至于在redis上建模你想要的东西,这就是我要做的:

  1. 使用时间戳对任务进行排队将由客户端直接完成 - 您将任务放入排序集中,时间戳作为分数,任务作为值(参见 ZADD)。

  2. 中央调度程序每 N 秒唤醒一次,检查该集合上的第一个时间戳,如果有任务准备好执行,它会将任务推送到“立即执行”列表。这可以使用 ZREVRANGEBYSCORE 在“等待”排序集上完成,获取时间戳<=now 的所有项目,因此您可以一次获得所有准备好的项目。推送由 RPUSH 完成。

  3. 工作人员在“立即执行”列表中使用 BLPOP,当有事情要做时醒来,然后做他们的事情。这是安全的,因为 redis 是单线程的,没有 2 个工作人员会执行相同的任务。

  4. 完成后,工作人员将结果放回响应队列,由调度程序或其他线程检查。您可以添加“待处理”存储桶以避免失败或类似情况。

所以代码看起来像这样(这只是伪代码):

客户:

ZADD "new_tasks" <TIMESTAMP> <TASK_INFO>

调度员:

while working:
   tasks = ZREVRANGEBYSCORE "new_tasks" <NOW> 0 #this will only take tasks with timestamp lower/equal than now
   for task in tasks:

       #do the delete and queue as a transaction
       MULTI
       RPUSH "to_be_executed" task
       ZREM "new_tasks" task
       EXEC

   sleep(1)

我没有添加响应队列处理,但它或多或少像工人:

工人:

while working:
   task = BLPOP "to_be_executed" <TIMEOUT>
   if task:
      response = work_on_task(task)
      RPUSH "results" response

编辑:无状态原子调度程序:

while working:

   MULTI
   ZREVRANGE "new_tasks" 0 1
   ZREMRANGEBYRANK "new_tasks" 0 1
   task = EXEC

   #this is the only risky place - you can solve it by using Lua internall in 2.6
   SADD "tmp" task

   if task.timestamp <= now:
       MULTI
       RPUSH "to_be_executed" task
       SREM "tmp" task
       EXEC
   else:

       MULTI
       ZADD "new_tasks" task.timestamp task
       SREM "tmp" task
       EXEC

   sleep(RESOLUTION)
于 2012-06-03T10:53:38.270 回答
3

如果您正在寻找关于 Java 的现成解决方案。雷迪森适合您。它允许使用熟悉的ScheduledExecutorService api 并基于 Redis 队列以分布式方式在Redisson 节点上调度和执行任务(具有 cron-expression 支持)。

这是一个例子。java.lang.Runnable首先使用接口定义一个任务。每个任务都可以通过注入对象访问Redis实例。RedissonClient

public class RunnableTask implements Runnable {

    @RInject
    private RedissonClient redissonClient;

    @Override
    public void run() throws Exception {
        RMap<String, Integer> map = redissonClient.getMap("myMap");
        Long result = 0;
        for (Integer value : map.values()) {
            result += value;
        }
        redissonClient.getTopic("myMapTopic").publish(result);
    }

}

现在可以将其汇总到ScheduledExecutorService

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<?> future = executorService.schedule(new CallableTask(), 10, 20, TimeUnit.MINUTES);

future.get();
// or cancel it
future.cancel(true);

带有 cron 表达式的示例:

executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));

executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));

executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));

所有任务都在Redisson 节点上执行。

于 2016-08-24T18:06:01.323 回答
0

一种组合方法似乎是合理的:

  1. 没有新的任务时间戳可能小于当前时间(如果小于则钳位)。假设可靠的 NTP 同步。

  2. 所有任务都在键处进入存储桶列表,并以任务时间戳为后缀。

  3. 此外,所有任务时间戳都转到一个专用的 zset(键和分数——时间戳本身)。

  4. 通过单独的 Redis 列表从客户端接受新任务。

  5. 循环:通过 zrangebyscore ...limit 获取最旧的 N 个过期时间戳。

  6. 新任务列表和获取时间戳列表超时的 BLPOP。

  7. 如果有旧任务,请处理它。如果是新的 - 添加到存储桶和 zset。

  8. 检查已处理的存储桶是否为空。如果是这样 — 从 zset 中删除 list 和 entrt。可能不检查最近过期的存储桶,以防止出现时间同步问题。结束循环。

批判?注释?备择方案?

于 2012-06-03T08:23:38.357 回答
0

卢阿

我做了一些类似于这里建议的东西,但优化了睡眠持续时间以更精确。如果您在延迟的任务队列中插入很少,则此解决方案很好。以下是我使用 Lua 脚本的方法:

local laterChannel = KEYS[1]
local nowChannel = KEYS[2]
local currentTime = tonumber(KEYS[3])

local first = redis.call("zrange", laterChannel, 0, 0, "WITHSCORES")

if (#first ~= 2)
then
    return "2147483647"
end

local execTime = tonumber(first[2])
local event = first[1]

if (currentTime >= execTime)
then
    redis.call("zrem", laterChannel, event)
    redis.call("rpush", nowChannel, event)
    return "0"
else
    return tostring(execTime - currentTime)
end

它使用两个“通道”。laterChannel是一个ZSET并且nowChannel是一个LIST。每当需要执行任务时,事件就会从 移动ZSETLIST。Lua 脚本响应调度程序在下一次轮询之前应该休眠多少 MS。如果ZSET是空的,就永远沉睡。如果是时候执行某事,请不要休眠(即立即再次轮询)。否则,休眠直到执行下一个任务。

那么如果在调度员睡觉的时候添加了一些东西呢?

此解决方案与关键空间事件结合使用。您基本上需要订阅的键,laterChannel并且每当有添加事件时,您都会唤醒所有调度程序,以便他们可以再次轮询。

然后你有另一个使用阻塞 left pop on 的调度程序nowChannel。这表示:

  • 您可以让调度程序跨多个实例(即它正在扩展)
  • 轮询是原子的,因此您不会有任何竞争条件或双重事件
  • 任务由任何空闲的实例执行

有一些方法可以进一步优化这一点。例如,不是返回“0”,而是从 zset 中获取下一项并直接返回正确的睡眠时间。

到期

如果不能使用 Lua 脚本,可以在过期文档上使用键空间事件。订阅频道并在 Redis 驱逐它时接收事件。然后,抓住一把锁。这样做的第一个实例会将其移动到列表(“立即执行”通道)。然后你不必担心睡眠和轮询。Redis 会告诉你什么时候该执行某事。

execute_later(timestamp, eventId, event) {
    SET eventId event EXP timestamp
    SET "lock:" + eventId, ""
}

subscribeToEvictions(eventId) {
    var deletedCount = DEL eventId
    if (deletedCount == 1) {
        // move to list
    }
}

然而,这有它自己的缺点。例如,如果您有很多节点,所有节点都会收到事件并尝试获取锁。但我仍然认为这里提出的任何建议总体上都较少。

于 2021-05-10T11:40:50.890 回答