2

在我看来,使用我自己的以下 SupervisedExecutor 和 ExecutorSuperviser 实现,我的性能很差,你认为这段代码效率低下的是什么?我想学习如何提高它的效率。

ExecutorSupervisor 类:

public class ExecutorSuperviser {
private SupervisedExecutor[] threadPool;
private int poolSize = 0;
private LinkedList<Runnable> q;\\my own implementation of linkedlist
public ExecutorSuperviser(int nThreads) {
    threadPool=new SupervisedExecutor[poolSize=nThreads];
    q=new LinkedList<Runnable>();
    init();
}
public void execute(Runnable r) { 
    synchronized (q) {
        q.addToTail(r);
    }
    for (int i=0;i<poolSize;i++) 
            if (!threadPool[i].isBusy()) {
                if (!threadPool[i].isAlive()) threadPool[i].start();
                threadPool[i].interrupt();
                return;
            }


}
private void init() {
    for (int i=0;i<poolSize;i++) {
        threadPool[i]=new SupervisedExecutor(this);
    }

}
public Object getLock() {
    return q;
}
public Runnable getTask() {
    return q.removeHead();
}
public void terminate() {
    for (int i=0;i<poolSize;i++) 
        threadPool[i].terminate();
}
public void waitUntilFinished() {
    while (!isFinished()) {
        try {
            Thread.sleep(Thread.MAX_PRIORITY);
        } catch (InterruptedException e) {}
    }
}
private boolean isFinished() {
    for (int i=0;i<poolSize;i++) 
        if (threadPool[i].isBusy()) return false;
    return q.isEmpty();
}

}

监督执行者类:

public class SupervisedExecutor extends Thread {
    private boolean   terminated = false;
    private Boolean busy = false;
    private ExecutorSuperviser boss;
    SupervisedExecutor (ExecutorSuperviser boss) {
        this.boss=boss;
    }
    public void run() {
        while (!terminated) {
            try {
                sleep(MAX_PRIORITY);
            } catch (InterruptedException e) {
                synchronized (busy) {
                    busy=true;
                }
                Runnable r;
                while (true) {
                    synchronized (boss.getLock()) {
                        r=boss.getTask();
                    }
                    if (r!=null) r.run();
                    else break;
                } 
                synchronized (busy) {
                    busy=false;
                }
            }
        }
    }

    public boolean isBusy() {
        boolean isBusy;
        synchronized (boss.getLock()) {
            isBusy=busy;
        }
        return isBusy;
    }
    public void terminate() {
        terminated=true;
    }

}
4

2 回答 2

1

以下具有以下优点的解决方案怎么样:

  1. 作为ThreadPoolExecutor的子类,您不必ThreadPoolExecutor为了获得waitUntilFinished()您所追求的功能而重新实现为您所做的一切。

  2. 通过利用ReentrantLock,Conditionawait()/signal()您可以避免忙于等待,这肯定会损害性能。

此实现通过利用beforeExecute()和公开的afterExecute()方法ThreadPoolExecutor来保持我们自己的活动任务计数。我不使用getActiveCount(),因为根据 JavaDoc,它不能保证准确的答案(尽管也许ThreadPoolExecutor它确实提供了准确的答案,但我需要进一步研究才能确定)。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class WaitableThreadPoolExecutor extends ThreadPoolExecutor
{
    private Condition waitCondition;
    private ReentrantLock lock;
    private int taskCount = 0;

    public WaitableThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue )
    {
        super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue );

        lock = new ReentrantLock( );
        waitCondition = lock.newCondition( );
    }

    // if isEmpty() is true, then there is no need to block
    // otherwise, wait until waitCondition is signaled
    public void waitUntilFinished( )
    {
        lock.lock( );
        try
        {
            while ( !isEmpty( ) )
                waitCondition.await( );
        }
        catch ( InterruptedException e )
        {
            e.printStackTrace();
        }
        finally
        {
            lock.unlock( );
        }
    }

    // the ThreadPool is empty if our taskCount is 0 and the
    // work queue is empty (this may not be bullet-proof, for one
    // thing, I'm hesitant to use getActiveCount() because it
    // does not guarantee an exact answer
    protected boolean isEmpty( )
    {
        lock.lock( );
        try
        {
            return taskCount == 0 && getQueue( ).isEmpty( );
        }
        finally
        {
            lock.unlock( );
        }
    }

    // increment our task count before executing each task
    @Override
    protected void beforeExecute( Thread t, Runnable r )
    {
        super.beforeExecute( t, r );

        lock.lock( );
        try
        {
            taskCount += 1;
        }
        finally
        {
            lock.unlock( );
        }
    }

    // decrement our task count after executing each task
    // then, if the pool is empty, signal anyone waiting
    // on the waitCondition
    @Override
    protected void afterExecute( Runnable r, Throwable t )
    {
        super.afterExecute( r, t );

        lock.lock( );
        try
        {
            taskCount -= 1;

            if ( isEmpty( ) ) waitCondition.signalAll( );
        }
        finally
        {
            lock.unlock( );
        }
    }

    public static void main( String[] args )
    {
        WaitableThreadPoolExecutor pool = new WaitableThreadPoolExecutor( 2, 4, 5000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>( ) );

        for ( int i = 0 ; i < 10 ; i++ )
        {
            final int threadId = i;

            pool.execute( new Runnable( )
            {
                @Override
                public void run( )
                {
                    try { Thread.sleep( (int) ( Math.random( ) * 5000 ) ); } catch ( InterruptedException e ) { }

                    System.out.println( threadId + " done." );
                }
            });
        }

        pool.waitUntilFinished( );

        System.out.println( "Done waiting." );
    }
}

我包含了一个简单的main()方法,您可以将其用作测试用例。它启动 10 个线程,这些线程在打印完成之前等待随机时间。然后主线程调用waitUntilFinished()

结果看起来像(主要的一点是Done waiting.总是最后打印:

1 done.
2 done.
0 done.
4 done.
3 done.
5 done.
7 done.
8 done.
6 done.
9 done.
Done waiting.
于 2012-04-19T23:20:47.947 回答
0

就个人而言,我发现使用普通的 ExecutorService 更短更容易。

注意:这是您需要的所有代码。

ExecutorService es = Executors.newCachedThreadPool();

List<Future<Void>>futures = new ArrayList<Future<Void>>();
for (int i = 0; i < 10; i++) {
    final int threadId = i;
    futures.add(es.submit(new Callable<Void>() {
        @Override
        public Void call() throws InterruptedException {
            Thread.sleep((int) (Math.random() * 1000));
            System.out.println(threadId + " done.");
            return null;
        }
    }));
}
for (Future<Void> future : futures)
    future.get();
System.out.println("Done waiting.");

es.shutdown();

印刷

2 done.
4 done.
7 done.
6 done.
8 done.
5 done.
9 done.
1 done.
3 done.
0 done.
Done waiting.
于 2012-04-20T07:21:57.867 回答