1

我正在使用nestjs创建一些微服务

例如,我有xyz服务都通过grpc互连,但我希望服务 x 在特定实体更改时向 webapp 发送更新,所以我考虑了 server-sent-events [对任何其他更好的解决方案开放]

根据nestjs 文档,他们有一个以n 间隔运行的函数,用于sse 路由,似乎资源枯竭。有没有办法在有更新时实际发送事件。

假设我在同一个服务中有另一个 api 调用,由单击另一个 web 应用程序的按钮触发,我如何触发事件仅在单击按钮时触发,而不是持续发送事件。此外,如果您知道任何惯用的方法来实现这一点,将不胜感激,希望它是最后的手段。

[奖金问题]

我还考虑使用MQTT发送事件。但是我感觉单个服务不可能拥有MQTT 和 gRPC。我对使用 MQTT 持怀疑态度,因为它的延迟以及它将如何影响内部消息传递。如果我可以限制外部客户端,那就太好了(即,x 服务使用 gRPC 进行内部连接,使用 MQTT 进行 webapp 只需要一个由 mqtt 公开的路由)。(PS我是微服务新手,所以请全面了解您的解决方案:p)

提前感谢您阅读到最后!

4

2 回答 2

3

你可以。重要的是在 NestJSSSE中是用 Observables 实现的,所以只要你有一个可以添加的 observable,你就可以用它来发回 SSE 事件。最简单的方法是使用Subjects。我曾经在某个地方有一个这样的例子,但一般来说,它看起来像这样

@Controller()
export class SseController {
  constructor(private readonly sseService: SseService) {}

  @SSE()
  doTheSse() {
    return this.sseService.sendEvents();
  }
}
@Injectable()
export class SseService {
  private events = new Subject();

  addEvent(event) {
    this.events.next(event);
  }

  sendEvents() {
    return this.events.asObservable();
  }
}
@Injectable()
export class ButtonTriggeredService {
  constructor(private readonly sseService: SseService) {}

  buttonClickedOrSomething() {
    this.sseService.addEvent(buttonClickedEvent);
  }
}

请原谅上面的伪代码性质,但总的来说,它确实展示了如何使用 Subjects 为 SSE 事件创建 observables。只要@SSE()端点返回一个具有正确形状的可观察对象,你就是黄金。

于 2021-04-21T19:54:12.493 回答
3

使用 NestJS 的 SSE 处理事件有更好的方法:

请参阅此回购代码示例:

https://github.com/ningacoding/nest-sse-bug/tree/main/src

基本上你有一个服务:

import {Injectable} from '@nestjs/common';
import {fromEvent} from "rxjs";
import {EventEmitter} from "events";

@Injectable()
export class EventsService {

    private readonly emitter = new EventEmitter();

    subscribe() {
        return fromEvent(this.emitter, 'eventName');
    }

    async emit(data) {
        this.emitter.emit('eventName', {data});
    }

}

显然,eventName可以是任何类似于具有用户 ID 的频道

例如:“events/for/<user_id>”并且订阅该频道的用户将仅接收该频道的事件,并且仅在被触发时;)

于 2021-10-26T16:03:48.393 回答