我正在尝试根据流侦听器(StreamListener)收到的消息创建一个 Observable,但一直没有成功。我对 EventObservable 和响应式 Java(Reactive Java)的概念还很陌生。如果有人能帮忙看一下我的代码,确认它的构建方式是否正确、我对这个概念的理解方向是否正确,那将会很有帮助。
/**
 * Shared hub that fans {@link Event}s out to every current subscriber of
 * {@link #getObservable()}.
 *
 * <p>Producers call {@link #publish(Event)}; each event is forwarded to the observers that
 * are subscribed at that moment. Events published while nobody is subscribed are dropped,
 * and nothing is replayed to observers that subscribe later.
 */
@Component
public class EventObservable {

    /** One emit callback per live subscription; removed again when that subscription is disposed. */
    private final java.util.List<java.util.function.Consumer<Event>> sinks =
            new java.util.concurrent.CopyOnWriteArrayList<>();

    /**
     * Registers every new subscriber's emitter in {@link #sinks} instead of completing
     * without ever signalling the observer.
     */
    private final Observable<Event> observable = Observable.create(emitter -> {
        // serialize() keeps signals to this observer sequential even if publish()
        // is called from several threads at once.
        java.util.function.Consumer<Event> sink = emitter.serialize()::onNext;
        sinks.add(sink);
        // Runs on dispose (immediately if the observer is already disposed).
        emitter.setCancellable(() -> sinks.remove(sink));
    });

    /**
     * Returns the stream of published events.
     *
     * @return the shared observable; subscribing registers the observer for future events
     */
    public Observable<Event> getObservable() {
        return this.observable;
    }

    /**
     * Delivers {@code event} to every currently subscribed observer.
     *
     * @param event the event to emit; must not be {@code null}
     * @throws NullPointerException if {@code event} is {@code null}
     */
    public void publish(Event event) {
        java.util.Objects.requireNonNull(event, "event");
        for (java.util.function.Consumer<Event> sink : sinks) {
            sink.accept(event);
        }
    }
}

/**
 * Stream listener that turns messages arriving on
 * {@link ConsumerChannels#EVENT_NOTIFICATION_CHANNEL} into {@link Event}s on the shared
 * {@link EventObservable}.
 */
@EnableBinding(ConsumerChannels.class)
public class EventConsumer {

    @Autowired
    private EventObservable eventObservable;

    /**
     * Publishes an event for each incoming message.
     *
     * <p>The message content is currently ignored and a fixed sample event is published.
     *
     * @param asd the incoming message (unused)
     */
    @StreamListener(ConsumerChannels.EVENT_NOTIFICATION_CHANNEL)
    public void fetchEvent(Message<?> asd) {
        // NOTE(review): arguments presumably map to (id, name, description, location) -
        // confirm against the Event constructor.
        Event event = new Event("123123","google.io","This is an event","Singapore");
        // Observable.just(...) is a static factory that builds a new, unrelated Observable;
        // calling it on the shared instance emitted nothing. Push into the hub instead.
        eventObservable.publish(event);
    }
}
/**
 * Builds, once at construction time, the executable GraphQL schema used for event
 * subscriptions and exposes it through {@link #getGraphQLSchema()}.
 *
 * <p>The schema definition is read from the classpath resource {@code schemas.graphqls}
 * and the {@code subscribeEvent} field is wired to the publisher provided by
 * {@code EventPublisher}.
 */
