81

这个问题与我对Java 中现有协程实现的问题有关。如果,正如我所怀疑的那样,Java 中目前没有完整的协程实现,那么实现它们需要什么?

正如我在那个问题中所说,我知道以下内容:

  1. 您可以在幕后将“协程”实现为线程/线程池。
  2. 您可以在幕后使用 JVM 字节码做一些棘手的事情,以使协同程序成为可能。
  3. 所谓的“达芬奇机器”JVM 实现具有使协程在没有字节码操作的情况下可行的原语。
  4. 也有各种基于 JNI 的协程方法。

我将依次解决每个人的不足之处。

基于线程的协程

这种“解决方案”是病态的。协程的重点是避免线程、锁定、内核调度等的开销。协程应该是轻量级和快速的,并且只在用户空间中执行。用严格限制的全倾斜线程来实现它们会消除所有优势。

JVM 字节码操作

这个解决方案更实用,尽管有点难以实现。这与跳入 C 中的协程库的汇编语言(这是其中有多少工作)大致相同,其优势在于您只需担心一种架构并正确处理。

它还将您限制在仅在完全兼容的 JVM 堆栈(这意味着,例如,没有 Android)上运行您的代码,除非您可以找到一种方法在不兼容的堆栈上执行相同的操作。但是,如果您确实找到了一种方法来做到这一点,那么您现在的系统复杂性和测试需求就会增加一倍。

达芬奇机器

达芬奇机器很适合实验,但由于它不是标准的 JVM,它的功能不会随处可用。事实上,我怀疑大多数生产环境都会明确禁止使用达芬奇机器。因此我可以用它来做很酷的实验,但不能用于我希望发布到现实世界的任何代码。

这还有一个类似于上面的 JVM 字节码操作解决方案的附加问题:无法在替代堆栈(如 Android 的)上工作。

JNI 实现

这个解决方案使在 Java 中执行此操作的意义完全没有实际意义。CPU 和操作系统的每种组合都需要独立测试,并且每种组合都可能导致令人沮丧的细微故障。或者,当然,我可以将自己完全束缚在一个平台上,但这也使得在 Java 中做事的意义完全没有意义。

所以...

有没有办法在不使用这四种技术之一的情况下在 Java 中实现协程?还是我会被迫改用这四种气味中最难闻的一种(JVM 操作)?


编辑添加:

只是为了确保包含混淆,这是一个与我的另一个相关的问题,但不一样。那个人正在寻找一个现有的实现,以避免不必要地重新发明轮子。这个问题与如果另一个证明无法回答,一个人将如何在 Java 中实现协程有关。目的是在不同的线程上保留不同的问题。

4

8 回答 8

44

我会看看这个: http: //www.chiark.greenend.org.uk/~sgtatham/coroutines.html,它非常有趣,应该提供一个很好的起点。但是当然我们使用的是 Java,所以我们可以做得更好(或者可能更糟,因为没有宏 :))

根据我对协程的理解,您通常有一个生产者和一个消费者协程(或者至少这是最常见的模式)。但是从语义上讲,您不希望生产者调用消费者或反之亦然,因为这会引入不对称性。但是考虑到基于堆栈的语言的工作方式,我们需要有人进行调用。

所以这是一个非常简单的类型层次结构:

public interface CoroutineProducer<T>
{
    public T Produce();
    public boolean isDone();
}

public interface CoroutineConsumer<T>
{
    public void Consume(T t);
}

public class CoroutineManager
{
    public static Execute<T>(CoroutineProducer<T> prod, CoroutineConsumer<T> con)
    {
        while(!prod.IsDone()) // really simple
        {
            T d = prod.Produce();
            con.Consume(d);
        }
    }
}

现在当然困难的部分是实现接口,特别是很难将计算分解为单独的步骤。为此,您可能需要一整套其他的持久控制结构。基本思想是我们想要模拟控制的非本地转移(最终它有点像我们在模拟 a goto)。pc我们基本上希望通过将当前操作的状态保留在堆中而不是堆栈中来摆脱使用堆栈和(程序计数器)。因此,我们将需要一堆帮助类。

