就反应式流而言,有一个发布者,它可以有尽可能多的订阅者。
但是假设,订阅者从发布者那里得到了一条消息。现在这个订阅者(比如 Subs1)更改/修改消息并将其传递给其他订阅者(比如 Subs2),后者使用修改后的消息。
那么这个 Subs1 订阅者可以充当发布者,可以将消息传递给新的 Subs2 订阅者吗?
我不确定它是否可能,但我认为这种情况是可能的。
如果可能,请提出一种可能的方法来做到这一点。
就反应式流而言,有一个发布者,它可以有尽可能多的订阅者。
但是假设,订阅者从发布者那里得到了一条消息。现在这个订阅者(比如 Subs1)更改/修改消息并将其传递给其他订阅者(比如 Subs2),后者使用修改后的消息。
那么这个 Subs1 订阅者可以充当发布者,可以将消息传递给新的 Subs2 订阅者吗?
我不确定它是否可能,但我认为这种情况是可能的。
如果可能,请提出一种可能的方法来做到这一点。
If we want to transform incoming message and pass it further to the next Subscriber, we need to implement the Processor interface. This acts both as a Subscriber because it receives messages, and as the Publisher because it processes those messages and sends them for further processing.
Here is the complete implementation to do this:
Create a MyTransformer class which implements Processor and extends SubmissionPublisher, as it will act both as Subscriber and Publisher:
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public MyTransformer(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onComplete() {
System.out.println("Transformer Completed");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(T item) {
System.out.println("Transformer Got : "+item);
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
Create an TestSubscriber class which implements the Subscriber interface and implement the required methods:
The onSubscribe() method is called before processing starts. The instance of the Subscription is passed as the argument. It is a class that is used to control the flow of messages between Subscriber and the Publisher.
The main method here is onNext() – this is called whenever the Publisher publishes a new message.
We are using the SubmissionPublisher class which implements the Publisher interface.
We’re going to be submitting N elements to the Publisher – which our TestSubscriber will be receiving.
Note, that we’re calling the close() method on the instance of the TestSubscriber. It will invoke onComplete() callback underneath on every Subscriber of the given Publisher.
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
public class TestSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumed = new LinkedList<>();
@Override
public void onComplete() {
System.out.println("Subsciber Completed");
}
@Override
public void onError(Throwable arg0) {
arg0.printStackTrace();
}
@Override
public void onNext(T item) {
System.out.println("In Subscriber Got : "+item);
subscription.request(1);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
MyTransformer is parsing the String as Integer – which means a conversion needs to happening here.
import java.util.List;
import java.util.concurrent.SubmissionPublisher;;
public class TestTransformer {
public static void main(String... args) {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt);
TestSubscriber<Integer> subscriber = new TestSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
}
}