我需要找到一种在java中并行执行任务(依赖和独立)的方法。
- 任务 A 和任务 C 可以独立运行。
- 任务 B 依赖于任务 A 的输出。
我检查了 java.util.concurrent Future 和 Fork/Join,但看起来我们无法向任务添加依赖项。
谁能指出我纠正Java API。
我需要找到一种在java中并行执行任务(依赖和独立)的方法。
我检查了 java.util.concurrent Future 和 Fork/Join,但看起来我们无法向任务添加依赖项。
谁能指出我纠正Java API。
在 Scala 中,这很容易做到,而且我认为您最好使用 Scala。这是我从这里提取的一个示例http://danielwestheide.com/(Scala新手指南第 16 部分:从这里出发)这个人有一个很棒的博客(我不是那个人)
让我们请一位咖啡师煮咖啡。要做的任务是:
或作为一棵树:
Grind _
Coffe \
\
Heat ___\_Brew____
Water \_____Combine
/
Foam ____________/
Milk
在使用并发 api 的 java 中,这将是:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Barrista {
static class HeatWater implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("Heating Water");
Thread.sleep(1000);
return "hot water";
}
}
static class GrindBeans implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("Grinding Beans");
Thread.sleep(2000);
return "grinded beans";
}
}
static class Brew implements Callable<String> {
final Future<String> grindedBeans;
final Future<String> hotWater;
public Brew(Future<String> grindedBeans, Future<String> hotWater) {
this.grindedBeans = grindedBeans;
this.hotWater = hotWater;
}
@Override
public String call() throws Exception
{
System.out.println("brewing coffee with " + grindedBeans.get()
+ " and " + hotWater.get());
Thread.sleep(1000);
return "brewed coffee";
}
}
static class FrothMilk implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "some milk";
}
}
static class Combine implements Callable<String> {
public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
super();
this.frothedMilk = frothedMilk;
this.brewedCoffee = brewedCoffee;
}
final Future<String> frothedMilk;
final Future<String> brewedCoffee;
@Override
public String call() throws Exception {
Thread.sleep(1000);
System.out.println("Combining " + frothedMilk.get() + " "
+ brewedCoffee.get());
return "Final Coffee";
}
}
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));
executor.execute(heatWaterFuture);
executor.execute(grindBeans);
executor.execute(brewCoffee);
executor.execute(frothMilk);
executor.execute(combineCoffee);
try {
/**
* Warning this code is blocking !!!!!!!
*/
System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
e.printStackTrace();
} finally{
executor.shutdown();
}
}
}
确保添加超时以确保您的代码不会永远等待某事完成,这是通过使用 Future.get(long, TimeUnit) 完成的,然后相应地处理失败。
但是在 scala 中要好得多,这里就像在博客上一样:准备一些咖啡的代码看起来像这样:
def prepareCappuccino(): Try[Cappuccino] = for {
ground <- Try(grind("arabica beans"))
water <- Try(heatWater(Water(25)))
espresso <- Try(brew(ground, water))
foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)
所有方法都返回一个未来(类型为未来),例如研磨将是这样的:
def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
// grinding function contents
}
对于所有实现,请查看博客,但仅此而已。您也可以轻松集成 Scala 和 Java。我真的建议在 Scala 而不是 Java 中做这种事情。Scala 需要更少的代码、更简洁和事件驱动。
具有依赖关系的任务的通用编程模型是Dataflow。每个任务只有一个但重复的依赖的简化模型是Actor 模型。Java 有很多 Actor 库,但用于数据流的却很少。另请参阅:which-actor-model-library-framework-for-java,java-pattern-for-nested-callbacks
使用阻塞队列。将任务 A 的输出放入队列,任务 B 阻塞,直到队列中有可用的内容。
文档包含实现此目的的示例代码:http: //docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html
Java 定义了一个 CompletableFuture 类。
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
这就是你要找的。它有助于构建执行流程。
如果任务 B 依赖于任务 A 的输出,我首先会质疑任务 B 是否真的是一个单独的任务。如果存在以下情况,分离任务将是有意义的:
假设它是一个单独的任务,那么您可以允许任务 A 和 B 共享一个BlockingQueue
,以便任务 A 可以传递任务 B 数据。
你需要的是一个CountDownLatch。
final CountDownLatch gate = new CountDownLatch(2);
// thread a
new Thread() {
public void run() {
// process
gate.countDown();
}
}.start();
// thread c
new Thread() {
public void run() {
// process
gate.countDown();
}
}.start();
new Thread() {
public void run() {
try {
gate.await();
// both thread a and thread c have completed
// process thread b
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
作为替代方案,根据您的场景,您也可以使用BlockingQueue来实现生产者-消费者模式。请参阅文档页面上的示例。
使用这个库https://github.com/familysyan/TaskOrchestration。它为您管理任务依赖性。
有一个专门用于此目的的 java 库(免责声明:我是这个库的所有者),称为Dexecutor
这是您如何获得所需结果的方法,您可以在此处阅读有关它的更多信息
@Test
public void testDependentTaskExecution() {
DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("A", "B");
executor.addIndependent("C");
executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);
}
private DefaultDependentTasksExecutor<String, String> newTaskExecutor() {
return new DefaultDependentTasksExecutor<String, String>(newExecutor(), new SleepyTaskProvider());
}
private ExecutorService newExecutor() {
return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}
private static class SleepyTaskProvider implements TaskProvider<String, String> {
public Task<String, String> provid(final String id) {
return new Task<String, String>() {
@Override
public String execute() {
try {
//Perform some task
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = id + "processed";
return result;
}
@Override
public boolean shouldExecute(ExecutionResults<String, String> parentResults) {
ExecutionResult<String, String> firstParentResult = parentResults.getFirst();
//Do some logic with parent result
if ("B".equals(id) && firstParentResult.isSkipped()) {
return false;
}
return true;
}
};
}
}