36

I have a process which delegates asynch tasks to a pool of threads. I need to ensure that certain tasks are executed in order. So for example

Tasks arrive in order

Tasks a1, b1, c1, d1 , e1, a2, a3, b2, f1

Tasks can be executed in any order except where there is a natural dependancy, so a1,a2,a3 must be processed in that order by either allocating to the same thread or blocking these until I know the previous a# task was completed.

Currently it doesn't use the Java Concurrency package, but I'm considering changing to take avantage of the thread management.

Does anyone have a similar solution or suggestions of how to achieve this

4

8 回答 8

18

我编写了自己的 Executor,保证对具有相同键的任务进行任务排序。它使用队列映射来处理具有相同键的订单任务。每个键控任务使用相同的键执行下一个任务。

此解决方案不处理RejectedExecutionException或委托执行器的其他异常!所以委派的执行者应该是“无限的”。

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/**
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly).
*/
public class OrderingExecutor implements Executor{

    private final Executor delegate;
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>();

    public OrderingExecutor(Executor delegate){
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        // task without key can be executed immediately
        delegate.execute(task);
    }

    public void execute(Runnable task, Object key) {
        if (key == null){ // if key is null, execute without ordering
            execute(task);
            return;
        }

        boolean first;
        Runnable wrappedTask;
        synchronized (keyedTasks){
            Queue<Runnable> dependencyQueue = keyedTasks.get(key);
            first = (dependencyQueue == null);
            if (dependencyQueue == null){
                dependencyQueue = new LinkedList<Runnable>();
                keyedTasks.put(key, dependencyQueue);
            }

            wrappedTask = wrap(task, dependencyQueue, key);
            if (!first)
                dependencyQueue.add(wrappedTask);
        }

        // execute method can block, call it outside synchronize block
        if (first)
            delegate.execute(wrappedTask);

    }

    private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
        return new OrderedTask(task, dependencyQueue, key);
    }

    class OrderedTask implements Runnable{

        private final Queue<Runnable> dependencyQueue;
        private final Runnable task;
        private final Object key;

        public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
            this.task = task;
            this.dependencyQueue = dependencyQueue;
            this.key = key;
        }

        @Override
        public void run() {
            try{
                task.run();
            } finally {
                Runnable nextTask = null;
                synchronized (keyedTasks){
                    if (dependencyQueue.isEmpty()){
                        keyedTasks.remove(key);
                    }else{
                        nextTask = dependencyQueue.poll();
                    }
                }
                if (nextTask!=null)
                    delegate.execute(nextTask);
            }
        }
    }
}
于 2014-02-21T13:15:55.713 回答
14

当我过去这样做时,我通常由一个组件处理排序,然后将可调用/可运行对象提交给执行器。

就像是。

  • 有要运行的任务列表,其中一些具有依赖项
  • 创建一个 Executor 并使用 ExecutorCompletionService 进行包装
  • 搜索所有任务,任何没有依赖关系的任务,通过完成服务安排它们
  • 轮询完成服务
  • 随着每项任务的完成
    • 将其添加到“已完成”列表
    • 重新评估“已完成列表”中的任何等待任务,以查看它们是否“依赖完成”。如果是这样安排他们
    • 冲洗重复,直到所有任务都提交/完成

完成服务是一种能够在任务完成时获取任务的好方法,而不是尝试轮询一堆 Futures。但是,您可能希望在Map<Future, TaskIdentifier>通过完成服务安排任务时保留填充的内容,以便当完成服务为您提供完成的 Future 时,您可以确定它是哪个TaskIdentifier

如果您发现自己处于任务仍在等待运行的状态,但没有任何东西在运行并且无法安排任何事情,那么您就有了循环依赖问题。

于 2010-08-28T20:25:56.367 回答
3

When you submit a Runnable or Callable to an ExecutorService you receive a Future in return. Have the threads that depend on a1 be passed a1's Future and call Future.get(). This will block until the thread completes.

So:

