17

我目前面临一个问题,我很确定有一个官方名称,但我不知道在网上搜索什么。我希望如果我描述了我想到的问题和解决方案,有人能够告诉我设计模式的名称(如果有一个与我将要描述的匹配)。

基本上,我想要的是一个作业队列:我有多个创建作业的客户端(发布者),以及一些处理这些作业的工人(消费者)。现在我想将发布者创建的作业分发给各个消费者,这基本上可以使用几乎任何具有跨队列负载平衡的消息队列来实现,例如使用 RabbitMQ 甚至 MQTT 5。

然而,现在事情变得复杂了……每个工作都指向一个外部实体,比如说一个用户。我想要的是按顺序处理单个用户的作业,但并行处理多个用户。我没有要求用户 X 的工作总是交给工人 Y,因为无论如何它们都应该按顺序处理。

现在我可以使用 RabbitMQ 及其一致的哈希交换来解决这个问题,但是当新的工作人员进入集群时我会遇到数据竞争,因为 RabbitMQ 不支持重新定位已经在队列中的作业。

MQTT 5 也不支持这个:这里这个想法被称为“粘性共享订阅”,但这不是官方的。它可能是 MQTT 6 的一部分,也可能不是。谁知道。

我还查看了 NSQ、NATS 和其他一些经纪人。他们中的大多数甚至不支持这种非常具体的场景,而那些确实使用一致哈希的场景,其中存在前面提到的数据竞速问题。

现在,如果代理不将作业排序到队列中,一旦作业到达,问题就会消失,但是如果它会跟踪特定用户的作业是否已经被处理:如果是这样,它应该延迟所有其他作业此用户,但其他用户的所有作业仍应处理。这是,AFAICS,使用 RabbitMQ 等人是不可能的。

我很确定我不是唯一一个有这个用例的人。例如,我可以想到用户将视频上传到视频平台,虽然上传的视频是并行处理的,但单个用户上传的所有视频都是按顺序处理的。

所以,长话短说:我所描述的东西是否以一个通用名称而闻名?诸如分布式作业队列之类的东西?具有任务亲和性的任务调度程序?还是别的什么?我尝试了很多术语,但没有成功。这可能意味着没有解决方案,但正如所说,很难想象我是这个星球上唯一遇到这个问题的人。

有什么我可以寻找的想法吗?并且:是否有任何工具可以实现这一点?有什么协议吗?

PS:只使用预定义的路由键不是一个选项,因为用户 ID(我只是在这里作为一个虚构的例子)基本上是 UUID,所以可能有数十亿个,所以我需要更动态的东西。因此,一致的散列基本上是正确的方法,但如前所述,分布必须逐个工作,而不是预先进行,以避免数据竞争。

4

9 回答 9

19

时间工作流能够以最小的努力支持您的用例。

这是满足您要求的稻草人设计:

  • 使用 userID 作为工作流 ID 向用户工作流发送 signalWithStart 请求。它要么将信号传递给工作流,要么首先启动工作流并将信号传递给它。
  • 对该工作流的所有请求都由它缓冲。时间提供了一个硬保证,即只有一个具有给定 ID 的工作流可以处于打开状态。因此,所有信号(事件)都保证在属于用户的工作流中得到缓冲。在存在任何流程或基础设施故障的情况下,Temporal 会保留工作流中的所有数据(包括堆栈跟踪和局部变量)。所以不需要taskQueue显式地持久化变量。
  • 内部工作流事件循环会一一分派这些请求。
  • 当缓冲区为空时,工作流可以完成。

这是用 Java 实现它的工作流代码(也支持 Go 和 PHP SDK,NodeJS 处于 alpha 阶段):

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

然后是通过信号方法将该任务排入工作流的代码:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = WorkflowOptions.newBuilder()
       .setTaskQueue(TASK_QUEUE)
       .setWorkflowId(task.getUserId())
       .build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = temporalClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = temporalClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    temporalClient.signalWithStart(request);
}

与使用队列进行任务处理相比,Temporal 提供了许多其他优势。

  • 建立了指数重试,具有无限的过期间隔
  • 故障处理。例如,如果在配置的时间间隔内两次更新都无法成功,它允许执行通知另一个服务的任务。
  • 支持长时间运行的心跳操作
  • 能够实现复杂的任务依赖。例如,在不可恢复的故障(SAGA)的情况下实现调用链或补偿逻辑
  • 提供对当前更新状态的完整可见性。例如,当使用队列时,您都知道队列中是否有一些消息,并且您需要额外的数据库来跟踪整体进度。使用 Temporal 记录每个事件。
  • 能够取消进行中的更新。
  • 分布式 CRON 支持