@Component
public class SubscriptionGraphQlUtilities {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionGraphQlUtilities.class);

    /** Classpath name of the schema definition file. */
    private static final String SCHEMA_FILE = "schemas.graphqls";

    private static final EventPublisher EVENT_PUBLISHER = new EventPublisher();

    // Not read anywhere in this class: the schema is loaded through the class loader
    // inside the constructor, which runs before Spring populates injected fields.
    @Value("classpath:schemas.graphqls")
    private Resource schemaResource;

    private final GraphQLSchema graphQLSchema;

    /**
     * Loads and wires the schema.
     *
     * @throws IllegalStateException if the schema file is not on the classpath
     * @throws java.io.UncheckedIOException if the schema file cannot be read or closed
     */
    public SubscriptionGraphQlUtilities(){
        graphQLSchema = buildSchema();
    }

    /** Parses the schema file and attaches the subscription data fetcher. */
    private GraphQLSchema buildSchema() {
        TypeDefinitionRegistry typeRegistry;
        // try-with-resources so the underlying classpath stream is always released
        try (Reader schemaReader = loadSchemaFile(SCHEMA_FILE)) {
            typeRegistry = new SchemaParser().parse(schemaReader);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(
                    "I/O error while reading GraphQL schema file " + SCHEMA_FILE, e);
        }

        // NOTE(review): the wired type name must match the type declared in the schema
        // file exactly (GraphQL names are case-sensitive; the conventional root type is
        // "Subscription") - confirm against schemas.graphqls.
        RuntimeWiring wiring = RuntimeWiring.newRuntimeWiring()
                .type(newTypeWiring("subscription")
                        .dataFetcher("subscribeEvent", getEventsDataFetcher())
                )
                .build();
        return new SchemaGenerator().makeExecutableSchema(typeRegistry, wiring);
    }

    /** Data fetcher for {@code subscribeEvent}: hands back the shared event publisher. */
    private DataFetcher<?> getEventsDataFetcher(){
        return environment -> EVENT_PUBLISHER.getPublisher();
    }

    /**
     * Returns the executable schema built in the constructor.
     *
     * @return the GraphQL schema
     */
    public GraphQLSchema getGraphQLSchema() {
        return graphQLSchema;
    }

    /**
     * Opens a classpath resource as a UTF-8 character stream.
     *
     * @param name resource name, resolved by this class's class loader
     * @return a reader over the resource; the caller is responsible for closing it
     * @throws IllegalStateException if no such resource exists
     */
    @SuppressWarnings("SameParameterValue")
    private Reader loadSchemaFile(String name) {
        InputStream stream = getClass().getClassLoader().getResourceAsStream(name);
        if (stream == null) {
            // getResourceAsStream signals "not found" with null; fail with a clear message
            // instead of a bare NullPointerException from InputStreamReader.
            throw new IllegalStateException("GraphQL schema file not found on classpath: " + name);
        }
        return new InputStreamReader(stream, java.nio.charset.StandardCharsets.UTF_8);
    }
}
/**
 * Stream listener that, for every message on {@link ConsumerChannels#QUERY_INPUT}, executes
 * a fixed GraphQL subscription query and logs each result the subscription delivers.
 */
@EnableBinding(ConsumerChannels.class)
public class SubscriptionConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionConsumer.class);

    /** The subscription query executed for every incoming message. */
    private static final String SUBSCRIPTION_QUERY =
            "subscription {\n subscribeEvent{\n\t\tid\n name\n\t\tdescription\n\t\tlocation\n\t}\n}";

    /** The most recently started subscription that has not yet terminated, if any. */
    private final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
    private final SubscriptionGraphQlUtilities graphQlUtilities = new SubscriptionGraphQlUtilities();
    Instrumentation instrumentation = new ChainedInstrumentation(
            Collections.singletonList(new TracingInstrumentation())
    );

    /**
     * Starts a new event subscription and logs every update it produces.
     *
     * <p>The message content is only logged; the executed query is always
     * {@link #SUBSCRIPTION_QUERY}. Failures are logged and never propagate to the binder.
     *
     * @param asd the incoming message
     */
    @StreamListener(ConsumerChannels.QUERY_INPUT)
    public void fetchSubscription(Message<?> asd){
        logger.info("Received subscription query:{}", asd);
        try {
            GraphQL graphQL = GraphQL
                    .newGraphQL(graphQlUtilities.getGraphQLSchema())
                    .instrumentation(instrumentation)
                    .build();
            ExecutionResult executionResult = graphQL.execute(SUBSCRIPTION_QUERY);
            if (!executionResult.getErrors().isEmpty()) {
                logger.error("Subscription query failed: {}", executionResult.getErrors());
                return;
            }
            Publisher<ExecutionResult> eventStream = executionResult.getData();
            if (eventStream == null) {
                logger.error("Subscription query returned no event stream");
                return;
            }
            eventStream.subscribe(new Subscriber<ExecutionResult>() {
                // Each subscriber signals demand on its own subscription. Going through
                // the shared subscriptionRef would send this subscriber's demand to
                // whichever subscription was started last and stall this one.
                private Subscription ownSubscription;

                @Override
                public void onSubscribe(Subscription subscription) {
                    logger.info("Successfully subscribed");
                    ownSubscription = subscription;
                    subscriptionRef.set(subscription);
                    subscription.request(1);
                }

                @Override
                public void onNext(ExecutionResult executionResult) {
                    logger.info("Sending event updates");
                    Object result = executionResult.getData();
                    logger.info("This is the event,{}", result);
                    // Pull the next update only after this one has been handled.
                    ownSubscription.request(1);
                }

                @Override
                public void onError(Throwable throwable) {
                    logger.error("Subscription threw an exception", throwable);
                    subscriptionRef.compareAndSet(ownSubscription, null);
                }

                @Override
                public void onComplete() {
                    logger.info("Subscription complete");
                    subscriptionRef.compareAndSet(ownSubscription, null);
                }
            });
        } catch (Exception e) {
            // Listener boundary: log with the stack trace rather than printing to stderr.
            logger.error("Failed to start event subscription", e);
        }
    }
}