我正在构建一个基于CDC的应用程序,该应用程序使用Mongo Change Streams来侦听更改事件并近乎实时地索引 elasticsearch 中的更改。
到目前为止,我已经实现了一个工作程序,它调用一个函数来捕获事件、转换它们并在 elasticsearch 中对它们进行索引,在为 1 个 mongo 集合实现流时没有任何问题:
function syncChangeEvents() {
const stream = ModelA.watch()
while (!stream.isClosed()) {
if (await stream.hasNext()) {
const event = stream.next()
// transform event
// index to elasticsearch
}
}
}
我已经使用无限循环(可能是一种不好的方法)实现了它,但我不确定当我必须保持更改流永远存在时有什么替代方案。
当我必须为另一个模型实现更改流时,问题就来了。由于第一个函数有一个阻塞的 while 循环,因此工作人员无法调用第二个函数来启动第二个更改流。
我想知道最好的方法是启动一个可以触发 x no 的工人。在不影响每个更改流的性能的情况下更改流。工作线程是正确的方法吗?