假设我需要处理 3 种类型的请求:A、B 和 C,如下所示:
- 请求同时处理。
- 最多同时处理 K (<= 3) 个请求。
- 同一类型的请求不能同时处理。
更一般地,类型数为 N,并发请求数为 K <= N。
你将如何用 Java 实现它java.util.concurrent
?
假设我需要处理 3 种类型的请求:A、B 和 C,如下所示:
更一般地,类型数为 N,并发请求数为 K <= N。
你将如何用 Java 实现它java.util.concurrent
?
您不能同时处理 K 个请求,这将违反第二条规则。最大并发请求数是数字类型。在你的情况下,它是三个。所以制作三个队列并将它们附加到三个线程。那是唯一的办法。Executors.newSingleThreadExecutor实现了这种技术。
public static void main(String[] args) {
int N = 2;
int K = 3;
List<Executor> executors = new ArrayList<Executor>(N);
for(int i = 0; i < N; i++){
executors.add(Executors.newSingleThreadExecutor());
}
Map<Type, Executor> typeExecutors = new HashMap<Type, Executor>(K);
int i = 0;
for(Type t : Type.values()){
typeExecutors.put(t, executors.get(i++ % executors.size()));
}
}
enum Type{
T1, T2, T3
}
我会创建三个Executors.newFixedThreadPool(1)
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int)
并由工厂委托为任何执行者执行每次执行。
ExecutorService executor = ThreadFactory.getExecutorForRequest(RequestType type);
executor.execute(request);
请求同时处理。
您可以使用 Executor 服务。
最多同时处理K个请求。
在执行器中,设置最大线程数。
同一类型的请求不能同时处理。
您可能认为每种请求类型都有不同的锁。只要确保如果一个线程在指定时间内无法获得请求的锁,它应该屈服并继续下一个任务处理。
您的问题的领域可以建模为两个数据结构,我称之为挂起(将类型映射到无界的任务队列 - 这是任务等待运行的地方)和运行(每种类型最多有一个任务准备好运行,或实际上由执行程序运行)。
K 约束必须应用于running:它最多有 KType
个Task
映射。
重点是您为所有任务处理分配的线程数与并发约束处理完全正交:您的线程池选择应该(除其他外)由要执行的任务类型(IO/CPU 限制? ),不受并发约束。
一个实现:
public class Foo {
enum TaskType { A, B, C }
class Task {
TaskType type;
Runnable runnable;
volatile boolean running;
}
Map<TaskType, Queue<Task>> pending = new HashMap<TaskType, Queue<Task>>();
Map<TaskType, Task> running = new HashMap<TaskType, Task>();
ExecutorService executor = null; // Executor implementation is irrelevant to the problem
/** Chooses a task of a random type between those not running. */
TaskType choosePending(){
Set running_types = running.keySet();
running_types.removeAll(Arrays.asList(pending.keySet()));
List shuffled = new ArrayList(running_types);
Collections.shuffle(shuffled);
return (TaskType) shuffled.get(0);
}
// note that max concurrency != parallelism level (which the executor is responsible for)
final int MAX_CONCURRENCY = 3;
void produce(){
synchronized(running){
if (running.size() < MAX_CONCURRENCY) {
synchronized (pending){
TaskType t = choosePending();
running.put(t, pending.get(t).remove()) ;
}
}
}
}
{
new Thread(new Runnable() {
public void run() {
while (true) produce();
}
}).start();
}
Task chooseRunning(){
for (Task t : running.values()){
if (!t.running){
return t;
}
}
return null;
}
void consume(){
final Task t;
synchronized (running){
t = chooseRunning();
if (t != null){
t.running = true;
executor.execute(new Runnable() {
public void run() {
t.runnable.run();
synchronized (running) {
running.remove(t);
}
}
});
}
}
}
{
new Thread(new Runnable() {
public void run() {
while (true) consume();
}
}).start();
}
}