考虑一个问题:
- 按行拆分文件
- 将行写入结果文件
- 如果结果文件超过某个大小,则创建一个新的结果文件
例如,如果我有一个重量为 4gb 且拆分大小等于 1gb 的文件。结果是四个文件的重量为 1gb。
我正在寻找具有 Rx*/Bacon 或任何其他类似库的任何语言的解决方案。
考虑一个问题:
例如,如果我有一个重量为 4gb 且拆分大小等于 1gb 的文件。结果是四个文件的重量为 1gb。
我正在寻找具有 Rx*/Bacon 或任何其他类似库的任何语言的解决方案。
我在 Coffee with Highland.js 中的解决方案:
_ = require('underscore')
H = require('highland')
fs = require('fs')
debug = require('debug')
log = debug('main')
assert = require('assert')
readS = H(fs.createReadStream('walmart.dump')).map((buffer) ->
{ buffer: buffer }
)
MAX_SIZE = 10 ** 7
counter = 0
nextStream = ()->
stream = fs.createWriteStream("result/data#{counter}.txt")
wrapper = H.wrapCallback(stream.write.bind(stream))
counter += 1
return wrapper
debug('profile')('start')
s = readS.scan({
size: 0
stream: nextStream()
}, (acc, {buffer}) ->
debug('scan')(acc, buffer)
acc.size += buffer.length
acc.buffer = buffer
if acc.size > MAX_SIZE
debug('notify')(counter - 1, acc.size)
acc.size = 0
acc.stream = nextStream()
log(acc)
return acc
).filter((x)->x.buffer?)
s.parallel 4
s.flatMap((x) ->
debug('flatMap')(x)
x.stream(x.buffer)
)
.done -> debug('profile')('finish')
walmart.dump
是一个包含 6gb 文本的文本文件。拆分 649 个文件需要:
profile start +0ms
profile finish +53s