ExecutorService exec = Executor.newFixedThreadPool(5);
Runnable a1 = ...
final Future f1 = exec.submit(a1);
Runnable a2 = new Runnable() {
  @Override
  public void run() {
    f1.get();
    ... // do stuff
  }
}
exec.submit(a2);

and so on.

于 2010-01-28T10:13:16.520 回答
2

您可以使用 Executors.newSingleThreadExecutor(),但它只会使用一个线程来执行您的任务。另一种选择是使用 CountDownLatch。这是一个简单的例子:

public class Main2 {

public static void main(String[] args) throws InterruptedException {

    final CountDownLatch cdl1 = new CountDownLatch(1);
    final CountDownLatch cdl2 = new CountDownLatch(1);
    final CountDownLatch cdl3 = new CountDownLatch(1);

    List<Runnable> list = new ArrayList<Runnable>();
    list.add(new Runnable() {
        public void run() {
            System.out.println("Task 1");

            // inform that task 1 is finished
            cdl1.countDown();
        }
    });

    list.add(new Runnable() {
        public void run() {
            // wait until task 1 is finished
            try {
                cdl1.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Task 2");

            // inform that task 2 is finished
            cdl2.countDown();
        }
    });

    list.add(new Runnable() {
        public void run() {
            // wait until task 2 is finished
            try {
                cdl2.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Task 3");

            // inform that task 3 is finished
            cdl3.countDown();
        }
    });

    ExecutorService es = Executors.newFixedThreadPool(200);
    for (int i = 0; i < 3; i++) {
        es.submit(list.get(i));
    }

    es.shutdown();
    es.awaitTermination(1, TimeUnit.MINUTES);
}
}
于 2014-06-12T02:49:33.737 回答
2

另一种选择是创建自己的执行程序,将其称为 OrderedExecutor,并创建一个封装的 ThreadPoolExecutor 对象数组,每个内部执行程序有 1 个线程。然后,您提供一种机制来选择其中一个内部对象,例如,您可以通过提供您的类的用户可以实现的接口来做到这一点:

executor = new OrderedExecutor( 10 /* 池大小 */, new OrderedExecutor.Chooser() {
  公共 int 选择(可运行可运行){
     MyRunnable myRunnable = (MyRunnable)runnable;
     返回 myRunnable.someId();
  });

executor.execute(new MyRunnable());

然后,OrderedExecutor.execute() 的实现将使用选择器获取一个 int,您使用池大小对其进行修改,这就是您在内部数组中的索引。想法是“someId()”将为所有“a”等返回相同的值。

于 2010-08-28T20:16:21.843 回答
1

我为这个问题创建了一个 OrderingExecutor。如果您将相同的键传递给具有不同可运行对象的方法 execute(),则具有相同键的可运行对象的执行将按照调用 execute() 的顺序进行,并且永远不会重叠。

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

/**
 * Special executor which can order the tasks if a common key is given.
 * Runnables submitted with non-null key will guaranteed to run in order for the same key.
 *
 */
public class OrderedExecutor {

    private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
            new ConcurrentLinkedQueue<Runnable>());

    private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>();
    private Executor delegate;
    private volatile boolean stopped;

    public OrderedExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    public void execute(Runnable runnable, Object key) {
        if (stopped) {
            return;
        }

        if (key == null) {
            delegate.execute(runnable);
            return;
        }

        Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> {
            v.add(runnable);
            return v;
        });
        if (queueForKey == null) {
            // There was no running task with this key
            Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>());
            newQ.add(runnable);
            // Use putIfAbsent because this execute() method can be called concurrently as well
            queueForKey = taskMap.putIfAbsent(key, newQ);
            if (queueForKey != null)
                queueForKey.add(runnable);
            delegate.execute(new InternalRunnable(key));
        }
    }

    public void shutdown() {
        stopped = true;
        taskMap.clear();
    }

