我正在学习使用 Java 9 中的 Reactive Streams 并在 Spring 5 中使用 org.reactivestreams 的反应式编程。
我正在使用 webflux api spring 5 和 spring boot2 创建一个 WebClient。
现在,当我们使用 webclient 设置请求的主体时,我们可以在 BodyInserters 方法中传递一个 Reactive Stream(具体来说是 spring 5 中的 org.reactivestreams),如下所示:
WebClient.RequestHeadersSpec<?> requestSpec1 = WebClient
.create()
.method(HttpMethod.POST).uri("/getDocs")
.body(BodyInserters
.fromPublisher(Mono.just("data"),
String.class)
);
并来自 fromPublisher 方法定义:
返回写入给定 Publisher 的 BodyInserter。
参数:publisher 发布者要流式传输到响应体 elementClass 发布者中包含的元素的类
现在,发布者和订阅者也在 java 9 反应流中,我创建的发布者之一是:
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
public class TestReactiveStreams {
public static SubmissionPublisher<String> publisher;
public static void main(String...strings ) {
publisher = new SubmissionPublisher();
TestSubscriber<String> subscriber = new TestSubscriber<String>();
publisher.subscribe(subscriber);
List<String> items = List.of("1","x","2","x","3","x");
System.out.println("Subscribers = "+publisher.getNumberOfSubscribers());
if(publisher.getNumberOfSubscribers() > 0) {
items.stream().forEach(p -> publisher.submit(p));
}
publisher.close();
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Consumed Elements = "+subscriber.consumed);
}
}
现在我的问题是:是否可以在 BodyInserters 的 fromPublisher 方法中使用来自 TestReactiveStreams 的发布者?
我尝试了以下方法:
BodyInserter<Publisher<String>, ReactiveHttpOutputMessage> inserter2 = BodyInserters
.fromPublisher(TestReactiveStreams.publisher, SubmissionPublisher.class);
但这给出了以下编译时错误:
The method fromPublisher(P, Class<T>) in the type BodyInserters is not applicable for the arguments
(SubmissionPublisher<String>, Class<SubmissionPublisher>)
请建议,我们怎样才能做到这一点?