7

假设我需要处理 3 种类型的请求:A、B 和 C,如下所示:

  • 请求同时处理。
  • 最多同时处理 K (<= 3) 个请求。
  • 同一类型的请求不能同时处理。

更一般地,类型数为 N,并发请求数为 K <= N。

你将如何用 Java 实现它java.util.concurrent

4

4 回答 4

2

您不能同时处理 K 个请求,这将违反第二条规则。最大并发请求数是数字类型。在你的情况下,它是三个。所以制作三个队列并将它们附加到三个线程。那是唯一的办法。Executors.newSingleThreadExecutor实现了这种技术。

public static void main(String[] args) {
    int N = 2;
    int K = 3;
    List<Executor> executors = new ArrayList<Executor>(N);
    for(int i = 0; i < N; i++){
        executors.add(Executors.newSingleThreadExecutor());
    }
    Map<Type, Executor> typeExecutors = new HashMap<Type, Executor>(K);
    int i = 0;
    for(Type t : Type.values()){
        typeExecutors.put(t, executors.get(i++ % executors.size()));
    }
}

enum Type{
    T1, T2, T3
}
于 2013-10-11T07:33:36.517 回答
0

我会创建三个Executors.newFixedThreadPool(1)

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int)

并由工厂委托为任何执行者执行每次执行。

ExecutorService executor = ThreadFactory.getExecutorForRequest(RequestType type);
executor.execute(request);
于 2013-10-11T07:30:37.580 回答
0

请求同时处理。

您可以使用 Executor 服务。

最多同时处理K个请求。

在执行器中,设置最大线程数。

同一类型的请求不能同时处理。

您可能认为每种请求类型都有不同的锁。只要确保如果一个线程在指定时间内无法获得请求的锁,它应该屈服并继续下一个任务处理。

于 2013-10-11T07:31:12.797 回答
0

您的问题的领域可以建模为两个数据结构,我称之为挂起(将类型映射到无界的任务队列 - 这是任务等待运行的地方)和运行(每种类型最多有一个任务准备好运行,或实际上由执行程序运行)。

K 约束必须应用于running:它最多有 KTypeTask映射。

重点是您为所有任务处理分配的线程数与并发约束处理完全正交:您的线程池选择应该(除其他外)由要执行的任务类型(IO/CPU 限制? ),不受并发约束。

一个实现:

public class Foo {

    enum TaskType { A, B, C }

    class Task {
        TaskType type;
        Runnable runnable;
        volatile boolean running;
    }

    Map<TaskType, Queue<Task>> pending = new HashMap<TaskType, Queue<Task>>();

    Map<TaskType, Task> running = new HashMap<TaskType, Task>();

    ExecutorService executor = null; // Executor implementation is irrelevant to the problem

    /** Chooses a task of a random type between those not running. */
    TaskType choosePending(){
        Set running_types = running.keySet();
        running_types.removeAll(Arrays.asList(pending.keySet()));
        List shuffled = new ArrayList(running_types);
        Collections.shuffle(shuffled);
        return (TaskType) shuffled.get(0);
    }

    // note that max concurrency != parallelism level (which the executor is responsible for)
    final int MAX_CONCURRENCY = 3;

    void produce(){
        synchronized(running){
            if (running.size() < MAX_CONCURRENCY) {
                synchronized (pending){
                    TaskType t = choosePending();
                    running.put(t, pending.get(t).remove()) ;
                }
            }
        }
    }

    {
        new Thread(new Runnable() {
            public void run() {
                while (true) produce();
            }
        }).start();
    }

    Task chooseRunning(){
         for (Task t : running.values()){
             if (!t.running){
                 return t;
             }
         }
        return null;
    }

    void consume(){
        final Task t;
        synchronized (running){
            t = chooseRunning();
            if (t != null){
                t.running = true;
                executor.execute(new Runnable() {
                    public void run() {
                        t.runnable.run();
                        synchronized (running) {
                            running.remove(t);
                        }
                    }
                });
            }
        }
    }

    {
        new Thread(new Runnable() {
            public void run() {
                while (true) consume();
            }
        }).start();
    }

}
于 2013-11-07T09:42:02.887 回答