-2

com.offbynull.coroutines 版本 1.1.0 消费者仅消费 7500 条消息。

请帮助我理解为什么这段代码只使用7500消息而不是30000.


public class DemoProducerConsumer {

    public static int cnt = 0;
    public static final int MAX = 10000;

    public static class Producer implements Coroutine {

        @Override
        public void run(Continuation ctn) throws Exception {
            String thName = Thread.currentThread().getName();
            System.out.println(thName + ") Producer starting...");
            Consumer consumer = new Consumer();
            for (int i = 0; i < 3; i++) {
                consumer.consume(ctn, "Hello:" + i);
            }
            System.out.println(thName + ") Producer published 3 messages");
        }
    }

    public static class Consumer {    
        public void consume(Continuation ctn, String message) {
            String thName = Thread.currentThread().getName();
            System.out.println(thName + ")" + message);
            cnt++;  // <<< SUSPECT bug here.
            ctn.suspend();  // <<< SUSPECT bug here.
        }
    }

    public static final void main(String... args) throws InterruptedException {

        String thName = Thread.currentThread().getName();
        System.err.println(thName + ") Preparing Producer ");

        new Thread(new Runnable() {
            public void run() {
                cnt = 0;
                Producer producer = new Producer();
                CoroutineRunner runner = new CoroutineRunner(producer);
                for (int i = 0; i < MAX; i++) {
                    runner.execute();
                }
                System.out.println(thName + ") Producer Looped " + MAX + " times.");
            }
        }).start();
        System.err.println(thName + ") Waiting " + (MAX * 3) + " message to be consumed...");
        Thread.sleep(10000);
        System.err.println(thName + ") Message consumed:" + cnt);
        System.err.println(thName + ") Exiting...");
    }    
}

我计划用它Thread Pool来实现更高性能的 MVC 服务器。

消费者和生产者的分离是必须的。

4

1 回答 1

1

这里是协程的作者。您似乎误解了 execute() 方法的工作原理。每次调用suspend(),execute()都会返回。当您再次调用 execute() 时,它将从您暂停的点继续执行该方法。

因此,如果您想完全执行您的协程 MAX 次,您需要将主循环更改为以下内容:

for (int i = 0; i < MAX; i++) {
    boolean stillExecuting;
    do {
        stillExecuting = runner.execute();
    } while (stillExecuting);
}

除此之外,由于您正在从单独的线程访问字段 cnt,您可能应该将 cnt 标记为 volatile:

public static volatile int cnt = 0;

使用上述更改运行会产生您对输出的期望:

main) Producer Looped 10000 times.
main) Message consumed:30000
main) Exiting...

此外,您应该花一些时间评估协程是否适合您的用例。我不明白您要解决的问题,但听起来普通的 Java 线程构造可能更合适。

于 2015-06-07T19:37:24.613 回答