2

我正在学习使用 Java 9 中的 Reactive Streams 并在 Spring 5 中使用 org.reactivestreams 的反应式编程。

我正在使用 webflux api spring 5 和 spring boot2 创建一个 WebClient。

现在,当我们使用 webclient 设置请求的主体时,我们可以在 BodyInserters 方法中传递一个 Reactive Stream(具体来说是 spring 5 中的 org.reactivestreams),如下所示:

WebClient.RequestHeadersSpec<?> requestSpec1 = WebClient
                                                .create()
                                                .method(HttpMethod.POST).uri("/getDocs")
                                                .body(BodyInserters
                                                .fromPublisher(Mono.just("data"), 
                                                                String.class)
                                                );

并来自 fromPublisher 方法定义:

返回写入给定 Publisher 的 BodyInserter。

参数:publisher 发布者要流式传输到响应体 elementClass 发布者中包含的元素的类

现在,发布者和订阅者也在 java 9 反应流中,我创建的发布者之一是:

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

public class TestReactiveStreams {

    public static SubmissionPublisher<String> publisher;

    public static void main(String...strings ) {
        publisher = new SubmissionPublisher();
        TestSubscriber<String> subscriber = new TestSubscriber<String>();
        publisher.subscribe(subscriber);

        List<String> items = List.of("1","x","2","x","3","x");

        System.out.println("Subscribers = "+publisher.getNumberOfSubscribers());

        if(publisher.getNumberOfSubscribers() > 0) {
            items.stream().forEach(p -> publisher.submit(p));
        }

        publisher.close();


        try {
            Thread.currentThread().sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("Consumed Elements = "+subscriber.consumed);

    }


}

现在我的问题是:是否可以在 BodyInserters 的 fromPublisher 方法中使用来自 TestReactiveStreams 的发布者?

我尝试了以下方法:

BodyInserter<Publisher<String>, ReactiveHttpOutputMessage> inserter2 = BodyInserters
                .fromPublisher(TestReactiveStreams.publisher, SubmissionPublisher.class);

但这给出了以下编译时错误:

The method fromPublisher(P, Class<T>) in the type BodyInserters is not applicable for the arguments 
 (SubmissionPublisher<String>, Class<SubmissionPublisher>)

请建议,我们怎样才能做到这一点?

4

0 回答 0