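I've searched a lot but could not find a solution to my problem.

I have my own class, BaseTask, which uses a ThreadPoolExecutor to handle tasks. I want task prioritization, but when I try to use a PriorityBlockingQueue I get a ClassCastException, because the ThreadPoolExecutor wraps my tasks in a FutureTask object.

This obviously makes sense, since FutureTask does not implement Comparable, but how would I go about solving the priority problem? I've read that you can override newTaskFor() in ThreadPoolExecutor, but I can't seem to find this method at all...?

Any suggestions would be much appreciated!

Some code to help:

In my BaseTask class I have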

private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

private final BaseFutureTask<Result> mFuture;

public BaseTask(int priority) {
    mFuture = new BaseFutureTask<Result>(mWorker, priority);
}

public final BaseTask<Params, Progress, Result> execute(Params... params) {

    /* Some unimportant code here */

    sExecutor.execute(mFuture);
    return this; // the declared return type requires a value
}

In the BaseFutureTask class:

@Override
public int compareTo(BaseFutureTask another) {
    long diff = (long) this.priority - another.priority; // widen first so an int difference cannot overflow

    return Long.signum(diff);
}

In the BaseThreadPoolExecutor class I override the 3 submit methods... The constructor in this class gets called, but none of the submit methods are.

7 Answers

14
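import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ExecutorPriority {

    public static void main(String[] args) {

        // The queue orders tasks with an explicit comparator, so the tasks need not be Comparable.
        PriorityBlockingQueue<Runnable> pq =
                new PriorityBlockingQueue<Runnable>(20, new ComparePriority());

        // execute() queues the Runnable as-is (no FutureTask wrapper),
        // so the comparator can cast queued elements to RunWithPriority.
        ExecutorService exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
        exe.execute(new RunWithPriority(2) {

            @Override
            public void run() {
                System.out.println(this.getPriority() + " started");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
                }
                System.out.println(this.getPriority() + " finished");
            }
        });
        exe.execute(new RunWithPriority(10) {

            @Override
            public void run() {
                System.out.println(this.getPriority() + " started");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
                }
                System.out.println(this.getPriority() + " finished");
            }
        });

        // Let the submitted tasks finish, then release the pool so the JVM can exit.
        exe.shutdown();
    }

    private static class ComparePriority implements Comparator<Runnable> {

        @Override
        public int compare(Runnable o1, Runnable o2) {
            // Lower priority value is taken from the queue first.
            return ((RunWithPriority) o1).getPriority()
                    .compareTo(((RunWithPriority) o2).getPriority());
        }
    }

}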

As you can guess, RunWithPriority is an abstract class that implements Runnable and has an Integer priority field.
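The answer does not show RunWithPriority itself. A minimal sketch of what it might look like, assuming only what the description and the calling code imply (the constructor shape and field name are guesses, not the author's code):

```java
// Hypothetical sketch of the base class described above; the original answer does not show it.
abstract class RunWithPriority implements Runnable {

    private final Integer priority;

    protected RunWithPriority(Integer priority) {
        this.priority = priority;
    }

    // Returns Integer (not int) so callers can write getPriority().compareTo(...).
    public Integer getPriority() {
        return priority;
    }
}
```

Subclasses only need to supply run(), which is what the anonymous classes in the example do.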

Answered 2011-03-30T11:37:20.330
11

You can use these helper classes:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit); // honor the timeout instead of blocking indefinitely
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

and this helper method:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

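        // Only submit(Callable) is covered here. The callable must implement
        // PriorityCallable, otherwise the cast below throws a ClassCastException.
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }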
    };
}

and then use it like this:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
        // Let the queued jobs finish, then release the pool threads so the JVM can exit.
        exec.shutdown();
    }
}
Answered 2013-05-16T01:44:11.627
5

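I will try to explain this problem with fully functional code. But before diving into the code, I would like to explain PriorityBlockingQueue.

PriorityBlockingQueue: PriorityBlockingQueue is an implementation of BlockingQueue. It orders its elements by priority (through Comparable or a supplied Comparator) and hands out the element with the highest priority first. If two tasks have the same priority, we need to provide some custom logic to decide which one goes first.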
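The tie-breaking logic mentioned above could, for instance, use an insertion counter as a secondary key. The sketch below is only an illustration: SequencedTask and TieBreakDemo are hypothetical names, and "lower value runs earlier" is an assumption of this example.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical queue entry: orders by priority, then by insertion order.
class SequencedTask implements Comparable<SequencedTask> {

    private static final AtomicLong SEQUENCE = new AtomicLong();

    final int priority;      // lower value = taken earlier (assumption of this sketch)
    final String name;
    private final long seq;  // creation order, used only to break ties

    SequencedTask(int priority, String name) {
        this.priority = priority;
        this.name = name;
        this.seq = SEQUENCE.getAndIncrement();
    }

    @Override
    public int compareTo(SequencedTask other) {
        int byPriority = Integer.compare(priority, other.priority);
        return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
    }
}

class TieBreakDemo {

    // Adds three entries and returns their names in the order the queue releases them.
    static String drainOrder() {
        PriorityBlockingQueue<SequencedTask> queue = new PriorityBlockingQueue<SequencedTask>();
        queue.add(new SequencedTask(1, "first-high"));
        queue.add(new SequencedTask(3, "low"));
        queue.add(new SequencedTask(1, "second-high"));

        StringBuilder order = new StringBuilder();
        SequencedTask next;
        while ((next = queue.poll()) != null) {
            order.append(next.name).append(' ');
        }
        return order.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // first-high second-high low
    }
}
```

Without the sequence number, the two priority-1 entries could come out in either order, since the queue makes no guarantee for equal elements.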

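Now let's get right into the code.

Driver class: This class creates an executor that accepts tasks and later submits them for execution. Here we create two tasks, one with LOW priority and the other with HIGH priority. We tell the executor to run at most 1 thread and to use a PriorityBlockingQueue.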

public static void main(String[] args) {

    /*
     * Core pool size (threads kept even when idle)       : 0
     * Maximum number of threads that can be created      : 1
     * Keep-alive time for idle threads, in milliseconds  : 1000
     * Which queue to use                                 : PriorityBlockingQueue
     */
    PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1,
        1000, TimeUnit.MILLISECONDS, queue);

    MyTask task = new MyTask(Priority.LOW, "Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH, "High");
    executor.execute(new MyFutureTask(task));
}

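MyTask class: MyTask implements Runnable and accepts a priority as a constructor argument. When the task runs, it prints a message and then puts the thread to sleep for 1 second.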

public class MyTask implements Runnable {

  private Priority priority;
  private String name;

  public MyTask(Priority priority, String name) {
    this.priority = priority;
    this.name = name;
  }

  public int getPriority() {
    return priority.getValue();
  }

  public String getName() {
    return name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed " + getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

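MyFutureTask class: Since we are using a PriorityBlockingQueue to hold our tasks, whatever goes into the queue must be comparable. Here each task is wrapped in a FutureTask subclass that implements the Comparable interface. Its compareTo method compares the priorities of two tasks, so the task with the highest priority is handed out for execution first.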

public class MyFutureTask extends FutureTask<Void>
    implements Comparable<MyFutureTask> {

  private final MyTask task;

  public MyFutureTask(MyTask task) {
    super(task, null); // the task produces no result
    this.task = task;
  }

  @Override
  public int compareTo(MyFutureTask another) {
    // lower value = higher priority, see the Priority enum
    return Integer.compare(task.getPriority(), another.task.getPriority());
  }
}

Priority: a self-explanatory priority enum. Lower values mean higher priority.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Now when we run this example, we get the following output:

The following Runnable is getting executed High
The following Runnable is getting executed Low

Even though we submitted the LOW priority task first and the HIGH priority task afterwards, the higher-priority task is executed first because we are using a PriorityBlockingQueue.
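One caveat worth illustrating: the queue can only reorder tasks that are actually waiting in it. When the pool has a free core thread, ThreadPoolExecutor hands the task straight to a new worker and the queue is bypassed. The sketch below is a hypothetical demo (QueueOrderDemo and Job are made-up names) that uses a latch to keep the single worker busy while three more tasks queue up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueOrderDemo {

    // Hypothetical task: lower priority value runs earlier.
    static class Job implements Runnable, Comparable<Job> {
        final int priority;
        final String name;
        final List<String> log;
        final CountDownLatch gate; // may be null; when set, run() waits for it

        Job(int priority, String name, List<String> log, CountDownLatch gate) {
            this.priority = priority;
            this.name = name;
            this.log = log;
            this.gate = gate;
        }

        @Override
        public void run() {
            if (gate != null) {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            log.add(name);
        }

        @Override
        public int compareTo(Job other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Runs four jobs on a single-thread pool and returns the order in which they ran.
    static List<String> runDemo() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());

        // Handed directly to the new core thread, so it never passes through the queue.
        executor.execute(new Job(4, "lowest", log, gate));
        // These wait in the queue while the worker is blocked on the gate.
        executor.execute(new Job(3, "low", log, null));
        executor.execute(new Job(1, "high", log, null));
        executor.execute(new Job(2, "medium", log, null));

        gate.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [lowest, high, medium, low]
    }
}
```

The first job runs first despite having the worst priority; only the three queued jobs are reordered.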

Answered 2016-01-24T09:55:13.663
4

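My solution:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class XThreadPoolExecutor extends ThreadPoolExecutor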
{
    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory, RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
    {
        return new ComparableFutureTask<>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
    {
        return new ComparableFutureTask<>(callable);
    }

    protected class ComparableFutureTask<V>
        extends FutureTask<V> implements Comparable<ComparableFutureTask<V>>
    {
        private Object object;
        public ComparableFutureTask(Callable<V> callable)
        {
            super(callable);
            object = callable;
        }

        public ComparableFutureTask(Runnable runnable, V result)
        {
            super(runnable, result);
            object = runnable;
        }

        @Override
        @SuppressWarnings("unchecked")
        public int compareTo(ComparableFutureTask<V> o)
        {
            if (this == o)
            {
                return 0;
            }
            if (o == null)
            {
                return -1; // high priority
            }
            if (object != null && o.object != null)
            {
                if (object.getClass().equals(o.object.getClass()))
                {
                    if (object instanceof Comparable)
                    {
                        return ((Comparable) object).compareTo(o.object);
                    }
                }
            }
            return 0;
        }
    }
}
Answered 2011-12-12T08:13:44.243
0

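It looks like they left that out of Apache Harmony. There is an svn commit log from about a year ago fixing the absence of newTaskFor. You can probably just override the submit functions in an extended ThreadPoolExecutor to create an extended FutureTask that is Comparable. They are not very long.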

Answered 2010-08-23T17:18:12.420
0

To answer your question: the newTaskFor() method is found in ThreadPoolExecutor's superclass, AbstractExecutorService. You can, however, simply override it in ThreadPoolExecutor.
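Note that newTaskFor() is only invoked on the submit() path; execute() places the Runnable in the queue unchanged. That would also explain why overridden submit methods are never reached when the calling code uses execute(), as the question's BaseTask does. A small sketch to illustrate, where CountingExecutor and NewTaskForDemo are hypothetical names used only for this demo:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NewTaskForDemo {

    // Hypothetical executor that only counts how often newTaskFor is invoked.
    static class CountingExecutor extends ThreadPoolExecutor {
        final AtomicInteger wrapped = new AtomicInteger();

        CountingExecutor() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            wrapped.incrementAndGet();
            return super.newTaskFor(runnable, value);
        }
    }

    // Returns how many times newTaskFor ran for one execute() and one submit().
    static int countWraps() {
        CountingExecutor executor = new CountingExecutor();
        Runnable noop = new Runnable() {
            @Override
            public void run() { }
        };
        executor.execute(noop); // queued or run as-is, newTaskFor is not called
        executor.submit(noop);  // goes through newTaskFor(Runnable, T)
        executor.shutdown();
        return executor.wrapped.get();
    }

    public static void main(String[] args) {
        System.out.println(countWraps()); // 1
    }
}
```

So a priority wrapper created in newTaskFor() only takes effect for tasks handed in through submit(); tasks passed to execute() must already be comparable themselves, or the queue needs a Comparator that can handle them.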

Answered 2012-09-30T18:30:13.883
0

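This answer is a simplified version of @StanislavVitvitskyy's answer. Thanks to him.

I wanted the jobs that I submit to be Comparable. I created an ExecutorService with a PriorityBlockingQueue and extended it to override the newTaskFor(...) methods: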

ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
    keepAliveTime, timeUnit, new PriorityBlockingQueue<Runnable>()) {

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ComparableFutureTask<T>(runnable, value);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ComparableFutureTask<T>(callable);
    }
};

I defined a ComparableFutureTask that extends FutureTask and implements Comparable by delegating to the compareTo(...) of the job submitted to the pool.

public class ComparableFutureTask<T> extends FutureTask<T>
    implements Comparable<Object> {

    private final Comparable<Object> comparableJob;

    @SuppressWarnings("unchecked")
    public ComparableFutureTask(Runnable runnable, T value) {
        super(runnable, value);
        this.comparableJob = (Comparable<Object>) runnable;
    }

    @SuppressWarnings("unchecked")
    public ComparableFutureTask(Callable<T> callable) {
        super(callable);
        this.comparableJob = (Comparable<Object>) callable;
    }

    @Override
    public int compareTo(Object o) {
        // compare the wrapped jobs; the field is named comparableJob
        return this.comparableJob
            .compareTo(((ComparableFutureTask<?>) o).comparableJob);
    }
}

This ExecutorService can then handle Runnable or Callable jobs that are also Comparable. For example:

public class MyJob implements Runnable, Comparable<MyJob> {
    private int priority;
    ...
    @Override
    public int compareTo(MyJob other) {
        // we want higher priority to go first
        return Integer.compare(other.priority, this.priority); // avoids the overflow of plain subtraction
    }
    ...
}

Note that if you submit a job that is not Comparable to this executor, it will throw a ClassCastException.
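If a bare ClassCastException is too opaque, one option is to check the job up front and fail with a descriptive message. The following is a sketch of such a variant, not part of the original answer; CheckedComparableFutureTask and DemoJob are hypothetical names, and only the Runnable constructor is shown.

```java
import java.util.concurrent.FutureTask;

// Hypothetical variant of ComparableFutureTask that rejects non-Comparable jobs
// with a descriptive message instead of a bare ClassCastException.
class CheckedComparableFutureTask<T> extends FutureTask<T> implements Comparable<Object> {

    private final Comparable<Object> comparableJob;

    CheckedComparableFutureTask(Runnable runnable, T value) {
        super(runnable, value);
        this.comparableJob = requireComparable(runnable);
    }

    @SuppressWarnings("unchecked")
    private static Comparable<Object> requireComparable(Object job) {
        if (!(job instanceof Comparable)) {
            throw new IllegalArgumentException(
                    "Job must implement Comparable: " + job.getClass().getName());
        }
        return (Comparable<Object>) job;
    }

    @Override
    public int compareTo(Object other) {
        // Delegate to the wrapped jobs, as in the answer above.
        return comparableJob.compareTo(((CheckedComparableFutureTask<?>) other).comparableJob);
    }
}

// Hypothetical job type used only to exercise the wrapper.
class DemoJob implements Runnable, Comparable<DemoJob> {
    final int priority;

    DemoJob(int priority) {
        this.priority = priority;
    }

    @Override
    public void run() { }

    @Override
    public int compareTo(DemoJob other) {
        return Integer.compare(other.priority, this.priority); // higher priority first
    }
}
```

Jobs of different Comparable types would still fail inside compareTo, so this check only covers the "not Comparable at all" case.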

Answered 2019-07-19T13:23:15.087