1

我有一个作业处理系统,其中每个作业都包含数千个需要不同策略才能完成的单独任务。单个任务构成了整个工作。如果所有任务都已完成,则将作业标记为成功完成并执行其他步骤,如果任何任务失败,则必须将作业标记为失败并执行其他步骤,如果作业超时,则必须标记作业失败并采取其他步骤。

一旦接收到作业的所有结果,就可以获取下一个作业。当前正在处理作业时,不应获取下一个作业。

这是流程的样子:

在此处输入图像描述

Job Polling Verticle 将作业发布到事件总线,Job Processing Verticle 将每个任务发布到事件总线。当作业策略完成时,它将任务结果发布到事件总线。

问题是我不知道在此模型中确定所有任务何时完成的正确方法。所有 Verticle 都是无状态的,Job Processing Verticle 不等待任何未来,即使 Job Results Verticle 是有状态的,它也不知道它应该期待多少结果。

我能想到的唯一方法是拥有一个全局有状态对象。但我不认为这是一个好的设计。

在此处输入图像描述

此外,我需要知道 Job 何时超时。也就是说,它运行的时间比它应该运行的时间长,我需要考虑它是失败的,记录它,然后继续。

我可以对全局状态执行此操作,但我再次认为这不是正确的解决方案。

这种垂直模式对我想要做的事情有意义吗?

4

1 回答 1

3

首先,让我尝试解决您的问题。然后我会尝试解释这个设计有什么问题。

问题是我不知道在此模型中确定所有任务何时完成的正确方法。所有 Verticle 都是无状态的,Job Processing Verticle 不等待任何未来,即使 Job Results Verticle 是有状态的,它也不知道它应该期待多少结果。

解决方案可能是引用计数垂直。每个工作人员都应该在它开始时和完成时发出一个start messageon 事件总线。即使你有扇出(那些是你不知道有多少工人的情况),计数 verticle 会知道这一点。在您的图表中,“作业后处理垂直”是一个很好的候选者。它可以保持一个计数器,只有当它达到零时,它才应该开始下一个工作。这也有助于避免实际共享一些内存引用。jobIdend messagejobId

此外,我需要知道作业何时超时。也就是说,它运行的时间比它应该运行的时间长,我需要考虑它是失败的,记录它,然后继续。

在同一个 verticle 中,您可以在每次获得新的start message. 如果你得到end message,取消计时器。否则,取消当前作业并重新开始。

现在,该解决方案将起作用,但该设计有两个主要缺陷。一个事实是,您似乎将所有流程都保留在内存中。如果您的应用程序崩溃,所有进度都会丢失,并且不清楚您如何记录它。也许 DB 中的轮询Jobs表实际上会更好,因为无论如何您的作业执行都是顺序的。

第二点是所有这些超时和引用计数都是结构化并发的自制实现。也许你应该看看像 Kotlin coroutines 这样的东西,它会为你处理很多问题。

于 2019-09-19T10:20:14.637 回答