我有一个生产者/消费者模式,如下所示
- 固定数量的生产者线程,每个线程都写入自己的 BlockingQueue,通过 Executor 调用
- 单个消费者线程,读取生产者线程
每个生产者都在运行数据库查询并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程会死掉,然后消费者会永远卡住,等待产品队列上的更多结果。
我应该如何构建它以正确处理捕获错误?
我有一个生产者/消费者模式,如下所示
每个生产者都在运行数据库查询并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程会死掉,然后消费者会永远卡住,等待产品队列上的更多结果。
我应该如何构建它以正确处理捕获错误?
我曾经做过类似的事情,并决定使用一个哨兵值,即将死去的生产者线程将从 catch-block 推入队列。您可以推送异常本身(这在大多数情况下都有效),或者为此设置一个特殊对象。无论如何,将异常推送给消费者以进行调试是非常好的。
无论您实际推送到队列中的是什么类,它都应该包含成功/失败/错误成员,以便消费者可以检查失败。
Peter 已经建议只使用一个队列——我不明白如何避免所有轮询应该是任何特定问题——队列上的对象可以具有标识它们来自哪个生产者的成员,以及任何其他元数据(如果需要)。
看来,当生产者死亡时,您唯一的选择就是停止消费者。
为此,您可以使用毒丸。这是生产者在停止时添加的特殊对象,而消费者在收到它时知道要停止。毒丸可以添加到finally块中,因此无论生产者如何被杀死/死亡,它都会被添加。
鉴于您只有一个消费者,我会使用一个队列。这样,您的消费者只会阻止所有生产者死亡的地方。
回答我自己的问题。
我使用了以下课程。它需要一个 Runnable 列表并并行执行它们,如果一个失败,它会中断所有其他的。然后我在生产者和消费者中进行中断处理,以便在中断时优雅地死去。
这很适合我的情况。
感谢所有评论/答案,因为他们给了我一些想法。
// helper class that does the following
//
// if any thread has an exception then interrupt all the others with an eye to cancelling them
// if the thread calling execute() is interrupted then interrupt all the child threads
public class LinkedExecutor
{
    private final Collection<Runnable> runnables;
    private final String name;
    public LinkedExecutor( String name, Collection<Runnable> runnables )
    {
        this.runnables = runnables;
        this.name = name;
    }
    public void execute()
    {
        ExecutorService executorService = Executors.newCachedThreadPool( ConfigurableThreadFactory.newWithPrefix( name ) );
        // use a completion service to poll the results
        CompletionService<Object> completionService = new ExecutorCompletionService<Object>( executorService );
        for ( Runnable runnable : runnables )
        {
            completionService.submit( runnable, null );
        }
        try
        {
            for ( int i = 0; i < runnables.size(); i++ )
            {
                Future<?> future = completionService.take();
                future.get();
            }
        }
        catch ( InterruptedException e )
        {
            // on an interruption of this thread interrupt all sub-threads in the executor
            executorService.shutdownNow();
            throw new RuntimeException( "Executor '" + name + "' interrupted", e );
        }
        catch ( ExecutionException e )
        {
            // on a failure of any of the sub-threads interrupt all the threads
            executorService.shutdownNow();
            throw new RuntimeException( "Execution execution in executor '" + name + "'", e );
        }
    }
}
当队列中没有更多元素一段时间后,您可能会添加一些超时来杀死消费者。
另一种方法可能是让生产者维护一个“活动”标志,并通过将其设置为 false 来表示他们正在死亡。如果生产者连续运行但可能并不总是从数据库中获取结果,则“活动”标志可能是生产者上次报告存活的时间,然后使用超时检查生产者是否可能已经死亡(当最后一次报告时)活着太久了)。