例如:

假设在理想世界中,您想编写一个看起来像这样的消费者(伪代码):

boolean is_done;
int other_state;
while(!is_done)
{
    //read input
    //parse input
    //yield input to coroutine
    //update is_done and other_state;
}

我们需要抽象局部变量 likeis_done并且other_state我们需要抽象 while 循环本身,因为我们的yieldlike 操作不会使用堆栈。所以让我们创建一个while循环抽象和相关的类:

enum WhileState {BREAK, CONTINUE, YIELD}
abstract class WhileLoop<T>
{
    private boolean is_done;
    public boolean isDone() { return is_done;}
    private T rval;
    public T getReturnValue() {return rval;} 
    protected void setReturnValue(T val)
    {
        rval = val;
    }


    public T loop()
    {
        while(true)
        {
            WhileState state = execute();
            if(state == WhileState.YIELD)
                return getReturnValue();
            else if(state == WhileState.BREAK)
                    {
                       is_done = true;
                return null;
                    }
        }
    }
    protected abstract WhileState execute();
}

这里的基本技巧是将局部变量移动为变量并将范围块转换为类,这使我们能够在产生返回值后“重新进入”我们的“循环”。

现在实现我们的生产者

public class SampleProducer : CoroutineProducer<Object>
{
    private WhileLoop<Object> loop;//our control structures become state!!
    public SampleProducer()
    {
        loop = new WhileLoop()
        {
            private int other_state;//our local variables become state of the control structure
            protected WhileState execute() 
            {
                //this implements a single iteration of the loop
                if(is_done) return WhileState.BREAK;
                //read input
                //parse input
                Object calcluated_value = ...;
                //update is_done, figure out if we want to continue
                setReturnValue(calculated_value);
                return WhileState.YIELD;
            }
        };
    }
    public Object Produce()
    {
        Object val = loop.loop();
        return val;
    }
    public boolean isDone()
    {
        //we are done when the loop has exited
        return loop.isDone();
    }
}

可以对其他基本控制流结构进行类似的技巧。理想情况下,您将构建这些帮助类的库,然后使用它们来实现这些简单的接口,最终将为您提供协同例程的语义。我确信我在这里写的所有内容都可以被概括和扩展。

于 2010-05-17T06:02:12.823 回答
12

我建议在 JVM 上查看 Kotlin 协程。不过,它属于不同的类别。不涉及字节码操作,它也适用于 Android。但是,您将不得不在 Kotlin 中编写您的协程。好处是 Kotlin 在设计时考虑了与 Java 的互操作性,因此您仍然可以继续使用所有 Java 库并在同一个项目中自由组合 Kotlin 和 Java 代码,甚至将它们并排放在同一个目录中包。

kotlinx.coroutines 指南提供了更多示例,而协程设计文档解释了所有动机、用例和实现细节。

于 2017-04-05T10:10:49.997 回答
6

