7

我有这样的服务:

class DemoService {
    Result process(Input in) {
        filter1(in);
        if (filter2(in)) return...
        filter3(in);
        filter4(in);
        filter5(in);
        return ...

    }
}

现在我想要它更快,我发现有些过滤器可以同时启动,而有些过滤器必须等待其他过滤器完成。例如:

filter1--
         |---filter3--
filter2--             |---filter5
          ---filter4--

意思是:

1.filter1和filter2可以同时启动,filter3和filter4也可以同时启动

2.filter3和filter4必须等待filter2完成

还有一件事

如果 filter2 返回 true,则 'process' 方法立即返回并忽略以下过滤器。

现在我的解决方案是使用 FutureTask:

            // do filter's work at FutureTask
        for (Filter filter : filters) {
            FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
            executorService.execute(futureTask);
        }

        //when all FutureTask are submitted, wait for result
        for(Filter filter : filters) {
            if (filter.isReturnNeeded()) {
                FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
                riskResult = futureTask.get();
                if (canReturn(filter, riskResult)) {
                    returnOk = true;
                    return riskResult;
                }
            }
        }

我的可调用过滤器:

public class CallableFilter implements Callable<RiskResult> {

    private Filter filter;
    private Context context;

    @Override
    public RiskResult call() throws Exception {
        List<Filter> dependencies = filter.getDependentFilters();
        if (dependencies != null && dependencies.size() > 0) {

            //wait for its dependency filters to finish
            for (Filter d : dependencies) {
                FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                futureTask.get();

            }
        }

        //do its own work
        return filter.execute(context);
    }
}

我想知道:

1.在案例中使用FutureTask是个好主意吗?有更好的解决方案吗?

2.线程上下文切换的开销。

谢谢!

4

2 回答 2

6

在 Java 8 中,您可以使用CompletableFuture将您的过滤器链接在一起。使用 thenApply 和 thenCompose 系列方法将新的异步过滤器添加到 CompletableFuture - 它们将在上一步完成后执行。thenCombine 当两个独立的 CompletableFuture 都完成后合并。使用 allOf 等待两个以上 CompletableFuture 对象的结果。

如果你不能使用 Java 8,那么 Guava ListenableFuture可以做同样的事情,请参阅Listenable Future Explained。使用 Guava,您可以等待多个独立运行的过滤器完成 Futures.allAsList - 这也返回一个 ListenableFuture。

使用这两种方法的想法是,在您声明未来的操作、它们之间的依赖关系以及它们的线程之后,您将返回一个 Future 对象,该对象封装了您的最终结果。

编辑:可以通过使用 complete() 方法显式完成 CompletableFuture 或使用 Guava SettableFuture(实现 ListenableFuture)来实现早期返回

于 2015-03-02T18:40:00.437 回答
1

您可以使用 aForkJoinPool进行并行化,这是专门针对这种并行计算考虑的:

(...) 方法 join() 及其变体仅适用于完成依赖是非循环的;也就是说,并行计算可以描述为有向无环图 (DAG) (...)

(见ForkJoinTask

a 的优点ForkJoinPool是每个任务都可以生成新任务并等待它们完成而不会实际阻塞正在执行的线程(否则,如果等待其他任务完成的任务多于可用线程,则可能会导致死锁)。

这是一个到目前为止应该可以使用的示例,尽管它还有一些限制:

  1. 它忽略过滤结果
  2. 如果过滤器 2 返回,它不会提前完成执行true
  3. 未实现异常处理

此代码背后的主要思想:每个过滤器都表示为Node可能依赖于其他节点(= 过滤器必须在此过滤器执行之前完成)。依赖节点作为并行任务产生。

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Node<V> extends RecursiveTask<V> {
    private static final short VISITED = 1;

    private final Callable<V> callable;
    private final Set<Node<V>> dependencies = new HashSet<>();

    @SafeVarargs
    public Node(Callable<V> callable, Node<V>... dependencies) {
        this.callable = callable;
        this.dependencies.addAll(Arrays.asList(dependencies));
    }

    public Set<Node<V>> getDependencies() {
        return this.dependencies;
    }

    @Override
    protected V compute() {
        try {
            // resolve dependencies first
            for (Node<V> node : dependencies) {
                if (node.tryMarkVisited()) {
                    node.fork(); // start node
                }
            }

            // wait for ALL nodes to complete
            for (Node<V> node : dependencies) {
                node.join();
            }

            return callable.call();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }

    public boolean tryMarkVisited() {
        return compareAndSetForkJoinTaskTag((short) 0, VISITED);
    }
}

使用示例:

public static void main(String[] args) {
    Node<Void> filter1 = new Node<>(filter("filter1"));
    Node<Void> filter2 = new Node<>(filter("filter2"));
    Node<Void> filter3 = new Node<>(filter("filter3"), filter1, filter2);
    Node<Void> filter4 = new Node<>(filter("filter4"), filter1, filter2);
    Node<Void> filter5 = new Node<>(filter("filter5"), filter3, filter4);
    Node<Void> root = new Node<>(() -> null, filter5);

    ForkJoinPool.commonPool().invoke(root);
}

public static Callable<Void> filter(String name) {
    return () -> {
        System.out.println(Thread.currentThread().getName() + ": start " + name);
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName() + ": end   " + name);
        return null;
    };
}
于 2015-03-03T10:00:26.150 回答