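I've started playing with Reactor, but I'm having trouble with my first event :D

Following the examples on GitHub, I tried to write a "hello world", but without success...

What is the problem?

Code: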

package reactor;

import static reactor.event.selector.Selectors.$;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.event.Event;
import reactor.function.Consumer;

public class Main {

    public static void main(String[] args) {

        final Environment env = new Environment();

        final Reactor reactor = Reactors.reactor(env);

        String topic = "event.message";

        reactor.on($(topic), new Consumer<Event<Message>>(){

            @Override
            public void accept(Event<Message> t) {
                System.out.println("Hello World");
            }

        });

        final Message event = new Message();
        reactor.notify(topic, Event.wrap(event));
        System.out.println("ends");
    }

    public static class Message{

    }
}

Output:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
ends
1 Answer

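Don't forget that Reactor is an implementation of the Reactive Streams paradigm, and everything in it is async.

So your reactor.notify(topic, Event.wrap(event)); call is carried out on a separate thread, the EventRouter's handler thread.

Your main thread therefore has to wait until all the downstream work has finished.

Either add Thread.sleep(1000); at the end of main, or use a CountDownLatch to wait for the event from that Reactor thread (com.lmax.disruptor.RingBuffer by default):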

final CountDownLatch stopLatch = new CountDownLatch(1);

reactor.on($(topic), new Consumer<Event<Message>>(){

    @Override
    public void accept(Event<Message> t) {
        System.out.println("Hello World");
        stopLatch.countDown();
    }

});
....
stopLatch.await();
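
To see the latch pattern in isolation, here is a minimal sketch that runs without the Reactor library. It is not Reactor's API: a plain single-thread ExecutorService stands in for the Reactor dispatcher thread, and the names LatchDemo, run and dispatcher are made up for illustration. The 5-second timeout on await is also an addition, so that main cannot hang forever if the handler never fires.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchDemo {

    // Runs a handler on another thread, waits for it with a latch,
    // and returns the recorded lines in the order they were added.
    static List<String> run() throws InterruptedException {
        final List<String> output = new CopyOnWriteArrayList<>();
        final CountDownLatch stopLatch = new CountDownLatch(1);

        // Assumption: stand-in for the thread that Reactor dispatches events on.
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();

        // Plays the role of the consumer registered with reactor.on(...).
        dispatcher.submit(() -> {
            output.add("Hello World");
            stopLatch.countDown(); // signal main that the event was handled
        });

        // Block the calling thread until the handler has run (or 5 s pass).
        boolean handled = stopLatch.await(5, TimeUnit.SECONDS);
        output.add(handled ? "ends" : "timed out");

        dispatcher.shutdown();
        return output;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Because countDown() is called only after the handler has done its work, "ends" is recorded after "Hello World" whenever the latch is released before the timeout. A Thread.sleep(1000) would usually work too, but it only guesses how long the handler needs, whereas the latch waits exactly as long as necessary.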
answered 2014-09-13T12:53:33.043