0

我编写了一个 ExecutorService,用于并行执行多个线程并获得找到的第一个结果,我使用“invokeAny”方法来完成它。

类似于以下内容:

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class Test {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        Set<Callable<String>> callables = new HashSet<Callable<String>>();

        callables.add(new Callable<String>() {
            public String call() throws Exception {
                System.out.println("Task 1 started...");
                return "Task 1 Finished";
            }
        });


        callables.add(new Callable<String>() {
            public String call() throws Exception {
                System.out.println("Task2 started...");
                return "Task 2 Finished";
            }
        });

        callables.add(new Callable<String>() {
            public String call() throws Exception {
                return "Task 3 Finished";
            }
        });

        try {
            String result = executorService.invokeAny(callables);
            System.out.println("result = " + result);
        } catch (Throwable e) {
            e.printStackTrace();
        }

        executorService.shutdown();
    }

}

一切正常,但是因为我的代码执行了很多线程,所以使用了大量的资源(CPU)。所以我在网上看了一下,我发现人们建议使用 Quasar 来解决这些问题......根据规范,它应该作为 java“并发”框架工作,所以我看了看,我找到了这个类“ FiberExecutorScheduler ”,这似乎很有希望,但我找不到一种方法来组织我的工作以实现与我以前的代码相同的行为。

有没有办法用 Quasar 来实现它?你能举个例子吗?任何帮助将不胜感激。

4

1 回答 1

0

根据 Fiber 文档,您可以按照以下代码将 Callable 转换为 Fiber,其中 V 是您的返回类型。

new Fiber<V>() {
  @Override
  protected V run() throws SuspendExecution, InterruptedException {
        // your code
    }
}.start();

您可以将您的转换ExecutorService为 a FiberExecutorScheduler,您可以在其中调用newFiber以将您的光纤添加到执行程序。

我发现这个库的文档非常缺乏。没有使用示例,即使在他们自己的网站上也是如此。查看他们的 repo,我发现了以下测试用例。

@Test
public void simpleTest1() throws Exception {
    final CompletableFuture<String> fut = new CompletableFuture<String>();

    final Fiber<String> fiber = new Fiber<>(scheduler, () -> {
        try {
            return AsyncCompletionStage.get(fut);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }).start();

    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(200);
                fut.complete("hi!");
            } catch (InterruptedException e) {
            }
        }
    }).start();

    assertThat(fiber.get(), equalTo("hi!"));

}

如果我是你,我会尝试根据这个测试用例继续下去。

于 2018-03-29T14:33:03.190 回答