问题标签 [changestream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
309 浏览

java - 在多个集合上使用 DocumentDB 更改流?

我正在实现 python 应用程序以使用 Change Stream 功能捕获 DocumentDB 上的更改我的设计是监视目标数据库中所有集合的更改并发布到某个消息队列以进行一些处理。

我的问题目前是 DocumentDB 支持 MongoDB API 版本 3.6,它不支持数据库级别的手表更改。有没有办法在当前 DocumentDB 版本上查看数据库级别的更改流。

0 投票
1 回答
771 浏览

mongodb - MongoDB Change Streams 读取过去的更改

我想读取数据库中集合的更改。现在我在直接使用 oplog 和Change Streams之间进行选择。我对 oplog Change Streams 能力几乎没有疑问

  1. 如何使用 Change Streams 从 oplog 一开始就开始读取更改?

使用 oplog 我可以find()在具有零时间戳的 oplog 集合上使用,但使用 Change Streams 我只能传递resumeAfterstartAfter令牌。当我没有指定光标选项时,它会从当前时刻开始读取更改,但在第一次运行时,我需要从头开始。

  1. 如果我的更改阅读器会落后于 oplog 并 resumeAfter变得过时怎么办?

文档

如果时间戳是过去的,oplog 必须有足够的历史来定位与令牌或时间戳关联的操作。

如何正确处理这种情况?

  1. 现在我需要从单个集合中读取更改,但将来我可能需要从多个集合中读取更改

使用 oplog,我可以扩展查找过滤器并继续使用状态中的时间戳。我可以对 Change Streams 做同样的事情吗?

0 投票
1 回答
513 浏览

mongodb - 具有多个任务的分布式官方 Mongodb Kafka 源连接器不起作用

我在我的 Windows 机器上运行 Apache Kafka,有两个 Kafka-Connect-Workers(端口 8083、8084)和一个带有三个分区的主题(一个复制)。我的问题是,每当我关闭其中一个 Kafka-Connect 工作人员时,我都能看到故障转移到其他工作人员,但由于任务数始终为 ONE,因此没有发生负载平衡。我使用官方 MongoDB-Kafka-Connector 作为 Source(ChangeStream),tasks.max=6。我尝试使用多个线程更新 MongoDB,以便它可以将更多数据推送到 Kafka-Connect,并可能使 Kafka-Connect 创建更多任务。即使在数据量更大的情况下,任务计数仍然是一个。

我如何确认只有一项任务正在运行?那是通过 api "http://localhost:8083/connectors/mongodb-connector/status" : 响应: { "name":"mongodb-connector", "connector": { "state":"RUNNING", "worker_id":"xx.xx.xx.xx:8083" } "tasks": [ { "id": 0, "state": "RUNNING" "worker_id": "xx.xx.xx.xx:8083" } ], "type": "source" } 我在这里遗漏了什么吗?为什么没有创建更多任务?

0 投票
1 回答
174 浏览

mongodb - 使用应用于 MongoDB 中的更改流的聚合阶段从子数组中删除元素?

我正在使用 MongoDB 4.2,并且我正在寻找一种方法来从子数组中删除所有元素,如果它不匹配使用与更改流兼容的聚合阶段的特定条件。兼容的阶段是:

  • $addFields
  • $匹配
  • $项目
  • $replaceRoot
  • $replaceWith
  • $编辑
  • $set
  • $未设置

例如,假设我们有一个集合 users,其中包含以下格式的文档:

我想使用兼容的聚合阶段的一个或组合来删除“访问”数组中与过滤器不匹配的元素,例如{ $elemMatch : { "level" : "Gold" } }. 我希望生成的文档如下所示:

是否可以在 MongoDB 中执行此操作?

0 投票
1 回答
203 浏览

mongodb - 如何在 mongodb 更改流中使用 $lookup?

基于mongoDB Change Stream Docs中的文档,我只能使用这些操作来获取更改流的输出:

  • $addFields
  • $匹配
  • $项目
  • $replaceRoot
  • $replaceWith(从 MongoDB 4.2 开始可用)
  • $编辑
  • $set(从 MongoDB 4.2 开始可用)
  • $unset(从 MongoDB 4.2 开始可用)

但我想使用 $lookup op :(

你有什么想法来实现这一目标吗?

0 投票
1 回答
342 浏览

java - MongoDB Java Driver 的 ChangeStream 性能问题

我需要观看来自 mongodb 的最新文档。我使用了 ChangeStream watch API 从集合中获取流文档。

我拥有的设置是一个副本集,其中 3 个节点在同一系统中运行,端口为 27017、27018 和 27019。该设置没有任何身份验证设置。

mongodb.conf 文件:

我已经对其中包含 72663 个文档的文件执行了批量插入。我从下面的程序中得到的每秒处理的记录只有 8073 条。

我必须观看的 Java 代码是。

有没有办法从更改流 API 中获得更好的性能。或者是否有任何其他 API 可以让我在观看文档时获得更好的性能。或任何其他可以提供更好性能的复制属性。

0 投票
0 回答
399 浏览

javascript - 在mongodb更改流中为同时插入的文档获取相同的恢复令牌

我有一个 nodejs 函数在 Azure Cosmos DB 的集合中侦听更改(insert和),它在更改流APIupdate的帮助下使用 API for MongoDB(3.6) 。从更改流接收的每个文档都有唯一的令牌(如文档中所述)。请也检查一下

但问题是,如果我同时向 cosmos db 插入一些文档,例如 100 或 200,则从流中接收到的那些插入文档具有相同的恢复令牌。这严重影响了我的变更流恢复策略

然而,这种奇怪的行为在独立的 mongodb 中并不存在。我测试并发现它仅在带有 mongo api 的 cosmosdb 中发生。有没有人有同样的问题?任何解决方法?

0 投票
2 回答
666 浏览

node.js - 如何检测对 MongoDB/Mongoose 中的集合所做的所有更改,包括删除?

我有一个通过猫鼬使用 MongoDB 的应用程序。在应用程序上有一个名为Notifications. 我想检测每次该集合发生变化(包括删除文档)并采取适当的措施。我在这里读到:

与 Model.remove() 一样,此函数 (Model.findOneAndDelete()、Model.deleteOne()) 不会触发 pre('remove') 或 post('remove') 挂钩。

然后我在这里读到:

更改流为您提供了一种方法来监听通过 MongoDB 数据库的所有插入更新。请注意,除非您连接到 MongoDB 副本集,否则更改流不起作用。

变更流是否将删除视为更新?如何收听数据库中的所有更改,包括删除文档?

0 投票
1 回答
958 浏览

node.js - 如何实现将数据从 mongo 流式传输到 elasticsearch 的 nodeJS 工作程序?

我正在构建一个基于CDC的应用程序,该应用程序使用Mongo Change Streams来侦听更改事件并近乎实时地索引 elasticsearch 中的更改。

到目前为止,我已经实现了一个工作程序,它调用一个函数来捕获事件、转换它们并在 elasticsearch 中对它们进行索引,在为 1 个 mongo 集合实现流时没有任何问题:

我已经使用无限循环(可能是一种不好的方法)实现了它,但我不确定当我必须保持更改流永远存在时有什么替代方案。

当我必须为另一个模型实现更改流时,问题就来了。由于第一个函数有一个阻塞的 while 循环,因此工作人员无法调用第二个函数来启动第二个更改流。

我想知道最好的方法是启动一个可以触发 x no 的工人。在不影响每个更改流的性能的情况下更改流。工作线程是正确的方法吗?

0 投票
1 回答
207 浏览

mongodb - 如果微服务实例被复制,则更改流复制

我已经在 J​​ava 微服务中实现了 MongoDB 更改流,当我复制我的微服务时,我看到更改流监视正在侦听两次。代码重复。有什么办法可以阻止这一切?