Kotlin 对协程使用以下方法
(来自https://kotlinlang.org/docs/reference/coroutines.html):

协程完全通过编译技术实现(不需要VM或OS端的支持),暂停通过代码转换工作。基本上,每个挂起函数(可能会应用优化,但我们不会在这里讨论)都转换为状态机,其中状态对应于挂起调用。就在暂停之前,下一个状态与相关的局部变量等一起存储在编译器生成的类的字段中。在该协程恢复时,局部变量被恢复,状态机从暂停后的状态开始。

挂起的协程可以作为保持其挂起状态和局部变量的对象进行存储和传递。此类对象的类型是 Continuation,这里描述的整体代码转换对应于经典的 Continuation-passing 风格。因此,挂起函数在底层采用了一个额外的 Continuation 参数。

在https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md查看设计文档

于 2017-04-06T04:08:09.400 回答
4

我刚刚遇到了这个问题,只想提一下,我认为有可能以类似于 C# 的方式实现协程或生成器。也就是说,我实际上并没有使用 Java,但 CIL 的限制与 JVM 非常相似。

C# 中的yield 语句是一种纯语言特性,不是 CIL 字节码的一部分。C# 编译器只是为每个生成器函数创建一个隐藏的私有类。如果在函数中使用 yield 语句,它必须返回一个 IEnumerator 或一个 IEnumerable。编译器将您的代码“打包”到类似状态机的类中。

C# 编译器可能会在生成的代码中使用一些“goto”,以便更轻松地转换为状态机。我不知道 Java 字节码的功能以及是否有类似无条件跳转的东西,但在“汇编级别”通常是可能的。

如前所述,此功能必须在编译器中实现。因为我对 Java 和它的编译器知之甚少,所以我不知道是否可以更改/扩展编译器,可能使用“预处理器”或其他东西。

我个人喜欢协程。作为一名 Unity 游戏开发人员,我经常使用它们。因为我用 ComputerCraft 玩了很多 Minecraft,我很好奇为什么 Lua (LuaJ) 中的协程是用线程实现的。

于 2013-04-20T12:13:48.943 回答
2

Oracle还有Quasar for Java 和Project Loom,它们对 JVM 进行了扩展以实现纤程和延续。这是Loom在 Youtoube 上的演示。还有几个。只需稍加搜索即可轻松找到。

于 2019-06-11T14:00:29.540 回答
1

Project Loom: https : //jdk.java.net/loom/ 向 Java 介绍 Continuations。一个例子:

static final ContinuationScope scope=new ContinuationScope("TST");

public static void main(String[] args) {
    example1();
}

// *********************************************************************
// *** EXAMPLE 1: Co-routine with three active phases:
// *********************************************************************

public static void example1() {
    
    Continuation coroutine=new Continuation(scope,new Runnable() {
        public void run() {
            System.out.println("Part 1 - Statements");
            Continuation.yield(scope); // DETACH 1
            System.out.println("Part 2 - Statements");
            Continuation.yield(scope); // DETACH 2
            System.out.println("Part 3 - Statements");
        }});
    
    
    coroutine.run(); // Vil utføre Part 1.
    System.out.println("Returns here after first DETACH(Yield)");
    coroutine.run(); // Vil utføre Part 2.
    System.out.println("Returns here after second DETACH(Yield)");
    coroutine.run(); // Vil utføre Part 3.
    System.out.println("Returns here after 'FINAL END'");
    System.out.println("Next line should be: IllegalStateException: Continuation terminated");
    coroutine.run(); // IllegalStateException: Continuation terminated
}
于 2021-04-07T15:43:01.460 回答
0

我有一个在 Java 中使用的 Coroutine 类。它基于线程,使用线程的优点是允许并行操作,这在多核机器上可能是一个优势。因此,您可能需要考虑基于线程的方法。

于 2012-09-04T10:33:05.063 回答
0

Java6+ 有另一种选择

pythonic协程实现:

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class CorRunRAII {
    private final List<WeakReference<? extends CorRun>> resources = new ArrayList<>();

    public CorRunRAII add(CorRun resource) {
        if (resource == null) {
            return this;
        }
        resources.add(new WeakReference<>(resource));

        return this;
    }

    public CorRunRAII addAll(List<? extends CorRun> arrayList) {
        if (arrayList == null) {
            return this;
        }
        for (CorRun corRun : arrayList) {
            add(corRun);
        }

        return this;
    }

    @Override
    protected void finalize() throws Throwable {
        super.finalize();

        for (WeakReference<? extends CorRun> corRunWeakReference : resources) {
            CorRun corRun = corRunWeakReference.get();
            if (corRun != null) {
                corRun.stop();
            }
        }
    }
}

class CorRunYieldReturn<ReceiveType, YieldReturnType> {
    public final AtomicReference<ReceiveType> receiveValue;
    public final LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue;

    CorRunYieldReturn(AtomicReference<ReceiveType> receiveValue, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) {
        this.receiveValue = receiveValue;
        this.yieldReturnValue = yieldReturnValue;
    }
}

interface CorRun<ReceiveType, YieldReturnType> extends Runnable, Callable<YieldReturnType> {
    boolean start();
    void stop();
    void stop(final Throwable throwable);
    boolean isStarted();
    boolean isEnded();
    Throwable getError();

    ReceiveType getReceiveValue();
    void setResultForOuter(YieldReturnType resultForOuter);
    YieldReturnType getResultForOuter();

    YieldReturnType receive(ReceiveType value);
    ReceiveType yield();
    ReceiveType yield(YieldReturnType value);
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another);
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value);
}

