2

我一直对 Quasar 及其轻量级 Fibers 作为 Threads 的替代品感到好奇。在查阅了他们的API 文档后,我无法弄清楚如何将典型的ThreadPoolExecutor转换为 Fibers 池。

int maxThreadPoolSize = 10;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        maxThreadPoolSize,
        maxThreadPoolSize,
        10, TimeUnit.MINUTES,
        new ArrayBlockingQueue<Runnable>(maxThreadPoolSize),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy()
);

for (int i = 0; i < 100; i++) {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // run some code
        }
    });
}

上面的代码创建了一个有 10 个线程的池,池前面的一个队列可以容纳 10 个元素和一个拒绝策略(当队列满时)让主线程自己执行一个 Runnable 任务。由于for循环创建了100个runnable,它们将在池中一次执行10个,10个排队,主线程自己拿起一个Runnable,直到其他线程完成,之后主线程返回将Runnables添加到executor。

您将如何使用 Quasar 的 Fibers 来做到这一点?它是不是一开始就应该这样使用?


编辑:我最初的问题措辞不佳。本质上,我试图找到一种机制来限制可以同时运行的 Fiber 数量。例如,如果已经有 200 个 Fiber 在运行,则不要启动更多 Fiber。如果最大数量的光纤正在运行,请等到一个完成后再启动一个新的。

4

4 回答 4

0

每个纤程由一个 FiberScheduler 调度,当您创建一个没有调度器的 Fiber 时,将创建一个 FiberForkJoinScheduler 并分配给该纤程。

简而言之,如果你想在线程池中管理你的纤程,使用 FiberExecutorScheduler: Quasar 的关于调度纤程的文档
你的代码可能是这样的

    int maxThreadPoolSize = 10;
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            maxThreadPoolSize,
            maxThreadPoolSize,
            10, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(maxThreadPoolSize),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    FiberExecutorScheduler scheduler = new FiberExecutorScheduler("FibersInAPool", executor);
    for (int i = 0; i < 100; i++) {
        Fiber fiber = new Fiber<Void>(scheduler
                , new SuspendableCallable<Void>() {
            @Override
            public Void run() throws SuspendExecution, InterruptedException {
                // run some code
                return null;
            }
        });
        fiber.start();
    }
于 2018-11-30T04:34:26.617 回答
0

纤程非常便宜,因此您根本不需要池化(及其异步作业调度模型):只需启动纤程并让它运行常规顺序代码,每次您需要一个新的顺序进程与其他进程同时运行。

于 2016-10-09T12:56:21.583 回答
-1

java.util.concurrent.Semaphore 最终在我的特定设置中运行良好。

我的解决方案的一般要点:

  • 创建具有所需最大许可数的信号量(又名最大并发光纤)
  • 主线程负责从队列中提取要处理的任务
  • 主线程调用 semaphore.acquire():
    • 如果许可可用,则启动新的 Fiber 来处理任务
    • 如果获得所有许可,则信号量将阻塞主线程并等待直到许可可用
  • 一旦 Fiber 启动,主线程就会重复其逻辑。从队列中选择一个新任务并尝试启动一个新的 Fiber。

奖励:标准 Java 的 Semaphore 是固定的,不能动态调整许可数量。为了使它动态,这个链接派上用场了:http: //blog.teamlazerbeez.com/2009/04/20/javas-semaphore-resizing/

于 2016-12-08T20:31:03.210 回答
-2

我们刚刚发布了 kilim 2.0。它提供了一个 Fiber 和 Actor 实现(类似于 quasar)并由 ThreadPoolExecutor 支持

限制并发任务数量的最有效方法是将一个任务用作控制器并收听邮箱(我认为 quasar 调用这些通道)并维护正在运行的任务的计数。当每个任务完成时,向邮箱发送消息

通常,使用比内核更多的线程是没有意义的

于 2016-12-09T02:00:04.137 回答