请参阅有关Temporal 编程模型的演示文稿。它提到了 Temporal 的前身 Cadence 项目。

于 2019-06-16T00:13:55.400 回答
3

我想要的是一个工作队列:我有多个创建工作的客户端(发布者),以及一些处理这些工作的工作人员(消费者)。现在我想将发布者创建的作业分发给各个消费者,这基本上可以使用几乎任何具有跨队列负载平衡的消息队列来实现,例如使用 RabbitMQ 甚至 MQTT 5。

然而,现在事情变得复杂了……每个工作都指向一个外部实体,比如说一个用户。我想要的是按顺序处理单个用户的作业,但并行处理多个用户。我没有要求用户 X 的工作总是交给工人 Y,因为无论如何它们都应该按顺序处理。

即使不是这个特定的用例,我在几个月前对(动态)任务调度 [ 0 ] [ 1 ] 进行了调查,但没有出现类似的情况。

我读到的每个调度算法都有一些其他任务共有的属性,比如优先级、年龄、入队时间、任务名称(以及扩展的平均处理时间)。如果您的任务都链接到一个用户,您可以构建一个调度程序,考虑user_id从队列中选择任务。

但是我想,您不想构建自己的调度程序,无论如何这都是浪费,因为根据这种需求的经验,现有的消息队列允许实现您的要求。

总结您需要的要求:

一个调度程序,每个用户同时只运行一个任务。

解决方案是使用分布式锁,例如REDIS distlock并在任务启动之前获取锁,并在任务执行期间定期刷新。如果同一用户的新任务进入并尝试执行,它将无法获取锁并将重新入队。

这是一个伪代码:

def my_task(user_id, *args, **kwargs):
    if app.distlock(user_id, blocking=False):
        exec_my_task(user_id, *args, **kwargs)
    else:
        raise RetryTask()

不要忘记刷新释放

采用类似的方法来强制robots.txt爬虫中的每个请求之间的延迟。

于 2019-05-31T09:37:29.093 回答
2

只要锁冲突不经常发生,amirouche 所描述的将是一个简单的解决方案。如果是这样,您将浪费大量时间在您的工作人员获取他们必须拒绝的消息并让消息代理重新排队。

可以很好地解决此类问题的替代方法是 Actor 模型/Actor 框架。一些例子包括 Akka、Orleans、Protoactor 和 Cadence(上面提到过,尽管 Candence 不仅仅是一个演员框架)。这些框架可能会变得非常复杂,但其核心可以确保一次处理单个参与者的消息,但允许同时处理多个参与者(在您的场景中每个用户 ID 都会有一个参与者)。这些框架从您那里抽象出所有消息路由和并发性,从而大大简化了实现,并且从长远来看应该更加健壮/可扩展。

于 2019-09-03T19:36:31.387 回答
1

对每个实体处理订单的硬性要求具有挑战性。

每个发布的任务运行多长时间?如果它们总是很短,您可以通过哈希分配任务,并在每次更改形状时简单地耗尽正在运行的工作池,而不会损失太多生产力。

如果它们运行时间更长,那可能太慢了。在这种情况下,您还可以让工作人员在执行期间为他们消费的每个任务的 user_id 从快速中央服务(如 Redis 或其他东西)取出原子咨询锁。该服务也可以按用户 ID 范围或你有什么进行单独扩展分区。如果在接收任务和执行任务的第一个副作用之间有足够的差距,那么工作人员甚至不需要在它即将提交之前阻止成功获取锁,因此可能不会看到显着增加潜伏。争用*可能很少见:如果您已经在 user_id 上使用了一些一致的散列方案来分配工作,那么它们确实很少见,并且仍然只会在工作池拓扑发生变化时发生。

如果授予锁以先到先服务的顺序提供服务,并且请求锁的速度比工作池拓扑更改更快(也就是说,工作人员在收到来自发布者的作业后立即排队等待锁),这可能即使拓扑变化很快,也可以为您提供很好的排序保证。

编辑:

*我最初写的是“失败”;不完全是我的意思。这个想法是,除非拓扑发生变化,否则这个锁服务几乎不会遇到任何锁争用,因为给定用户的任务总是会正常发送给同一个工作人员。

