0

我想知道是否有一种方法可以监控线程的生命周期,但我会解释我正在做的事情的背景,所以也许有更好的方法来做到这一点。

基本上,我有 x 个线程正在处理队列并对其进行处理,如果线程获得可接受的结果,它将进入解决方案队列,否则数据将被丢弃或进一步处理。

我的问题出在我的主线程中,我有一个点赞while(!solutions_results.isEmpty()),它保存了数据(现在它打印到一个文件,但后来可能是数据库)。显而易见的问题是,一旦它清除了解决方案队列,它就完成并完成工作,即使其他线程仍在将数据放入队列中。

我不确定处理这个问题的最佳方法(也许有一个只保存解决方案队列的专用线程?)但我在想如果我能以某种方式监控其他线程的生命周期,那么就没有更多数据的机会进入解决方案队列。

如果有更好的方法来做到这一点,请让我知道,否则有没有办法告诉一旦其他线程完成(我不能等待执行程序在运行这个过程之前完全完成,因为它可能会变得很大并且不要希望它只是坐在内存中,理想情况下希望在它接近时处理它,但它不依赖于时间)?

4

4 回答 4

2

如果您使用ExecutorService来运行您的线程作业,那么您可以使用该awaitTermination()方法来了解所有线程何时完成:

ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(yourSolutionsRunnable);
pool.submit(yourSolutionsRunnable);
...
// once you've submitted your last job you can do
pool.shutdown();

然后您可以等待所有提交的作业完成:

pool.waitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);

如果您的线程在提交解决方案后需要继续运行,这将变得更加复杂。如果您编辑您的问题并使其更加明显,我将编辑我的答案。


编辑:

哦,我看到您想在此过程中处理一些结果,但在所有线程完成之前不要停止。

您可以使用pool.isTerminated()测试来告诉您是否所有工作都已完成。所以你的循环看起来像:

// this is the main thread so waiting for solutions in a while(true) loop is ok
while (true) {
    // are all the workers done?
    if (pool.isTerminated()) {
       // if there are results process one last time
       if (!solutions_results.isEmpty()) {
           processTheSolutions();
       }
       break;
    } else {
        if (solutions_results.isEmpty()) {
            // wait a bit to not spin, you could also use a wait/notify here
            Thread.sleep(1000);
        } else {
            processTheSolutions();
        }
    }
}

编辑:

您还可以有两个线程池,一个用于生成解决方案,另一个用于处理。然后,您的主线程可以等待工作池清空,然后等待解决方案处理池。工作池会将解决方案(如果有)提交到解决方案池中。您可以在解决方案处理池中只有 1 个线程,或者根据需要更多。

ExecutorService workerPool = Executors.newFixedThreadPool(10);
final ExecutorService solutionsPool = Executors.newFixedThreadPool(1);
solutionsPool.submit(workerThatPutsSolutionsIntoSolutionsPool);
...
// once you've submitted your last worker you can do
workerPool.shutdown();

workerPool.waitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
// once the workers have finished you shutdown the solutions pool
solutionsPool.shutdown();
// and then wait for it to finish
solutionsPool.waitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
于 2012-04-13T17:08:13.450 回答
0

我对您正在处理的行为要求了解不多,但是如果您希望主线程阻塞直到所有子线程都完成,您应该查看类的join方法Thread

http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/Thread.html#join ()

只需在主线程中运行一个循环,该循环在每个子线程上调用 join 方法,当它退出循环时,您可以确定所有线程都已完成工作。

于 2012-04-13T17:07:17.493 回答
0

只需保留您的活动线程列表。如果同时添加/删除线程,您希望它同步以防止它被丢弃。或者使用 java.util.concurrent.ConcurrentLinkedQueue 之类的东西,它本身可以处理多个线程。启动时将每个线程添加到列表中。每个线程都应该在它停止之前从列表中删除自己。当列表为空时,您的所有线程都已完成。

编辑: 时机很重要。 首先,主线程必须将工作线程放入列表中。如果它们将自己放入列表中,则主线程可以在某些线程已将自己从列表中删除并且所有其他线程虽然已启动但尚未开始执行时检查列表 - 因此尚未将自己放入列表。然后它会认为一切都已完成,而实际上并非如此。 其次,主线程必须在启动之前将每个工作线程放在列表中。否则,线程可能会在主线程将其添加到列表之前完成并尝试将自己从列表中删除。然后列表永远不会变空,程序永远不会完成。

于 2012-04-13T21:02:38.267 回答
0

也许 java.util.concurrent.ExecutorCompletionService 在这里会很有用。

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class monitor_life_of_threads
{
    /**
     * First, convert all of your threads to instances of Callable (easy to do), and have them each return an instance some class (I'm using Integer below just as 
     * an example).
     * This will help simplify things.
     */

    public static void main ( String args[] )
    {
        final monitor_life_of_threads worker = new monitor_life_of_threads();

        worker.executeCallablesAndUseResults ();

        System.exit ( 0 );
    }

    private void executeCallablesAndUseResults ()
    {
        List < Callable < Result >> list = new ArrayList <> ();
        populateInputList ( list );

        try
        {
            doWork ( list );
        }
        catch ( InterruptedException e )
        {
            e.printStackTrace ();
        }
        catch ( ExecutionException e )
        {
            e.printStackTrace ();
        }
        catch ( CancellationException e )
        {
            /*
             * Could be called if a Callable throws an InterruptedException, and if it's not caught, it can cause Future.get to hang.
             */
            e.printStackTrace ();
        }
        catch ( Exception defaultException )
        {
            defaultException.printStackTrace ();
        }
    }

    private void doWork ( Collection < Callable < Result >> callables ) throws InterruptedException, ExecutionException
    {
        ExecutorService executorService = Executors.newCachedThreadPool ();

        CompletionService < Result > ecs = new ExecutorCompletionService < > ( executorService );

        for ( Callable < Result > callable : callables )
            ecs.submit ( callable );

        for ( int i = 0, n = callables.size (); i < n; ++i )
        {
            Result r = ecs.take ().get ();
            if ( r != null )
                use ( r ); // This way you don't need a second queue.
        }

        executorService.shutdown ();

    }

    private void use ( Result result )
    {
        // Write result to database, output file, etc.

        System.out.println ( "result = " + result );
    }

    private List < Callable < Result >> populateInputList ( List < Callable < Result >> list )
    {
        list.add ( new Callable < Result > () {

            @Override
            public Result call () throws Exception
            {
                // Do some number crunching, then return a 5.
                return new Result ( 5 );
            }

        } );

        list.add ( new Callable < Result > () {

            @Override
            public Result call () throws Exception
            {
                // Do some number crunching, then return an 8.
                return new Result ( 8 );
            }

        } );

        list.add ( new Callable < Result > () {

            @Override
            public Result call () throws Exception
            {
                // Do some number crunching, but fail and so return null.
                return null;
            }

        } );

        return list;
    }
}

class Result
{
    private Integer i;

    Result ( Integer i)
    {
        this.i = i;
    }

    public String toString ()
    {
        return Integer.toString ( i );
    }
}
于 2013-09-30T00:41:46.803 回答