1

我正在研究构建一个简单的解决方案,其中生产者服务将事件推送到消息队列,然后让流服务通过 gRPC 流 API 提供这些服务。

Cloud Pub/Sub 似乎非常适合这项工作,但是扩展流服务意味着该服务的每个副本都需要创建自己的订阅并在缩减之前将其删除,这似乎不必要地复杂,而不是平台的预期用途。

另一方面,Kafka 似乎适用于这样的事情,但我想避免必须管理底层平台本身,而是利用云基础设施。

我还应该提到,拥有流式 API 的原因是允许流向前端(可能无法访问底层基础设施)

有没有更好的方法来使用 GCP 平台做这样的事情,而无需部署和管理我自己的基础设施?

4

2 回答 2

3

如果您本质上想要临时订阅,那么您可以在创建订阅时在Subscription对象上设置一些内容:

  1. 将 设置expiration_policy为较小的持续时间。当订阅者在该时间段内没有收到消息时,订阅将被删除。权衡是,如果您的订阅者由于持续时间超过此期间的暂时性问题而关闭,则订阅将被删除。默认情况下,到期时间为 31 天。您可以将此设置为低至 1 天。对于拉取订阅者,订阅者只需停止向 Cloud Pub/Sub 发出请求,即可在其到期时启动计时器。对于推送订阅,计时器在没有消息成功传递到端点时启动。因此,如果没有发布消息或端点为所有推送的消息返回错误,则计时器有效。

  2. 减小 的值message_retention_duration。这是在订阅者未接收消息并确认消息的情况下保留消息的时间段。默认情况下,这是 7 天。您可以将其设置为低至 10 分钟。权衡是,如果您的订阅者断开连接或处理消息的时间超过此持续时间,则早于该时间的消息将被删除,订阅者将看不到它们。

完全关闭的订阅者可能只是自己调用 DeleteSubscription 以便订阅立即消失,但对于意外关闭的订阅者,设置这两个属性将最小化订阅继续存在的时间和消息数量(这将永远不会交付)将被保留。

请记住,Cloud Pub/Sub 配额限制每个主题和每个项目 1 到 10,000 个订阅。因此,如果创建了很多订阅并且处于活动状态或未清理(手动或在expiration_policy'sttl过去后自动),则可能无法创建新订阅。

于 2019-03-19T13:34:08.920 回答
0

我认为您最初的想法比临时订阅更好。我的意思是它有效,但感觉完全不自然。看你的要求是什么。例如,客户端是否只需要在连接时接收消息,还是都需要获取所有消息?

仅在连接时

你最初的想法是更好的imo。我可能会做的是创建一个客户端可以连接的 gRPC 流服务。该实现本质上是一个观察者模式。消费者将收到一条消息,然后遍历订阅者以向所有订阅者执行“发送”。从那里,任何时候客户端连接到服务,它只是向该观察者集合注册自己,并在断开连接时取消注册。水平扩展是被动的,因为客户端会粘在他们连接的任何实例上。

如果最终,每个人​​都会收到信息

该概念与上述类似,但客户端不会在断开连接时从观察者隐式取消注册。相反,它会显式地注册和取消注册(通过为此而设计的方法/命令)。修改“on disconnected”逻辑,告诉观察者列表客户端已经离线。那么消费者的广播逻辑略有不同。现在它遍历列表并说“如果在线,则发送,否则排队”,并将消息发送到临时队列(属于客户端)。然后,您的“连接时”逻辑将在通知消费者它重新联机之前将所有排队的消息发送到客户端。基本上是一个收件箱。在 RabbitMQ 等大多数产品中,设置临时的、自删除的队列非常容易。我觉得你' 不过,我必须做一些管理是否可以删除队列。例如,除非客户端明确取消订阅或长时间处于非活动状态,否则永远不要删除队列。如果不这样做,整个收件箱的想法就会分崩离析。

在此处输入图像描述

上面选择的答案与我在这里订阅的内容最相似,因为订阅是队列。如果我这样做了,那么我可能会将它实现为内部总线而不是观察者(因为它是不必要的) - 您按需为连接客户端创建一个消费者,该客户端实际上只是转发消息。消息消费者根据客户端是否连接进行订阅和取消订阅。正如 Kamal 所指出的,如果您的规模超过 pubsub 允许的最大订阅数,您将遇到问题。如果你发现自己处于那个位置,那么你可以通过实现上面的模式来解除这个约束。这基本上是相同的模式,但是您将责任转移到您的基础设施上,其中唯一的限制是您自己的资源。

在此处输入图像描述

gRPC 使这种机制非常简单。或者,对于 Web,如果您使用 Microsoft 堆栈,那么 SignalR 也可以让这变得非常容易。客户端连接到集线器,您可以发布到所有连接的客户端。这里的消费者模式基本保持不变,但您不必手动实现观察者模式。

(注意:图中的箭头是依赖的方向,不是数据流的方向)

于 2021-06-21T14:48:18.917 回答