0

我正在使用snowflake-sdksnowflake-promise流式传输结果(以避免在内存中加载太多对象)。

对于每个流式传输的行,我想处理接收到的信息(执行回写的类似 ETL 的作业)。我的代码非常基本,类似于这个简单的雪花承诺示例

我目前的问题是.on('data', ...)调用的频率超出了我的处理能力。(我的类似 ETL 的工作跟不上接收到的行,我的数据库连接池执行回写操作已经用尽)。

我尝试将rowStreamHighWaterMark设置为各种值(1、10 [默认]、100、1000、2000 和 4000)以减慢/ backpressure流的速度。可读但不幸的是,它没有改变任何东西。

我错过了什么 ?如何更好地控制何时使用读取数据?

4

1 回答 1

0

如果这是同步写入的,您会看到“被推送的数据过多”而不是您可以同时写入的数据”不会发生,因为:

while(data){
    data.readrow()
    doSomethineAwesome()
    writeDataViaPoolTheBacksUp()
} 

只是不能旋转到快。

现在,如果您在一个异步线程上接受数据,并将该数据推送到队列中并在另一个异步线程中排空队列,您将遇到您描述的问题(即您的队列爆炸)。因此,当写入线程太落后时,您需要减慢/暂停读取线程的完成。

鉴于正在写入假定的队列,当它变得太长时,停止。

您可能会这样做的另一种方式是没有工作队列,但每次满足条件时都会触发异步写入。这很糟糕,因为您无法跟踪出色的工作,并且您正在对数据库进行许多小的更新,如果是雪花的话,它真的不喜欢。更好的方法是构建一组本地数据更改,我们将其称为批处理,当批处理达到某个大小时,您在一次操作中刷新更改集(并且在输入完成时刷新批处理,以捕获渣)

于 2022-01-18T20:22:35.283 回答