1

我有一个 ThreadPoolExecutor,它管理 2 个 Worker 线程(Callables)。我的 Worker 线程的工作是监听 MQ 并将内容写入文件并更新计数。我的要求是当所有线程读取的消息总数等于提交的消息总数时,我需要通过传递 STOP 消息来停止我的工作线程。如何在父线程仍在运行时从我的工作线程中获取计数。

我检查了 ThreadPoolExecutor 的 API,它只有 beforeExecute 和 afterExecute。afterExecute 对我不起作用,因为我的线程应该在到达 afterExecute 之前先停止。

任何建议表示赞赏。

谢谢

4

3 回答 3

2

我认为,如果您的工作线程正在主动轮询新消息或正在运行多个请求,那么您走错了路。

的想法Threadpoolexecutor是, aCallable代表一项任务(在您的示例中,即:处理一条消息)。如果你这样做,你可以ThreadPoolExecutor.shutdown()用来关闭你的工作线程。如果你这样做,你只需要一个中央实例,它会跟踪预期的消息数量和处理的消息,并在之后执行关闭方法processed == expected

编辑:如果您遵循该模式,则不必监视线程状态,因为TPE.shutdown()不会阻止执行已提交的任务,但只会禁止进一步提交。

于 2013-08-08T20:13:04.947 回答
1

您可以使用静态变量并从父线程和工作线程同步访问此静态变量。

让我们说,你有一个全局静态变量计数

现在,

private static int count;

public static synchronized void increment() {
    count++;
}

public static synchronized void decrement() {
    count--;
}

现在你是工作线程,每当你从 MQ 获取一个项目并在你的工作线程中处理它时,通过调用同步增量方法来增加它。

当计数等于总数时,您是父线程。提交的项目,执行 ThreadPoolExecutor.shutdown()

于 2013-08-08T20:16:47.443 回答
1

让父线程实例化一个AtomicInteger并将其传递给工作人员。他们可以调用incrementAndGet()它,然后父线程可以在它运行时读取它。您甚至可以让工作线程检查 之后的结果incrementAndGet()是否小于阈值,如果结束,工作线程可以自行停止和/或向其他工作线程发出停止命令;这样,您甚至不需要父线程的参与,工作人员会在达到阈值时自行关闭。

如果您希望父线程阻塞直到它们全部完成,您还可以CountDownLatch使用阈值创建一个初始化,然后countDown()在父线程调用时让每个工作线程调用await()

于 2013-08-08T20:14:50.137 回答