0

我想为我的库中的一种方法启用 Promise 取消,reduce. 我只对取消异步迭代的 Promise 感兴趣,因为它们很可能无限期挂起。

const reduceAsyncIterable = async (fn, possiblyX0, state, x) => {
  const iter = x[Symbol.asyncIterator]()
  const y0 = isUndefined(possiblyX0) ? (await iter.next()).value : possiblyX0
  if (isUndefined(y0)) {
    throw new TypeError('reduce(...)(x); x cannot be empty')
  }
  let y = await fn(y0, (await iter.next()).value)
  for await (const xi of iter) {
    if (state.cancelled) return // stops async iterating if `cancel` called
    y = await fn(y, xi)
  }
  return y
}

const reduce = (fn, x0) => {
  if (!isFunction(fn)) {
    throw new TypeError('reduce(x, y); x is not a function')
  }
  return x => {
    if (isIterable(x)) return reduceIterable(fn, x0, x)
    if (isAsyncIterable(x)) {
      const state = { cancelled: false, resolve: () => {} }
      const p = new Promise((resolve, reject) => {
        state.resolve = resolve
        reduceAsyncIterable(fn, x0, state, x).then(
          y => state.cancelled || resolve(y)
        ).catch(reject)
      })
      p.cancel = () => { state.cancelled = true; state.resolve() } // shortcircuit the Promise `p` on `cancel` call
      return p
    }
    if (is(Object)(x)) return reduceObject(fn, x0, x)
    throw new TypeError('reduce(...)(x); x invalid')
  }
}

上面的代码似乎可以工作,但我不禁觉得这里有内存泄漏。特别是在await iter.next()for await (const xi of iter)。如果这些 await 语句永远占用(对于异步迭代器可能会如此),则reduceAsyncIterable可能永远不会返回。从用户的角度来看,这很好,因为在 中发生短路reduce,因为用户看到的 Promise 已解决。但是从计算机的角度来看,取消这个操作的 Promise 会导致内存泄漏吗?

我希望能够cancel在返回的承诺上使用该函数,如下所示:

const myOngoingTaskPromise = reduce(someReducer, null)(myInfiniteAsyncIterable)

myOngoingTaskPromise.cancel() // resolves myOngoingTaskPromise with undefined

myOngoingTaskPromise // Promise { undefined }
4

1 回答 1

1

我找到了方法,Promise.race就像秘密武器什么的

    if (isAsyncIterable(x)) {
      const state = { cancel: () => {} }
      const cancelToken = new Promise((_, reject) => { state.cancel = reject })
      const p = Promise.race([
        reduceAsyncIterable(fn, x0, x),
        cancelToken,
      ])
      p.cancel = () => { state.cancel(new Error('cancelled')) }
      return p
    }

没有内存泄漏

减少异步可迭代承诺取消

于 2020-06-13T18:00:28.663 回答