6

我一直在玩 Java Flowoffer运算符,但是在阅读了文档并进行了测试后,我不明白。

这是我的测试

@Test
public void offer() throws InterruptedException {
    //Create Publisher for expected items Strings
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    //Register Subscriber
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.offer("item", (subscriber, value) -> false);
    Thread.sleep(500);
}

报价运算符接收要发出的项目和 BiPredicate 函数,据我了解阅读文档,只有在谓词函数为真的情况下才会发出项目。

Bur通过测试后的结果是

Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback

如果我返回 true 而不是 false,则结果没有变化。

任何人都可以更好地解释我这个操作员。

4

2 回答 2

7

不,谓词函数用于决定是否重试文档中提到的发布操作:

onDrop- 如果非空,则在拖放到订阅者时调用的处理程序,带有订阅者和项目的参数;如果它返回 true,则重新尝试(一次)提议

它不影响最初是否发送项目。

offer编辑:使用该方法时如何发生滴落的示例

我想出了一个例子,说明调用该offer方法时如何发生drop。我不认为输出是 100% 确定的,但是当它运行几次时有明显的区别。您可以将处理程序更改为返回 true 而不是 false,以查看重试如何减少由于饱和缓冲区导致的丢弃。在此示例中,通常会发生下降,因为最大缓冲区容量明显很小(传递给 的构造函数SubmissionPublisher)。但是,当在一小段睡眠期后启用重试时,丢弃的数据将被删除:

public class SubmissionPubliserDropTest {

    public static void main(String[] args) throws InterruptedException {
        // Create Publisher for expected items Strings
        // Note the small buffer max capacity to be able to cause drops
        SubmissionPublisher<String> publisher =
                               new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
        // Register Subscriber
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        // publish 3 items for each subscriber
        for(int i = 0; i < 3; i++) {
            int result = publisher.offer("item" + i, (subscriber, value) -> {
                // sleep for a small period before deciding whether to retry or not
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return false;  // you can switch to true to see that drops are reduced
            });
            // show the number of dropped items
            if(result < 0) {
                System.err.println("dropped: " + result);
            }
        }
        Thread.sleep(3000);
        publisher.close();
    }
}

class CustomSubscriber<T> implements Flow.Subscriber<T> {

    private Subscription sub;

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    @Override
    public void onError(Throwable th) {
        th.printStackTrace();
        sub.cancel();
    }

    @Override
    public void onNext(T arg0) {
        System.out.println("Got : " + arg0 + " --> onNext() callback");
        sub.request(1);
    }

    @Override
    public void onSubscribe(Subscription sub) {
        System.out.println("Subscription done");
        this.sub = sub;
        sub.request(1);
    }

}
于 2017-09-30T11:22:50.477 回答
3

SubmissionPublisher.offer指出

如果超出资源限制,则该项目可能会被一个或多个订阅者删除,在这种情况下,将调用给定的处理程序(如果非空),如果返回true,则重试一次。

只是为了理解,在你的两个电话中

publisher.offer("item", (subscriber, value) -> true); // the handler would be invoked

publisher.offer("item", (subscriber, value) -> false); // the handler wouldn't be invoked

但是仍然publisher将给定的项目发布给其当前的每个订阅者。这发生在您当前的情况下。


正如文档所建议的那样,验证您提供的处理程序是否通过尝试重现而被调用的场景在资源限制方面非常困难:

如果超出资源限制,则该项目可能会被一个或多个订阅者删除,在这种情况下,将调用给定的处理程序(如果非空),如果它返回 true,则重试一次。

然而,您可以尝试使用重载方法删除超时设置为基本最小值的项目offer​(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

timeout- 在放弃之前等待任何订阅者的资源多长时间,以单位为单位

unit- 确定如何解释超时参数的 TimeUnit

由于这些offer方法可能会丢弃项目(立即或有限制的 timeout),这将提供插入处理程序然后重试的机会。

于 2017-09-30T12:22:00.053 回答