**另一种可能性:您也可以在仅部分工作池排水的情况下提供良好的保证。如果没有用户级别的咨询锁,如果您使用一致的哈希方案来分发任务,并且您可以为已分派任务的完成维持一个低水位线,您可以推迟启动目标工作人员与当前执行的最早任务开始时不同的任务(即,仅为分配的工作人员更改的用户排空正在运行的任务)。这是相当多的额外复杂性。如果您可以有效地跟踪低水位标记并且您没有长时间运行的任务的长尾,那么它可能是一个不错的选择,可以让您省略锁定服务。但是,在撰写本文时,我不清楚这是否会比锁便宜。低水位线通常并不便宜以可靠地实施,并且工人在错误的时间死亡可能会延迟整个 1/N 队列的处理,这些队列改变了工人,而不仅仅是任务在工人身上进行中的用户它死的时间。

于 2019-05-31T17:23:47.417 回答
1

Apache Qpid代理支持称为消息组的功能,其中路由键和工作程序之间的关系是动态的并且基于当前流量。

消费排序意味着代理将不允许未确认的消息发送给给定组的多个消费者。

这意味着在给定时间只有一个消费者可以处理来自特定组的消息。当消费者确认其所有获取的消息时,代理可以将该组中的下一条待处理消息传递给不同的消费者。

这可以更好地利用工人:

请注意,不同的消息组不会相互阻止传递。例如,假设一个队列包含来自两个不同消息组的消息——比如组“A”和组“B”——并且它们被排入队列,使得“A”的消息位于“B”的前面。如果组“A”的第一条消息正在被客户端消费,那么剩余的“A”消息被阻塞,但“B”组的消息可供其他消费者消费——即使它位于队列中组“A”的“后面”。

与其他经纪人相比,此功能仍可能以显着的性能价格出现。这些天对 Qpid 的兴趣不大4 5

编辑:还有其他代理也提供此功能:ActiveMQActiveMQ Artemis EDIT2:事实证明,ActiveMQ 和 Artemis 中的“消息组”工作方式不同 - 将组分配给工作人员是静态的(粘性)而不是动态的。

于 2020-09-23T12:48:54.693 回答
1

Kafka 完全支持您的需求。您需要配置一个密钥,kafka 将确保所有具有相同密钥的消息都将按顺序处理。

于 2020-10-17T17:19:02.003 回答
0

通过搜索“具有类别排序的作业队列”,我能够找到关于您所描述的那种行为的讨论

不幸的是,他们似乎无法解决您的问题。

有一个对先前问题的答案,该问题建议不要将任何类型的消息代理服务用于任何类型的订单敏感或业务逻辑敏感任务,原因可能适用于您正在做的事情,也可能不适用于您正在做的事情。它还指出了一种技术,它似乎可以做你想做的事情,但可能无法很好地适应手头的任务。

如果您有粘性选项,它将巧妙地解决您的问题,并且将额外的低效率降至最低。当然,粘性有其自身的失败模式。没有理由认为您会找到一个实现您所做的确切权衡的实现。

我假设,因为您在这里提出了这个问题,所以每个用户的顺序性很重要。在您给出的示例中,对于处理上传的视频平台,违反顺序性没什么大不了的。更广泛地说,大多数需要大吞吐量负载平衡作业队列的人不需要对事物处理顺序的强有力保证。

如果您最终需要自己构建东西,那么您将有很多选择。我得到的印象是您期待巨大的吞吐量、高度并行化的架构和低用户 ID 冲突率。在这种情况下,您可能会考虑维护先决条件列表:
当有新任务进入时,平衡器会搜索所有进程中、已分配和尚未分配的作业,以查找与作业键 (user_id) 匹配的任何作业。
如果存在现有匹配项,则将新作业添加到尚未分配的列表中,其中最旧的作业共享其密钥作为先决条件。
每次工作完成时,工作人员都需要检查尚未分配的列表,看看它是否刚刚完成了任何人的先决条件。如果是这样,工人可以标记该子作业以进行分配,或者只处理子作业本身。
当然,这有其自身的故障模式;你必须做出取舍。

于 2019-05-25T01:23:12.063 回答
-2

Kafka 可以提供帮助,因为它可以将消息存储一段时间,因此您可以再次轮询它们

于 2019-05-29T15:48:29.840 回答
-2

如果我正确理解您的场景,我相信您所描述的功能与消息会话Azure Service Bus中的工作方式非常相似。

在将它们推入队列之前,您基本上将SessionId消息的属性设置为。UserId

每个消费者将一个接一个地锁定一个会话处理消息,并且这些消息将属于同一个用户。完成后,消费者可以继续进行下一个可用会话。

此外,Azure Functions最近发布了 Service Bus Sessions 支持,该支持处于预览阶段,但允许您轻松实现所有这些。

不幸的是,我不太熟悉这个功能是否存在于一个开源替代品中,但我希望这会有所帮助。

于 2019-05-31T18:24:52.977 回答