1

是否可以使用 cyclops-react 库同时拥有订阅者?例如,如果运行以下代码:

    ReactiveSeq<Integer> initialStream = ReactiveSeq.of(1, 2, 3, 4, 5, 6);

    ReactiveSubscriber<Integer> sub1 = Spouts.reactiveSubscriber();
    ReactiveSubscriber<Integer> sub2 = Spouts.reactiveSubscriber();

    FutureStream<Integer> futureStream = FutureStream.builder().fromStream(initialStream)
            .map(v -> v -1);

    futureStream.subscribe(sub1);
    futureStream.subscribe(sub2);

    CompletableFuture future1 = CompletableFuture.runAsync(() -> sub1.reactiveStream().forEach(v -> System.out.println("1 -> " + v)));
    CompletableFuture future2 = CompletableFuture.runAsync(() -> sub2.reactiveStream().forEach(v -> System.out.println("2 -> " + v)));

    try {
        future1.get();
        future2.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

我得到以下结果:

1 -> 0 
2 -> 0
2 -> 1
1 -> 0
1 -> 1
1 -> 1
2 -> 2
2 -> 3
2 -> 4
2 -> 5
1 -> 2
1 -> 2
1 -> 3
1 -> 4
1 -> 5
1 -> 3
1 -> 4
1 -> 5

我在订阅者流中得到重复的值。提前感谢您的帮助。

4

1 回答 1

0

cyclops-react 仅支持单个订阅者。我认为应该改变这里的行为以忽略第二次订阅尝试,而不是让它把两者都搞砸(我会记录一个错误 - 谢谢!)。

但是,您可以使用主题来达到相同的效果。我们可以使用主题重写您的示例

ReactiveSeq<Integer> initialStream = ReactiveSeq.of(1,2,3,4,5,6);



        FutureStream<Integer> futureStream = FutureStream.builder()
                                                         .fromStream(initialStream)
                                                         .map(v -> v -1);
        Queue<Integer> queue= QueueFactories.<Integer>boundedNonBlockingQueue(1000).build();
        Topic<Integer> topic = new Topic<Integer>(queue,QueueFactories.<Integer>boundedNonBlockingQueue(1000));

        ReactiveSeq<Integer> s2 = topic.stream();
        ReactiveSeq<Integer> s1 = topic.stream();

        Thread t = new Thread(()->{
            topic.fromStream(futureStream);
            topic.close();
        });
        t.start();


        CompletableFuture future1 = CompletableFuture.runAsync(() -> s1.forEach(v -> System.out.println("1 -> " + v)));
        CompletableFuture future2 = CompletableFuture.runAsync(() -> s2.forEach(v -> System.out.println("2 -> " + v)));

        try {

            future1.get();
            future2.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

并且输出更符合我们的预期

2 -> 0 1 -> 0 2 -> 1 1 -> 1 2 -> 2 1 -> 2 2 -> 3 1 -> 3 2 -> 4 1 -> 4 2 -> 5 1 -> 5

于 2018-07-25T09:30:54.013 回答