abstract class CorRunSync<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> {

    private ReceiveType receiveValue;
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList = new ArrayList<>();

    // Outside

    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AtomicBoolean isEnded = new AtomicBoolean(false);
    private Throwable error;

    private YieldReturnType resultForOuter;

    @Override
    public boolean start() {

        boolean isStarted = this.isStarted.getAndSet(true);
        if ((! isStarted)
                && (! isEnded())) {
            receive(null);
        }

        return isStarted;
    }

    @Override
    public void stop() {
        stop(null);
    }

    @Override
    public void stop(Throwable throwable) {
        isEnded.set(true);
        if (throwable != null) {
            error = throwable;
        }

        for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) {
            CorRun child = weakReference.get();
            if (child != null) {
                child.stop();
            }
        }
    }

    @Override
    public boolean isStarted() {
        return isStarted.get();
    }

    @Override
    public boolean isEnded() {
        return isEnded.get();
    }

    @Override
    public Throwable getError() {
        return error;
    }

    @Override
    public ReceiveType getReceiveValue() {
        return receiveValue;
    }

    @Override
    public void setResultForOuter(YieldReturnType resultForOuter) {
        this.resultForOuter = resultForOuter;
    }

    @Override
    public YieldReturnType getResultForOuter() {
        return resultForOuter;
    }

    @Override
    public synchronized YieldReturnType receive(ReceiveType value) {
        receiveValue = value;

        run();

        return getResultForOuter();
    }

    @Override
    public ReceiveType yield() {
        return yield(null);
    }

    @Override
    public ReceiveType yield(YieldReturnType value) {
        resultForOuter = value;
        return receiveValue;
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another) {
        return yieldFrom(another, null);
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another, TargetReceiveType value) {
        if (another == null || another.isEnded()) {
            throw new RuntimeException("Call null or isEnded coroutine");
        }

        potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another));

        synchronized (another) {
            boolean isStarted = another.start();
            boolean isJustStarting = ! isStarted;
            if (isJustStarting && another instanceof CorRunSync) {
                return another.getResultForOuter();
            }

            return another.receive(value);
        }
    }

    @Override
    public void run() {
        try {
            this.call();
        }
        catch (Exception e) {
            e.printStackTrace();

            stop(e);
            return;
        }
    }
}

abstract class CorRunThread<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> {

    private final ExecutorService childExecutorService = newExecutorService();
    private ExecutorService executingOnExecutorService;

    private static final CorRunYieldReturn DUMMY_COR_RUN_YIELD_RETURN = new CorRunYieldReturn(new AtomicReference<>(null), new LinkedBlockingDeque<AtomicReference>());

    private final CorRun<ReceiveType, YieldReturnType> self;
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList;
    private CorRunYieldReturn<ReceiveType, YieldReturnType> lastCorRunYieldReturn;

    private final LinkedBlockingDeque<CorRunYieldReturn<ReceiveType, YieldReturnType>> receiveQueue;

    // Outside

    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AtomicBoolean isEnded = new AtomicBoolean(false);
    private Future<YieldReturnType> future;
    private Throwable error;

    private final AtomicReference<YieldReturnType> resultForOuter = new AtomicReference<>();

    CorRunThread() {
        executingOnExecutorService = childExecutorService;

        receiveQueue = new LinkedBlockingDeque<>();
        potentialChildrenCoroutineList = new ArrayList<>();

        self = this;
    }

