0

我已经实现了一个单例(管理器)来管理一些相关任务,在这个管理器中我使用一个执行器同时处理 10 个任务,我使用的是无限制的linkedBlockingQueue,到目前为止效果很好,但现在我需要为我的执行程序队列设置一个限制,因为我有很多任务(数十万个任务),我不想把它们都放在我的队列中,这会导致我出现性能问题,所以我做了什么:

这是我的执行者:

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, corePoolSize + 5, 500, TimeUnit.MILLISECONDS, workQueue);
}


@Override
protected void beforeExecute(Thread t, Runnable r) {
    super.beforeExecute(t, r);
    //Do something to my task
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if(t != null) {
        // 
    } else {
        //Do something to my task
    }
}
}

这是我的经理:

public final class MyManager {

private static MyManager manager = new MyManager();
public static final int queueMaxSize = 100;
private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(queueMaxSize);
private ExecutorService executor = new MyThreadPoolExecutor(10, workQueue);


/**
 * constructor
 */
private MyManager() {}

public static MyManager getInstance(){

    if (manager == null){
        synchronized(MyManager.class){
            if (manager == null){
                manager = new MyManager();
            }
        }
    }
    return manager;
}

/**
 */
public void executeTask(Integer key){
    executeTask(key, Locale.getDefault());
}

/**
 */
public void executeTask(Integer key, Locale locale) {
    Tasker task = new Tasker(key, locale);
    executor.execute(task);
}
}

这里是要求完成任务的班级:

public class MyClass {

public void doTasks() {
    //geting my tasks in array of list, its holding more than 900 000 tasks,
    //sometimes its holding up to 10 million task like :
    MyManager.getInstance().isFull() {\\wait, then ask again}

    ArrayList<Integer> myTasks = getAllTasksIds();

    for(Integer id : myTasks) {
        //if i perform a waiting here it will be waiting for ever.
        MyManaget.getInstance().executeTask(id);
    }
}

}

我到底想要什么等待执行者直到完成他的队列任务,然后再重新填满它。

但问题是当我尝试根据队列大小等待时,执行程序将无法工作,并且由于队列仍然满了,它会永远等待。

4

2 回答 2

1

为什么不只使用有界阻塞队列(即指定 BlockingQueue 的边界)?如果你使用有界阻塞队列(大小你可以自己选择),当队列满时你的生产者会阻塞,当队列中的任务被消费时会恢复发布任务。这样,您可以避免将过多的东西过快地放入队列,但也可以避免将过多的东西放入队列中。这就是阻塞队列的重点......

于 2013-06-27T08:18:32.073 回答
0

我测试了你的代码,但我没有使用 ArrayBlockingQueue,而是用它扩展了它......它可以工作。试试看:

public static class MyBlockingQueue extends ArrayBlockingQueue<Runnable> {

    private static final long serialVersionUID= -9016421283603545618L;

    public static Lock lock= new ReentrantLock();
    public static Condition condition= lock.newCondition();
    public static volatile Boolean isWaiting= false;

    public MyBlockingQueue(int capacity) {
        super(capacity, true);
    }

    @Override
    public boolean offer(Runnable e) {
        if (remainingCapacity() == 0) {
            try {
                isWaiting= true;
                condition.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
        return super.offer(e);
    }

    @Override
    public Runnable take() throws InterruptedException {
        Runnable take= super.take();
        if (remainingCapacity() > 0 && isWaiting) {
            isWaiting= false;
            condition.signal();
        }
        return take;
    }

}
于 2013-06-27T08:12:12.033 回答