1

我正在使用node-pg-copy将数据从 mongodb 批量导入到postgresql db.

我使用 observables 来映射从 mongodb 进来的数据,然后将其写入 copyFrom 正在使用的流中。类似于以下内容:

function writeToStream (source, stream) {

  const trigger = Rx.Observable.fromEvent(stream, 'drain')
  hotsource = source.publish()

  const sub = trigger
    .takeUntil(hotSource.last())
    .subscribe(
      data => stream.write(String(data)) && pauser.next(true), // pauser gets the next row to deal with concurrency. Using debugger, I can see that 'data' is ok every time.
      err => stream.emit('error', err),
      () => !stream._isStdio && stream.end()
    )

  sub.add(hotsource.connect())
}

const textSource = Rx.Observable.from(docs)
  .map(doc => toTextFormat(doc)) // using debugger tools, I can see that every document passes through this method, and returns ok.

const copyStream = pgClient.query(copyFrom(
  `COPY "${table}" (${columnNames.join(', ')}) FROM STDIN`
))


const subscription = writeToStream(textSource, copyStream)

使用调试器工具,我可以看到每个文档都正确映射并写入了流。

该脚本运行没有问题,并且在没有抛出任何异常的情况下完成。无论如何,当我检查 postgres 数据库的结果时,行数比预期的要少。有人知道为什么会这样吗?

注意:该脚本将几个 mongo 集合导入到 postgres 表中,但其中只有两个存在此问题。此外,这两个集合中的行数始终相同。

注二:本代码为简化版。原始脚本处理并发和其他潜在问题,我认为这些问题与此问题无关。

** 编辑:** 我已将 postgres 配置为将日志保存到文件中。文件中的日志也没有显示任何错误。

4

0 回答 0