0

有 N 个线程,每个线程在自己的成员变量“status”中将状态设置为 0 或 1。在调用者中,如果任何一个线程的状态为 1,则整体状态必须变为 1。

这些线程以批处理模式运行。因此,如果任何失败都需要再次运行。我需要想出一些处理失败的方法,以便剩余的线程也立即停止。

然而,线程是可运行的,而不是可调用的。

我们可以使用 volatile 布尔变量在线程之间进行通信以发出故障信号。但是,我需要知道如何从调用者的可运行线程中读取单个状态。

4

2 回答 2

3

有几种方法可以做到这一点,我敢肯定。一种方法是使用完成服务。这是一些(粗略测试过的)代码:

public void allSucceed(final List<MyRunnable> tasks) {
    if (tasks == null || tasks.size() == 0)
        return;

    int status = 0;

    boolean success = true;
    do {
        final ExecutorService executorService = Executors
                .newFixedThreadPool(tasks.size());
        final CompletionService<MyRunnable> completionService = new ExecutorCompletionService<MyRunnable>(
                executorService);
        for (final MyRunnable task : tasks) {
            completionService.submit(task, task);
        }

        for (int i = 0; i < tasks.size(); i++) {
            try {
                status = completionService.take().get().getStatus();
            } catch (final Exception e) {
                success = false;
                break;
            }

            if (status == 0) {
                System.out.println("A task failed. All will be rerun");
                executorService.shutdownNow();
                success = false;
                break;
            }
        }
    } while (!success);
}

将任务(Runnables)设计为可中断的至关重要,否则任何停止它们的尝试都是徒劳的。

于 2013-02-20T19:10:09.530 回答
1

您可以简单地要求线程在调用者的线程中设置一个值:

调用者中的字段:

int[] status = new int[NUM_THREADS];

然后给每个 Runnable 一个索引,这样它就可以在数组中设置它的状态,例如:

for (int i = 0; i < NUM_THREADS; i++) {
    final index = i;

    Runnable r = new Runnable() {
        void run() {
            // have your code stop occasionally to check for any failures
            if (>I failed>)
                status[index] = 1;
        }
    }
    // start thread using Runnable
}

要检测停止,请让您的 Runnables 偶尔检查数组中的任何故障,或者您可以boolean向调用者类添加单独的标志:

volatile boolean failed = false;
Object lock = new Object();

然后在您的可运行文件中:

if (<I failed>) {
    synchronised (lock) {
        failed = true;
    }
}

并且您在工作期间对失败的检查将是:

synchronised (lock) {
    if (failed) {
        // clean up resources
        status[index] = -1; // consider using a separate value for "halted"
        return;
    }
}
于 2013-02-20T19:04:08.920 回答