1

考虑一个问题:

  • 按行拆分文件
  • 将行写入结果文件
  • 如果结果文件超过某个大小,则创建一个新的结果文件

例如,如果我有一个重量为 4gb 且拆分大小等于 1gb 的文件。结果是四个文件的重量为 1gb。

我正在寻找具有 Rx*/Bacon 或任何其他类似库的任何语言的解决方案。

4

1 回答 1

0

我在 Coffee with Highland.js 中的解决方案:

_ = require('underscore')
H = require('highland')
fs = require('fs')
debug = require('debug')
log = debug('main')
assert = require('assert')

readS = H(fs.createReadStream('walmart.dump')).map((buffer) ->
  { buffer: buffer }
)
MAX_SIZE = 10 ** 7
counter = 0
nextStream = ()->
  stream = fs.createWriteStream("result/data#{counter}.txt")
  wrapper = H.wrapCallback(stream.write.bind(stream))
  counter += 1
  return wrapper


debug('profile')('start')
s = readS.scan({
    size: 0
    stream: nextStream()
  }, (acc, {buffer}) ->
  debug('scan')(acc, buffer)
  acc.size += buffer.length
  acc.buffer = buffer
  if acc.size > MAX_SIZE
      debug('notify')(counter - 1, acc.size)
      acc.size = 0
      acc.stream = nextStream()
  log(acc)
  return acc
).filter((x)->x.buffer?)

s.parallel 4
s.flatMap((x) ->
  debug('flatMap')(x)
  x.stream(x.buffer)
)
.done -> debug('profile')('finish')

walmart.dump是一个包含 6gb 文本的文本文件。拆分 649 个文件需要:

  profile start +0ms
  profile finish +53s
于 2015-06-03T13:28:13.950 回答