1

我有一个场景,我的生产者运行得比我的消费者快得多,有很多数据。但是,我不想阻止生产者。所以唯一的选择是通过文件缓冲额外的数据。

我设法提出了一个非最佳解决方案。但是,必须有一种更务实的方式来实现这一点。

我更熟悉 RxJS,我可能不会有太多问题,但我刚刚开始切换到高地,希望得到一些建议。

import fs from 'fs-promise'
import _ from 'highland'

export default function buffer(id, source) {
  const ctx = {
    fd: null,
    written: 0,
    read: 0,
    eof: false,
    err: null,
    path: `/tmp/${id}`
  }

  async function drain(push) {
    ctx.fd = ctx.fd || await fs.open(ctx.path, 'w+')

    const bufferSize = 1024 * 1024

    while (ctx.read < ctx.written) {
      let buffer = new Buffer(bufferSize)

      buffer = buffer.slice(0,
        await fs.read(ctx.fd, buffer, 0, buffer.length, ctx.read))

      if (buffer.length === 0) {
        break
      }

      ctx.read += buffer.length

      push(null, buffer)
    }
  }

  // TODO: What about errors?
  source.each(async (x) => {
    try {

      ctx.fd = ctx.fd || await fs.open(ctx.path, 'w+')

      if (x === _.nil) {
        ctx.eof = true
      } else {
        ctx.written += await fs.write(ctx.fd, x, 0, x.length, ctx.written)
      }
    } catch (err2) {
      ctx.err = err2
    }
  })

  return _(async (push, next) => {
    try {
      await drain()

      if (ctx.err) {
        throw ctx.err
      } else if (ctx.eof) {
        await drain()
        fs.unlink(ctx.path)
        push(null, _.nil)
      } else {
        setTimeout(next, 40)
      }
    } catch (err2) {
      fs.unlink(ctx.path)
      push(err2)
    }
  })
}
4

0 回答 0