    @Override
    public void run() {
        try {
            self.call();
        }
        catch (Exception e) {
            stop(e);
            return;
        }

        stop();
    }

    @Override
    public abstract YieldReturnType call();

    @Override
    public boolean start() {
        return start(childExecutorService);
    }

    protected boolean start(ExecutorService executorService) {
        boolean isStarted = this.isStarted.getAndSet(true);
        if (!isStarted) {
            executingOnExecutorService = executorService;
            future = (Future<YieldReturnType>) executingOnExecutorService.submit((Runnable) self);
        }
        return isStarted;
    }

    @Override
    public void stop() {
        stop(null);
    }

    @Override
    public void stop(final Throwable throwable) {
        if (throwable != null) {
            error = throwable;
        }
        isEnded.set(true);

        returnYieldValue(null);
        // Do this for making sure the coroutine has checked isEnd() after getting a dummy value
        receiveQueue.offer(DUMMY_COR_RUN_YIELD_RETURN);

        for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) {
            CorRun child = weakReference.get();
            if (child != null) {
                if (child instanceof CorRunThread) {
                    ((CorRunThread)child).tryStop(childExecutorService);
                }
            }
        }

        childExecutorService.shutdownNow();
    }

    protected void tryStop(ExecutorService executorService) {
        if (this.executingOnExecutorService == executorService) {
            stop();
        }
    }

    @Override
    public boolean isEnded() {
        return isEnded.get() || (
                future != null && (future.isCancelled() || future.isDone())
                );
    }

    @Override
    public boolean isStarted() {
        return isStarted.get();
    }

    public Future<YieldReturnType> getFuture() {
        return future;
    }

    @Override
    public Throwable getError() {
        return error;
    }

    @Override
    public void setResultForOuter(YieldReturnType resultForOuter) {
        this.resultForOuter.set(resultForOuter);
    }

    @Override
    public YieldReturnType getResultForOuter() {
        return this.resultForOuter.get();
    }

    @Override
    public YieldReturnType receive(ReceiveType value) {

        LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue = new LinkedBlockingDeque<>();

        offerReceiveValue(value, yieldReturnValue);

        try {
            AtomicReference<YieldReturnType> takeValue = yieldReturnValue.take();
            return takeValue == null ? null : takeValue.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

    @Override
    public ReceiveType yield() {
        return yield(null);
    }

    @Override
    public ReceiveType yield(final YieldReturnType value) {
        returnYieldValue(value);

        return getReceiveValue();
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another) {
        return yieldFrom(another, null);
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value) {
        if (another == null || another.isEnded()) {
            throw new RuntimeException("Call null or isEnded coroutine");
        }

        boolean isStarted = false;
        potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another));

        synchronized (another) {
            if (another instanceof CorRunThread) {
                isStarted = ((CorRunThread)another).start(childExecutorService);
            }
            else {
                isStarted = another.start();
            }

            boolean isJustStarting = ! isStarted;
            if (isJustStarting && another instanceof CorRunSync) {
                return another.getResultForOuter();
            }

            TargetYieldReturnType send = another.receive(value);
            return send;
        }
    }

    @Override
    public ReceiveType getReceiveValue() {

        setLastCorRunYieldReturn(takeLastCorRunYieldReturn());

        return lastCorRunYieldReturn.receiveValue.get();
    }

    protected void returnYieldValue(final YieldReturnType value) {
        CorRunYieldReturn<ReceiveType, YieldReturnType> corRunYieldReturn = lastCorRunYieldReturn;
        if (corRunYieldReturn != null) {
            corRunYieldReturn.yieldReturnValue.offer(new AtomicReference<>(value));
        }
    }

    protected void offerReceiveValue(final ReceiveType value, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) {
        receiveQueue.offer(new CorRunYieldReturn(new AtomicReference<>(value), yieldReturnValue));
    }

    protected CorRunYieldReturn<ReceiveType, YieldReturnType> takeLastCorRunYieldReturn() {
        try {
            return receiveQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

    protected void setLastCorRunYieldReturn(CorRunYieldReturn<ReceiveType,YieldReturnType> lastCorRunYieldReturn) {
        this.lastCorRunYieldReturn = lastCorRunYieldReturn;
    }

    protected ExecutorService newExecutorService() {
        return Executors.newCachedThreadPool(getThreadFactory());
    }

    protected ThreadFactory getThreadFactory() {
        return new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(Thread thread, Throwable throwable) {
                        throwable.printStackTrace();
                        if (runnable instanceof CorRun) {
                            CorRun self = (CorRun) runnable;
                            self.stop(throwable);
                            thread.interrupt();
                        }
                    }
                });
                return thread;
            }
        };
    }
}

