1

我有一个多线程批处理应用程序,可以在 5-10 个并发执行线程之间运行。它们的数据段被仔细切割以尽可能均匀地分布,但当然,执行时间总是不同的。我想做的是在最后一个线程完成时调用最后一种 onFinalize 方法,这将进行一些统计计算。

我想知道是否知道线程是最后一个线程的最佳方法(而不是查询数据库,这似乎有点行人)是拥有一个静态变量,当一个新线程是时,它将在同步块中递增每个线程完成时添加和减少。因此,当一个线程完成并递减时,我可以有一个 if 来查看未完成的线程数是否为 0,然后调用最终统计信息。

这就是我的想法。我想知道是否有更好、更优雅或防弹的方法来实现这一点。

使用 Java 7

谢谢

4

4 回答 4

0

为什么不在最后在每个线程上调用 Thread.join() 呢?你可以有一个 for 循环在每个循环上调用它——它会在第一次运行时锁定,然后当那个完成时,返回,然后你在下一次运行时锁定。当您退出循环时,它们都已退出。

public class ThreadManager {
    private List<Thread> threads;

    public void addThread(Thread thread) {
        threads.add(thread);
    }

    public void waitTillAllComplete() {
        for (int ind=0; ind<threads.size(); ind++)
            threads.get(ind).join();
    }
}
于 2012-09-05T05:17:02.833 回答
0

我通过创建一个扩展基本Runnable接口以提供更健壮的线程架构的框架来完成类似的事情。后来我发现它与 GWT 中用于类似目的的异步代码惊人地相似。

这是凭记忆,但基本上可以归结为:

public interface AsyncRunnable<T> extends Runnable
{
    AsyncCallback<T> getCallback();

    T runAsync();
}

public interface AsyncCallback<T>
{
    void onSuccess(T data);
    void onFailure(Exception exception);
}

public interface AsyncCallbackInvoker<T> extends Runnable
{
    // implies requirement for callback...
    AsyncCallback<T> getCallback();
}

public class SuccessfulAsyncCallbackInvoker<T> implements AsyncCallback<T>
{
    private final AsyncCallback<T> callback;
    private final T data;

    public SuccessfulAsyncCallbackInvoker(AsyncCallback<T> callback, T data)
    {
        // note: data being null may be valid; callback would not be

        this.callback = callback;
        this.data = data;
    }

    @Override
    public void run()
    {
        callback.onSuccess(data);
    }

    @Override
    public AsyncCallback<T> getCallback()
    {
        return callback;
    }
}

public class FailureAsyncCallbackInvoker<T> implements AsyncCallback<T>
{
    private final AsyncCallback<T> callback;
    private final Exception  exception;

    public FailureAsyncCallbackInvoker(AsyncCallback<T> callback, Exception exception)
    {
        // note: data being null may be valid; callback would not be

        this.callback = callback;
        this.exception= exception;
    }

    @Override
    public void run()
    {
        callback.onFailure(exception);
    }

    @Override
    public AsyncCallback<T> getCallback()
    {
        return callback;
    }
}

public abstract class AbstractAsyncRunnable<T> implements AsyncRunnable<T>
{
    private final AsyncCallback<T> callback;

    public AbstractAsyncRunnable(AsyncCallback<T> callback)
    {
        // if == null -> throw

        this.callback = callback;
    }

    @Override
    public /* final */ void run()
    {
        AsyncCallbackInvoker<T> invoker;

        try
        {
            T data = runAsync();

            invoker = new SuccessfulAsyncCallbackInvoker<T>(callback, data);
        }
        catch (Exception e)
        {
            invoker = new FailureAsyncCallbackInvoker<T>(callback, e);
        }

        invokeCallback(invoker);
    }

    // allows overriding to put callback on whatever Thread you want
    protected void invokeCallback(AsyncCallbackInvoker<T> invoker)
    {
        invoker.run();
    }
}

在实践中,它已被证明非常健壮,特别是通过提供覆盖能力,invokeCallback我们能够利用许多线程安全操作来保证响应。

在实践中,实现只需要重写runAsync方法AsyncRunnable并提供AsyncCallback他们认为合适的任何东西。重要的是要注意,在某些情况下,您确实将它用于简单地标记完成,并且在这种情况下可以TVoid如果您使用它)。return nullrunAsync

于 2012-09-05T05:32:22.910 回答
0

所以你还能做的就是创建一个ThreadManager类。这个类应该有一个List<Thread>. 每当您创建一个新Thread任务来执行您的任务时,您都应该registerThreadManager. 然后创建一个方法,在该方法中ThreadManager返回.numberlive ThreadsList

通过这种方式,您可以随时查看有多少线程正在运行,并且您可以根据需要对正在运行的线程执行任何其他操作。另外一个Thread应该定期运行,它应该从List.

示例ThreadManager可能如下所示:

public class ThreadManager {
    private static ThreadManager tm;
    private List<Thread> threads;

    public static ThreadManager get() {
        if(null == tm) {
            tm = new ThreadManager();
        }

        return tm;
    }

    private ThreadManager() {
        threads = new ArrayList<Thread>();
    }

    public int getCountOfAliveThreads() {
        int count = 0;
        for(Thread t : threads) {
            if(t.isAlive()) {
                count++;
            }
        }

        return count;
    }
}

希望这可以帮助。您还可以在Thread List.

于 2012-09-05T05:34:55.170 回答
-1

我感谢所有超级有用和启发性的答案,但是,带有静态和易失性的简单计数器似乎工作正常:

private static volatile int crntThrdCnt = 0;

进而

    @Override
public void run() 
{
    crntThrdCnt++;
    this.runBatchInstance();
    crntThrdCnt--;

    if(crntThrdCnt == 0)
    {
        ProcessManager prcsMgr = new ProcessManager();
        Batch batch = prcsMgr.getBatch(this.batchID);
        batch.setEndTime(new Date());
        prcsMgr.updateBatch(batch);
    }
}
于 2012-09-05T05:43:14.493 回答