1

我打算将RSocket用于我的通知系统。我想将Spring Boot RSocket 用于我的后端(Java),而对于我的前端,我将使用 Angular 使用 rsocket-js

我能够快速启动一个请求流交互模型,其中我可以在我的系统中提取所有通知。请参阅我的后端的代码片段:

  @MessageMapping("streams")
  public Flux<Notification> requestStream() {

    log.info("Streaming to notifications...");

    return streamEventService.retrieveAllNotifications().log();

  }

现在在我的前端,我有以下代码片段:

export class RsocketClientService {

  // backend ws endpoint
  private readonly wsURL = 'ws://localhost:7000/notification';

  client: any;
  socket: any

  constructor() { 
    this.client = new RSocketClient({
      serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
      },
      setup: {
          keepAlive: 10000,
          lifetime: 180000,
          dataMimeType: 'application/json',
          metadataMimeType: 'message/x.rsocket.routing.v0',       
          payload: {
            data: 23
          }
      },
      transport: new RSocketWebSocketClient({
          url: this.wsURL
      }),
      responder: new EchoResponder()
    });
  }

  public connect() {
    console.log("initializeSocket...")
    this.client.connect().subscribe({
      onComplete: (socket: any) => {
        this.socket = socket;
        this.socket.connectionStatus().subscribe( (status: any) => {
            console.log("Connection status? ", status);
        });
      },
      onError: (error: any) => {
        console.error("Connection onError? " + error);
      },
      onSubscribe: (cancel: any) => {
        console.log("Connection onSubscribe? cancel?");
      }
    });
  }

 public retrieveNotifications() {
    this.socket.requestStream({
      data: null,
      metadata: String.fromCharCode('streams'.length) + 'streams'
    })
    .subscribe({
      onComplete: () => {
        console.log("onComplete?");
      },
      onError: (error: any) => {
        console.error("onError? error: " + error);
      },
      onNext: (payload: any) => {
        console.log("onNext? payload: ", payload);
      },
      onSubscribe: (subscription: any) => {
        console.log("onSubscribe?");
        subscription.request(1000000);
      },
    });
  }

我在 UI 中有一个按钮,如果单击该按钮,将调用方法retrieveNotifications ,该方法将订阅我的后端requestStream中的 rsocket 消息映射方法。

一切正常,我可以看到我的回复。现在我的问题是,如果在我的服务器上有一个新数据插入到数据库中,例如,我如何从我的后端服务器发送通知消息到前端说“嘿!新数据已推送到数据库中。” 我有点卡在服务器如何能够以某种方式使用火灾并忘记客户端。

4

2 回答 2

1

您希望在连接建立时服务器端向客户端发送请求。

您可以从服务器获取此连接的 RSocketRequester,然后使用它创建四种方法之一(FNF、请求响应、请求流、流流)向客户端发送请求。在客户端,您可以通过上述四种方法之一在 EchoResponder 类中接收数据。

于 2021-07-19T07:16:51.597 回答
0

看起来您需要创建一个返回 void 的新控制器函数,并且当您在数据库中插入一个对象时,您将该对象从该函数传递到前端,并以角度连接到它,就像您做的那样...尝试检查此链接是否有火灾并忘记方法...希望这有助于https://www.baeldung.com/spring-boot-rsocket

于 2021-07-12T09:22:26.097 回答