7

我的 Node 应用程序使用 Mongo 更改流,并且该应用程序在生产中运行 3 个以上的实例(最终,随着它的增长,这将成为一个更大的问题)。因此,当更改流中发生更改时,功能运行的次数与进程的次数一样多。

如何设置以使更改流仅运行一次?

这是我所拥有的:

const options = { fullDocument: "updateLookup" };

const filter = [
  {
    $match: {
      $and: [
        { "updateDescription.updatedFields.sites": { $exists: true } },
        { operationType: "update" }
      ]
    }
  }
];

const sitesStream = Client.watch(sitesFilter, options);

// Start listening to site stream
sitesStream.on("change", async change => {
  console.log("in site change stream", change);
  console.log(
    "in site change stream, update desc",
    change.updateDescription
  );

  // Do work...
  console.log("site change stream done.");
  return;
});
4

6 回答 6

4

只需使用 Mongodb 查询运算符即可轻松完成。您可以在 ID 字段上添加模查询,其中除数是您的应用程序实例数 (N)。余数则为 {0, 1, 2, ..., N-1} 的元素。如果您的应用程序实例按从零到 N-1 的升序编号,您可以这样编写过滤器:

const filter = [
  {
    "$match": {
      "$and": [
        // Other filters
        { "_id": { "$mod": [<number of instances>, <this instance's id>]}}
      ]
    }
  }
];
于 2019-11-13T14:01:39.337 回答
4

在强有力的保证下做到这一点是困难的,但并非不可能。我在这里写了一个解决方案的细节:https ://www.alechenninger.com/2020/05/building-kafka-like-message-queue-with.html

这些示例是用 Java 编写的,但重要的部分是算法。

它归结为一些技术:

  • 每个进程都尝试获取锁
  • 每个锁(或每个更改)都有一个关联的防护令牌
  • 处理每个更改必须是幂等的
  • 在处理更改时,令牌用于确保有序的、有效的一次更新。

博客文章中的更多详细信息。

于 2020-06-01T13:33:56.480 回答
1

听起来您需要一种在实例之间分区更新的方法。你看过 Apache Kafka 吗?基本上,您要做的是拥有一个将更改数据写入分区 Kafka 主题的应用程序,并让您的节点应用程序成为 Kafka 消费者。这将确保只有一个应用程序实例接收到更新。

根据您的分区策略,您甚至可以确保同一条记录的更新始终发送到同一个节点应用程序(如果您的应用程序需要维护自己的状态)。否则,您可以以循环方式分散更新。

使用 Kafka 的最大好处是您可以添加和删除实例,而无需调整配置。例如,您可以启动一个实例,它会处理所有更新。然后,一旦您启动另一个实例,它们就会开始处理一半的负载。只要有分区,您就可以对尽可能多的实例继续这种模式(如果需要,您可以将主题配置为具有 1000 个分区),这就是 Kafka 消费者组的强大功能。缩小则相反。

于 2018-08-24T16:07:17.970 回答
1

虽然 Kafka 选项听起来很有趣,但它是在我不熟悉的平台上进行的大量基础架构工作,所以我决定为我选择离家更近一些的东西,将 MQTT 消息发送到一个小的独立应用程序,并让 MQTT 服务器监控消息的唯一性。

siteStream.on("change", async change => {
  console.log("in site change stream);
  const mqttClient = mqtt.connect("mqtt://localhost:1883");
  const id = JSON.stringify(change._id._data);
  // You'll want to push more than just the change stream id obviously...
  mqttClient.on("connect", function() {
    mqttClient.publish("myTopic", id);
    mqttClient.end();
  });
});

我仍在制定 MQTT 服务器的最终版本,但评估消息唯一性的方法可能会将一组更改流 ID 存储在应用程序内存中,因为不需要持久化它们,并评估是否继续进行任何操作进一步基于之前是否看到过该更改流 ID。

var mqtt = require("mqtt");
var client = mqtt.connect("mqtt://localhost:1883");
var seen = [];
client.on("connect", function() {
  client.subscribe("myTopic");
});
client.on("message", function(topic, message) {
  context = message.toString().replace(/"/g, "");
  if (seen.indexOf(context) < 0) {
    seen.push(context);
    // Do stuff
  }
});

这不包括安全性等,但你明白了。

于 2018-08-30T13:03:15.200 回答
1

是否会在 DB 中调用一个字段,该字段status将根据从更改流接收到的事件使用 findAnUpdate 进行更新。因此,假设您同时从更改流中获得 2 个事件。第一个事件会将状态更新为start,如果状态为 ,另一个将抛出错误start。所以第二个事件不会处理任何业务逻辑。

于 2020-05-04T09:17:16.213 回答
0

我并不是说这些是坚如磐石的生产级解决方案,但我相信这样的事情可能会奏效

解决方案 1

应用读取-修改-写入

  1. 向文档添加version字段,所有创建的文档的版本=0
  2. 接收 ChangeStream 事件
  3. 阅读需要更新的文档
  4. 对模型执行更新
  5. 增量版本
  6. 更新两者都 id匹配的文档version,否则丢弃更改

是的,它会创建2 * n_application_replicas无用的查询,所以还有另一种选择

解决方案 2

  1. 在 mongo 中创建 ResumeTokens 集合,用于存储集合 -> 令牌映射
  2. 在changeStream handler代码中,写入成功后,更新集合中的ResumeToken
  3. 创建一个功能切换,将禁用在您的应用程序中读取 ChangeStream
  4. 仅将应用程序的单个实例配置为“阅读器”

如果“阅读器”失败,您可以在另一个节点上启用阅读,或重新部署“阅读器”节点。

结果:可能有无限数量的非读者副本,不会有任何无用的查询

于 2021-11-03T17:46:07.957 回答