我正在尝试从 kafka 获取消息并使用 Spring 将其发送到 RSocket。使用 React 在 Spring Java 和客户端发布服务器端
@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {
@Bean
public Function<Integer, Mono<Integer>> rsocketConsumer(RSocketRequester.Builder builder,
RsocketConsumerProperties rsocketConsumerProperties) {
RSocketRequester rSocketRequester = builder.websocket(URI.create("ws://localhost:7000/"));
return input -> rSocketRequester.route(rsocketConsumerProperties.getRoute()).data(input).retrieveMono(Integer.class);
}
}
@EnableBinding(Sink.class)
public class Listener {
@Autowired
private Function<Integer, Mono<Integer>> rsocketConsumer;
@StreamListener(Sink.INPUT)
public void fireAndForget(Integer val) {
System.out.println(val);
rsocketConsumer.apply(val).subscribe();
}
}
@Controller
public class ServerController {
@MessageMapping("data")
public Mono<Integer> hello(Integer integer) {
return Mono.just(integer);
}
}
我在服务器端做错了什么,因为我的客户端已连接但无法获取新消息
client.connect().subscribe({
onComplete: socket => {
socket.fireAndForget({
data: { message: "hello from javascript!" },
metadata: null
});
},
onError: error => {
console.log("got error");
console.error(error);
},
onSubscribe: cancel => {
/* call cancel() to abort */
console.log("subscribe!");
console.log(cancel);
// cancel.cancel();
}
});