    /**
     * Own Runnable used by OrderedExecutor.
     * The runnable is associated with a specific key - the Queue&lt;Runnable> for this
     * key is polled.
     * If the queue is empty, it tries to remove the queue from taskMap. 
     *
     */
    private class InternalRunnable implements Runnable {

        private Object key;

        public InternalRunnable(Object key) {
            this.key = key;
        }

        @Override
        public void run() {
            while (true) {
                // There must be at least one task now
                Runnable r = taskMap.get(key).poll();
                while (r != null) {
                    r.run();
                    r = taskMap.get(key).poll();
                }
                // The queue emptied
                // Remove from the map if and only if the queue is really empty
                boolean removed = taskMap.remove(key, EMPTY_QUEUE);
                if (removed) {
                    // The queue has been removed from the map,
                    // if a new task arrives with the same key, a new InternalRunnable
                    // will be created
                    break;
                } // If the queue has not been removed from the map it means that someone put a task into it
                  // so we can safely continue the loop
            }
        }
    }

    /**
     * Special Queue implementation, with equals() and hashCode() methods.
     * By default, Java SE queues use identity equals() and default hashCode() methods.
     * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()).
     *
     * @param <E> The type of elements in the queue.
     */
    private static class QueueWithHashCodeAndEquals<E> implements Queue<E> {

        private Queue<E> delegate;

        public QueueWithHashCodeAndEquals(Queue<E> delegate) {
            this.delegate = delegate;
        }

        public boolean add(E e) {
            return delegate.add(e);
        }

        public boolean offer(E e) {
            return delegate.offer(e);
        }

        public int size() {
            return delegate.size();
        }

        public boolean isEmpty() {
            return delegate.isEmpty();
        }

        public boolean contains(Object o) {
            return delegate.contains(o);
        }

        public E remove() {
            return delegate.remove();
        }

        public E poll() {
            return delegate.poll();
        }

        public E element() {
            return delegate.element();
        }

        public Iterator<E> iterator() {
            return delegate.iterator();
        }

        public E peek() {
            return delegate.peek();
        }

        public Object[] toArray() {
            return delegate.toArray();
        }

        public <T> T[] toArray(T[] a) {
            return delegate.toArray(a);
        }

        public boolean remove(Object o) {
            return delegate.remove(o);
        }

        public boolean containsAll(Collection<?> c) {
            return delegate.containsAll(c);
        }

        public boolean addAll(Collection<? extends E> c) {
            return delegate.addAll(c);
        }

        public boolean removeAll(Collection<?> c) {
            return delegate.removeAll(c);
        }

        public boolean retainAll(Collection<?> c) {
            return delegate.retainAll(c);
        }

        public void clear() {
            delegate.clear();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof QueueWithHashCodeAndEquals)) {
                return false;
            }
            QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj;
            return Arrays.equals(toArray(), other.toArray());
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(toArray());
        }

    }

}
于 2015-07-28T16:48:25.267 回答
0

Habanero-Java 库中,有一个数据驱动任务的概念,可以用来表达任务之间的依赖关系,避免线程阻塞操作。在幕后 Habanero-Java 库使用 JDK 的 ForkJoinPool(即 ExecutorService)。

例如,您的任务 A1、A2、A3、... 的用例可以表示如下:

HjFuture a1 = future(() -> { doA1(); return true; });
HjFuture a2 = futureAwait(a1, () -> { doA2(); return true; });
HjFuture a3 = futureAwait(a2, () -> { doA3(); return true; });

请注意,a1、a2 和 a3 只是对 HjFuture 类型的对象的引用,并且可以在您的自定义数据结构中进行维护,以在任务 A2 和 A3 在运行时进入时指定依赖关系。

有一些教程幻灯片可用。您可以找到更多文档,如javadocAPI 摘要引物

于 2014-04-01T22:23:29.210 回答
0

我已经编写了我赢得的执行程序服务,它是序列感知的。它对包含某些相关参考和当前进行中的任务进行排序。

您可以在https://github.com/nenapu/SequenceAwareExecutorService上完成实现

于 2017-01-24T11:01:33.770 回答