现在您可以通过这种方式使用 pythonic 协程(例如斐波那契数)

线程版本:

class Fib extends CorRunThread<Integer, Integer> {

    @Override
    public Integer call() {
        Integer times = getReceiveValue();
        do {
            int a = 1, b = 1;
            for (int i = 0; times != null && i < times; i++) {
                int temp = a + b;
                a = b;
                b = temp;
            }
            // A pythonic "yield", i.e., it returns `a` to the caller and waits `times` value from the next caller
            times = yield(a);
        } while (! isEnded());

        setResultForOuter(Integer.MAX_VALUE);
        return getResultForOuter();
    }
}

class MainRun extends CorRunThread<String, String> {

    @Override
    public String call() {

        // The fib coroutine would be recycled by its parent
        // (no requirement to call its start() and stop() manually)
        // Otherwise, if you want to share its instance and start/stop it manually,
        // please start it before being called by yieldFrom() and stop it in the end.
        Fib fib = new Fib();
        String result = "";
        Integer current;
        int times = 10;
        for (int i = 0; i < times; i++) {

            // A pythonic "yield from", i.e., it calls fib with `i` parameter and waits for returned value as `current`
            current = yieldFrom(fib, i);

            if (fib.getError() != null) {
                throw new RuntimeException(fib.getError());
            }

            if (current == null) {
                continue;
            }

            if (i > 0) {
                result += ",";
            }
            result += current;

        }

        setResultForOuter(result);

        return result;
    }
}

同步(非线程)版本:

class Fib extends CorRunSync<Integer, Integer> {

    @Override
    public Integer call() {
        Integer times = getReceiveValue();

        int a = 1, b = 1;
        for (int i = 0; times != null && i < times; i++) {
            int temp = a + b;
            a = b;
            b = temp;
        }
        yield(a);

        return getResultForOuter();
    }
}

class MainRun extends CorRunSync<String, String> {

    @Override
    public String call() {

        CorRun<Integer, Integer> fib = null;
        try {
            fib = new Fib();
        } catch (Exception e) {
            e.printStackTrace();
        }

        String result = "";
        Integer current;
        int times = 10;
        for (int i = 0; i < times; i++) {

            current = yieldFrom(fib, i);

            if (fib.getError() != null) {
                throw new RuntimeException(fib.getError());
            }

            if (current == null) {
                continue;
            }

            if (i > 0) {
                result += ",";
            }
            result += current;
        }

        stop();
        setResultForOuter(result);

        if (Utils.isEmpty(result)) {
            throw new RuntimeException("Error");
        }

        return result;
    }
}

执行(两个版本都可以):

// Run the entry coroutine
MainRun mainRun = new MainRun();
mainRun.start();

// Wait for mainRun ending for 5 seconds
long startTimestamp = System.currentTimeMillis();
while(!mainRun.isEnded()) {
    if (System.currentTimeMillis() - startTimestamp > TimeUnit.SECONDS.toMillis(5)) {
        throw new RuntimeException("Wait too much time");
    }
}
// The result should be "1,1,2,3,5,8,13,21,34,55"
System.out.println(mainRun.getResultForOuter());
于 2017-11-24T08:47:04.790 回答