4

我有一个案例,其中有一个批量操作正在处理多个项目。处理完所有项目后,我必须将操作状态更新为已完成。项目由多个消费者并行处理。

理论上,处理项目后的消费者可以检查是否没有剩余项目(或此操作的队列中没有消息),但有可能两个消费者(A 和 B)同时完成,他们都检查同时,他们都看到另一个还没有准备好(因为事务还没有提交)——消费者 A 不会看到消费者 B 所做的更改,消费者 B 也不会看到消费者 A 所做的更改,所以没有他们会更新动作状态。我对吗?

如何在没有某种额外的定期检查状态且没有开销的情况下实现这种条件?如果每个操作有数千个项目,定期检查可能会很好,但如果通常有 1-2 个长期运行的项目,则效率非常低。

谢谢!

编辑:简而言之-在处理一组消息后触发某些操作的正确方法是什么,但是:

  • 消息必须并行处理
  • 定期检查是否所有消息都已处理不是答案
4

5 回答 5

2

您可以让每个消费者在完成处理时(以及在提交自己的事务之前)将 are-we-done-yet 消息排入新队列。are-we-done-yet 消息应该与单个消费者一起进入队列;当这个消费者处理一条消息时,它会检查原始队列是否为空。这具有序列化检查的效果,并解决了最初由并行性引起的问题。

在一般情况下,这不是最有效的方法,但由于您提到您只有 1-2 个长期运行的项目,它可能对您有用。我以前在类似的情况下做过这个,效果很好。

于 2013-11-12T17:16:15.960 回答
2

您需要一流的配料设备。仅仅依靠队列的大小是不够可靠的。例如,您可以让一个进程处理一条消息,然后拒绝它,从而将消息放回队列中(以前是“空的”)。

相反,使批次成为一流的概念。考虑发送包含批次中项目数量的“批次开始”消息。然后在处理消息时,它们可以更新批处理状态记录或其他一些设备。批处理状态可以跟踪处理的消息数量、通过的数量、失败的数量等。

当最后一条消息被处理时,它可以通过查看消息处理计数与批处理计数“减 1”(因为它正在运行最后一条消息)来检查批处理状态以查看它是否是“最后一条消息”。

您需要使这个过程原子化,例如,如果您使用 SQL,您将观察获取批处理状态行“FOR UPDATE”,这会将行锁定到您的事务,因此您的比较可以是原子的.

您也可以在该行上放置一个触发器,并检查它是否更符合您的风格。

或者你可以在你的系统上有一个全局对象来为你管理它。各种机制。

但关键是你有一些包罗万象的批处理概念来管理所有的工人。您不能在单个工人级别执行此操作,而且不可靠。

于 2013-11-12T17:41:24.700 回答
1

作为威尔和丹的答案的组合,我建议一个批处理管理队列,其中带有批处理大小计数器的“批处理开始”消息与“消息处理”消息一起到达,由消费者在处理完消息后发送.

它的单一管理消费者可以在处理的消息到达时对它们进行计数,直到它们与批处理大小匹配,并记录批处理完成。

于 2013-11-15T12:14:15.890 回答
0

为了允许出现错误情况,您必须进行定期检查。

例如,假设队列中有两个消费者和一条消息。消费者 1 拿起并开始处理消息。现在消费者 1 意外崩溃,事务被回滚。消息现在需要消费者 2 接收和处理。

因此,消费者 2 在成功处理所有消息之前无法退出。检查这一点的唯一方法是定期检查队列大小,直到它为空。如果消费者 2 在没有更多消息时退出,那么最终消费者 1 的队列中将有未处理的消息必须回滚事务。

于 2013-11-13T15:58:40.480 回答
0

创建一个ActionMonitor负责将Action标记为已完成。不同的ActionConsumer实例将在完成时通知它。当完成的消费者数量与正在运行的消费者数量相同时,ActionMonitorAction标记为已完成。

使用此解决方案,无需添加任何额外的队列或线程。将Action标记为已完成的实际执行将由消耗最后一个元素的同一线程执行。

它会是这样的:

public void ActionMonitor {
    private int numberOfConsumers; // Total number of consumers.
    private int numberOfConsumersFinished;

    public synchronized void consumerFinished() { // Sync could be more efficient.
        numberOfConsumersFinished++;
        if(numberOfConsumers == numberOfConsumersFinished) {
            markTheActionAsFinished();
        }
    }
}

public void ActionConsumer {

    private ActionMonitor actionMonitor;

    public void processElementsInAction() {
        while(moreElementsToProcess()) {
            takeNewElementAndProcessIt();
        }
        actionMonitor.consumerFinished();
    }
}

警告:您需要提前知道有多少消费者。

我希望它有所帮助。

于 2013-11-17T09:38:40.780 回答