我正在使用node-pg-copy将数据从 mongodb 批量导入到postgresql db
.
我使用 observables 来映射从 mongodb 进来的数据,然后将其写入 copyFrom 正在使用的流中。类似于以下内容:
function writeToStream (source, stream) {
const trigger = Rx.Observable.fromEvent(stream, 'drain')
hotsource = source.publish()
const sub = trigger
.takeUntil(hotSource.last())
.subscribe(
data => stream.write(String(data)) && pauser.next(true), // pauser gets the next row to deal with concurrency. Using debugger, I can see that 'data' is ok every time.
err => stream.emit('error', err),
() => !stream._isStdio && stream.end()
)
sub.add(hotsource.connect())
}
const textSource = Rx.Observable.from(docs)
.map(doc => toTextFormat(doc)) // using debugger tools, I can see that every document passes through this method, and returns ok.
const copyStream = pgClient.query(copyFrom(
`COPY "${table}" (${columnNames.join(', ')}) FROM STDIN`
))
const subscription = writeToStream(textSource, copyStream)
使用调试器工具,我可以看到每个文档都正确映射并写入了流。
该脚本运行没有问题,并且在没有抛出任何异常的情况下完成。无论如何,当我检查 postgres 数据库的结果时,行数比预期的要少。有人知道为什么会这样吗?
注意:该脚本将几个 mongo 集合导入到 postgres 表中,但其中只有两个存在此问题。此外,这两个集合中的行数始终相同。
注二:本代码为简化版。原始脚本处理并发和其他潜在问题,我认为这些问题与此问题无关。
** 编辑:** 我已将 postgres 配置为将日志保存到文件中。文件中的日志也没有显示任何错误。