1

我想在处理下一行之前暂停我的 cassandra 流以进行一些异步操作。

每一行都在一个可读的事件侦听器中接收。我试过使用 stream.pause 但它实际上并没有暂停流。我也在“数据”事件侦听器中尝试过同样的方法,但这也不起作用。也许会非常感谢您的见解和解决方案。这是我的代码。在可读和“等待”中使用 async 使用 await 实际上并不能阻止下一行在异步函数完成之前出现。

function start() {
let stream = client.stream('SELECT * FROM table');
stream
    .on('end', function () {
        console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
        console.error(err);
    })
    .on('readable', function () {
        let row = this.read();
        asyncFunctionNeedTowaitForthisBeforeNextRow()
    })
}

//下面的不行

function start() {
let stream = client.stream('SELECT * FROM table');
stream
    .on('end', function () {
        console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
        console.error(err);
    })
    .on('readable', async function () {
        let row = this.read();
        stream.pause();
        await asyncFunctionNeedTowaitForthisBeforeNextRow();
        stream.resume();
    })
 }
4

2 回答 2

0

stream.pause()不起作用的原因是因为readable事件触发了多次,所以再次调用了相同的异步函数。data事件也是如此。

我建议使用自定义可写流来正确处理所有这些异步内容。

可写流看起来像:

const {Writable} = require('stream');

const myWritable = new Writable({
  async write(chunk, encoding, callback) {
    let row = chunk.toString();
    await asyncFunctionNeedTowaitForthisBeforeNextRow();
    callback(); // Write completed successfully
  }
})

然后调整您的代码以使用此可写:

function start() {
  let stream = client.stream('SELECT * FROM table');
  stream.pipe(myWritable);
  stream
    .on('end', function () {
      console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
      console.error(err);
    })
}
于 2019-09-18T12:32:28.360 回答
0

请注意,即使您将事件的处理程序声明'readable'为异步函数,调用者也不等待返回的承诺完成,因为Stream期望事件处理程序正常执行。

一个解决方案可能是:

stream
  .on('end', () => {})
  .on('error', () => {})
  .on('data', row => {
    stream.pause();
    doSomethingAsync(row).then(() => stream.resume());
  });

请注意,理想情况下,您应该在执行异步操作时利用并行性,因此最好每次读取几行然后暂停。

于 2019-09-18T12:55:27.557 回答