是否可以为执行者执行的任务设置优先级?我在 JCIP 中找到了一些关于它可能的声明,但我找不到任何示例,也找不到任何相关的文档。
来自 JCIP:
执行策略指定任务执行的“内容、地点、时间和方式”,包括:
- ...
- 任务应该按什么顺序执行(FIFO、LIFO、优先顺序)?
- ...
UPD:我意识到我问的并不完全是我想问的。我真正想要的是:
如何使用/模拟设置线程优先级(即是什么thread.setPriority()
)与执行程序框架?
是否可以为执行者执行的任务设置优先级?我在 JCIP 中找到了一些关于它可能的声明,但我找不到任何示例,也找不到任何相关的文档。
来自 JCIP:
执行策略指定任务执行的“内容、地点、时间和方式”,包括:
- ...
- 任务应该按什么顺序执行(FIFO、LIFO、优先顺序)?
- ...
UPD:我意识到我问的并不完全是我想问的。我真正想要的是:
如何使用/模拟设置线程优先级(即是什么thread.setPriority()
)与执行程序框架?
目前Executor 接口的唯一具体实现是ThreadPoolExecutor和ScheduledThreadpoolExecutor
您应该使用构造函数创建一个实例,而不是使用实用程序/工厂类Executors 。
您可以将BlockingQueue传递给 ThreadPoolExecutor 的构造函数。
作为 BlockingQueue 的实现之一,PriorityBlockingQueue允许您将 Comparator 传递给构造函数,这样您就可以决定执行顺序。
这里的想法是在执行程序中使用 PriorityBlockingQueue。为了这:
首先,您需要优先考虑您的未来:
class PriorityFuture<T> implements RunnableFuture<T> {
private RunnableFuture<T> src;
private int priority;
public PriorityFuture(RunnableFuture<T> other, int priority) {
this.src = other;
this.priority = priority;
}
public int getPriority() {
return priority;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return src.cancel(mayInterruptIfRunning);
}
public boolean isCancelled() {
return src.isCancelled();
}
public boolean isDone() {
return src.isDone();
}
public T get() throws InterruptedException, ExecutionException {
return src.get();
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return src.get();
}
public void run() {
src.run();
}
}
接下来,您需要定义能够正确排序优先期货的比较器:
class PriorityFutureComparator implements Comparator<Runnable> {
public int compare(Runnable o1, Runnable o2) {
if (o1 == null && o2 == null)
return 0;
else if (o1 == null)
return -1;
else if (o2 == null)
return 1;
else {
int p1 = ((PriorityFuture<?>) o1).getPriority();
int p2 = ((PriorityFuture<?>) o2).getPriority();
return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
}
}
}
接下来让我们假设我们有这样一个冗长的工作:
class LenthyJob implements Callable<Long> {
private int priority;
public LenthyJob(int priority) {
this.priority = priority;
}
public Long call() throws Exception {
System.out.println("Executing: " + priority);
long num = 1000000;
for (int i = 0; i < 1000000; i++) {
num *= Math.random() * 1000;
num /= Math.random() * 1000;
if (num == 0)
num = 1000000;
}
return num;
}
public int getPriority() {
return priority;
}
}
然后为了优先执行这些作业,代码将如下所示:
public class TestPQ {
public static void main(String[] args) throws InterruptedException, ExecutionException {
int nThreads = 2;
int qInitialSize = 10;
ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
}
};
for (int i = 0; i < 20; i++) {
int priority = (int) (Math.random() * 100);
System.out.println("Scheduling: " + priority);
LenthyJob job = new LenthyJob(priority);
exec.submit(job);
}
}
}
这是很多代码,但这几乎是可以完成的唯一方法。
在我的机器上,输出如下:
Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97
您可以实现自己的 ThreadFactory 并将其设置在 ThreadPoolExecutor 中,如下所示:
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));
我的 OpJobThreadFactory 如下所示:
public final static class OpJobThreadFactory implements ThreadFactory {
private int priority;
private boolean daemon;
private final String namePrefix;
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
public OpJobThreadFactory(int priority) {
this(priority, true);
}
public OpJobThreadFactory(int priority, boolean daemon) {
this.priority = priority;
this.daemon = daemon;
namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(priority);
return t;
}
}
您可以将 ThreadPoolExecutor 与优先阻塞队列一起使用 如何使用 ThreadPoolExecutor 和自定义任务实现 PriorityBlockingQueue
您可以ThreadFactory
在ThreadPoolExecutor
构造函数(或Executors
工厂方法)中指定 a。这允许您为执行程序提供给定线程优先级的线程。
要为不同的作业获取不同的线程优先级,您需要将它们发送给具有不同线程工厂的执行器。
请注意 setPriority(..) 通常在 Linux下不起作用。有关完整详细信息,请参阅以下链接:
只是想在这个讨论中添加我的一点贡献。我已经为一个非常特定的目的实现了这个ReorderingThreadPoolExecutor,它能够在我想要的任何时候显式地将执行器的 BlockingQueue(在本例中为 LinkedBlockingDeque)放在前面,而不必处理优先级(这可能导致死锁并且是,无论如何,已修复)。
我正在使用它来管理(在 Android 应用程序内)我必须下载许多显示在长列表视图中的图像的情况。每当用户快速向下滚动时,执行程序队列就会被图像下载请求淹没:通过将最新的移动到队列顶部,我在加载实际在屏幕上的图像方面取得了更好的性能,延迟了下载以后可能需要的那些。请注意,我使用内部并发映射键(可以像图像 URL 字符串一样简单)将任务添加到执行程序,以便稍后检索它们以进行重新排序。
有很多其他方法可以做同样的事情,也许它过于复杂,但它工作正常,而且他的 Android SDK 中的 Facebook 在它自己的工作线程队列中做类似的事情。
随意查看代码并给我建议,它在一个 Android 项目中,但剥离一些日志和注释将使该类成为纯 Java 6。
如果这只是试图偏袒一个线程而不是保证任何顺序的情况。在 Callable 上,您在 call() 方法的开头传入设置的线程优先级:
private int priority;
MyCallable(int priority){
this.priority=priority;
}
public String call() {
logger.info("running callable with priority {}", priority);
Thread.currentThread().setPriority(priority);
// do stuff
return "something";
}
仍然依赖于底层实现来尊重线程优先级