0

我有一个情况,我有 2 个阻塞队列。首先我插入一些我执行的任务。当每个任务完成时,它会将一个任务添加到第二个队列中,并在那里执行它们。

所以我的第一个队列很简单:我只是检查以确保它不为空并执行,否则我会中断():

public void run() {
    try {
        if (taskQueue1.isEmpty()) {
            SomeTask task = taskQueue1.poll();
            doTask(task);
            taskQueue2.add(task);
        }
        else {
            Thread.currentThread().interrupt();
        }
    }

    catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

第二个我执行以下操作,如您所知,它不起作用:

public void run() {
    try {
        SomeTask2 task2 = taskQueue2.take();
        doTask(task2);
    }

    catch (InterruptedException ex) {

    }
    Thread.currentThread().interrupt();

}

你将如何解决它,以便第二个 BlockingQueue 不会在 take() 上阻塞,但只有在它知道没有更多项目要添加时才完成。如果第二个线程可以看到第一个阻塞队列,并检查它是否为空并且第二个队列也为空,那么它将中断。

我也可以使用 Poison 对象,但更喜欢别的东西。

注意:这不是确切的代码,只是我在这里写的:

4

2 回答 2

2

我无法弄清楚你在这里实际想要做什么,但我可以说interrupt()你的第run()一种方法要么没有意义,要么是错误的。

  • 如果您run()在自己的Thread对象中运行该方法,那么该线程无论如何都将退出,因此中断它没有意义。

  • 如果您run()在带有线程池的执行程序中运行该方法,那么您很可能根本不想杀死线程或关闭执行程序……此时。如果你确实想关闭执行器,那么你应该调用它的关闭方法之一。


例如,这是一个版本,您似乎正在做的事情没有所有中断的东西,也没有线程创建/破坏的搅动。

public class TaskExecutor {

    private ExecutorService executor = new ThreadPoolExecutorService(...);

    public void submitTask1(final SomeTask task) {
        executor.submit(new Runnable(){
            public void run() {
                doTask(task);
                submitTask2(task);
            }
        });
    }

    public void submitTask2(final SomeTask task) {
        executor.submit(new Runnable(){
            public void run() {
                doTask2(task);
            }
        });
    }

    public void shutdown() {
        executor.shutdown();
    }
 }

如果您想为任务单独排队,只需创建并使用两个不同的执行程序。

于 2012-01-04T01:23:28.747 回答
1

你听起来好像处理第一个队列的线程知道一旦它的队列被耗尽,就没有更多的任务来了。这听起来很可疑,但无论如何我都会相信你并提出解决方案。

定义AtomicInteger对两个线程可见的。将其初始化为正数

定义第一个线程的操作如下:

  • 循环播放Queue#poll()
  • 如果Queue#poll()返回 null,AtomicInteger#decrementAndGet()则调用共享整数。
    • 如果AtomicInteger#decrementAndGet()返回零,则通过 中断第二个线程Thread#interrupt()。(这处理没有物品到达的情况。)
    • 无论哪种情况,都退出循环。
  • 否则,处理提取的项目,调用AtomicInteger#incrementAndGet()共享整数,将提取的项目添加到第二个线程的队列中,然后继续循环。

定义第二个线程的操作如下:

  • 循环阻塞BlockingQueue#take()
  • 如果BlockingQueue#take()throws InterruptedException,捕获异常,调用Thread.currentThread().interrupt()并退出循环。
  • 否则,处理提取的项目。
  • 调用AtomicInteger#decrementAndGet()共享整数。
    • 如果AtomicInteger#decrementAndGet()返回零,则退出循环。
    • 否则,继续循环。

在尝试编写实际代码之前,请确保您理解了这个想法。约定是第二个线程继续等待其队列中的更多项目,直到预期任务数达到零。此时,生产线程(第一个)将不再将任何新项目推入第二个线程的队列,因此第二个线程知道停止为其队列提供服务是安全的。

没有任务到达第一个线程的队列时,就会出现棘手的情况。由于第二个线程只在处理一个项目递减和测试计数,如果它永远没有机会处理任何项目,它永远不会考虑停止。我们使用线程中断来处理这种情况,代价是第一个线程的循环终止步骤中的另一个条件分支。幸运的是,该分支只会执行一次。

有很多设计可以在这里工作。我只是描述了一个只引入了一个额外的实体——共享原子整数——但即便如此,它还是很繁琐。我认为使用毒丸会更清洁,尽管我承认既不Queue#add()也不BlockingQueue#put()接受 null 作为有效元素(由于Queue#poll()的返回值合同)。否则很容易将 null 用作毒丸

于 2012-01-04T01:42:13.603 回答