5

我正在尝试通过 CompletableFuture.supplyAsync 将优先级队列添加到使用 ThreadPoolExecutor 和 LinkedBlockingQueue 的现有应用程序中。问题是我无法想出一个设计来分配任务优先级,然后我可以在 PriorityBlockingQueue 的比较器中访问这些优先级。那是因为我的任务被 CompletableFuture 封装到一个名为 AsyncSupply 的私有内部类的实例中,该实例将原始任务隐藏在私有字段中。然后使用这些转换为 Runnables 的 AsyncSupply 对象调用 Comparator,如下所示:

public class PriorityComparator<T extends Runnable> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {

        // T is an AsyncSupply object.
        // BUT I WANT SOMETHING I CAN ASSIGN PRIORITIES TOO!
        return 0;
    }
}

我研究了扩展 CompletableFuture 的可能性,以便我可以将它包装在不同的对象中,但 CompletableFuture 的大部分内容都是封装且不可继承的。因此,扩展它似乎不是一种选择。也不是用适配器封装它,因为它实现了一个非常广泛的接口。

除了复制整个 CompletableFuture 并对其进行修改之外,我不确定如何解决此问题。有任何想法吗?

4

1 回答 1

7

这似乎是 API 中的一个限制,CompletableFuture它没有提供直接的使用PriorityBlockingQueue. 幸运的是,我们可以毫不费力地破解它。在 Oracle 的 1.8 JVM 中,它们恰好命名了所有内部类的 fields fn,因此提取我们的优先级感知Runnables 可以毫不费力地完成:

public class CFRunnableComparator implements Comparator<Runnable> {

    @Override
    @SuppressWarnings("unchecked")
    public int compare(Runnable r1, Runnable r2) {
        // T might be AsyncSupply, UniApply, etc., but we want to
        // compare our original Runnables.
        return ((Comparable) unwrap(r1)).compareTo(unwrap(r2));
    }

    private Object unwrap(Runnable r) {
        try {
            Field field = r.getClass().getDeclaredField("fn");
            field.setAccessible(true);
            // NB: For performance-intensive contexts, you may want to
            // cache these in a ConcurrentHashMap<Class<?>, Field>.
            return field.get(r);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new IllegalArgumentException("Couldn't unwrap " + r, e);
        }
    }
}

这假设您的Supplier班级是Comparable,例如:

public interface WithPriority extends Comparable<WithPriority> {
    int priority();
    @Override
    default int compareTo(WithPriority o) {
        // Reverse comparison so higher priority comes first.
        return Integer.compare(o.priority(), priority());
    }
}

public class PrioritySupplier<T> implements Supplier<T>, WithPriority {
    private final int priority;
    private final Supplier<T> supplier;
    public PrioritySupplier(int priority, Supplier<T> supplier) {
        this.priority = priority;
        this.supplier = supplier;
    }
    @Override
    public T get() {
        return supplier.get();
    }
    @Override
    public int priority() {
        return priority;
    }
}

使用如下:

PriorityBlockingQueue<Runnable> q = new PriorityBlockingQueue<>(11 /*default*/,
        new CFRunnableComparator());
ThreadPoolExecutor pool = new ThreadPoolExecutor(..., q);
CompletableFuture.supplyAsync(new PrioritySupplier<>(n, () -> {
    ...
}), pool);

如果您创建类似PriorityFunctionand的类PriorityBiConsumer,您可以使用相同的技术来调用类似thenApplyAsyncand的方法,并且whenCompleteAsync具有适当的优先级。

于 2016-01-28T14:52:47.863 回答