0

我尝试编写一个程序highland.js来下载几个文件,解压缩并解析为对象,然后将对象流合并为一个流flatMap并打印出来。

function download(url) {
    return _(request(url))
        .through(zlib.createGunzip())
        .errors((err) => console.log('Error in gunzip', err))
        .through(toObjParser)
        .errors((err) => console.log('Error in OsmToObj', err));
}  

const urlList = ['url_1', 'url_2', 'url_3'];

_(urlList)
    .flatMap(download)
    .each(console.log);

当所有 URL 都有效时,它工作正常。如果 URL 无效且没有下载文件,则 gunzip 报告错误。我怀疑发生错误时流会关闭。我希望flatMap其他流会继续,但是该程序不会下载其他文件并且没有打印任何内容。

处理流中错误的正确方法是什么,以及如何flatMap在一个流出现错误后不停止?

在命令式编程中,我可以添加调试日志来跟踪错误发生的位置。如何调试流式代码?

PS。toObjParser是一个节点变换流。它采用 OSM XML 的可读流并输出与 Overpass OSM JSON 兼容的对象流。见https://www.npmjs.com/package/osm2obj

2017-12-19 更新:

我试着push按照errors@amsross 的建议打电话。为了验证是否push真的有效,我推送了一个 XML 文档,并通过以下解析器对其进行了解析,我从输出中看到了它。但是,流仍然停止并且 url_3 没有下载。

function download(url) {
    console.log('download', url);
    return _(request(url))
        .through(zlib.createGunzip())
        .errors((err, push) => {
            console.log('Error in gunzip', err);
            push(null, Buffer.from(`<?xml version='1.0' encoding='UTF-8'?>
<osmChange version="0.6">
<delete>
<node id="1" version="2" timestamp="2008-10-15T10:06:55Z" uid="5553" user="foo" changeset="1" lat="30.2719406" lon="120.1663723"/>
</delete>
</osmChange>`));
        })
        .through(new OsmToObj())
        .errors((err) => console.log('Error in OsmToObj', err));
}

const urlList = ['url_1_correct', 'url_2_wrong', 'url_3_correct'];

_(urlList)
    .flatMap(download)
    .each(console.log);
4

1 回答 1

0

2017 年 12 月19 日更新:好的,所以我不能给你一个很好的理由,但我可以告诉你,从使用downloadinsequence产生的流切换到merge将它们组合在一起可能会给你你的结果后。不幸的是(或不是?),您将不再以任何规定的顺序获得结果。

const request = require('request')
const zlib = require('zlib')
const h = require('highland')

// just so you can see there isn't some sort of race
const rnd = (min, max) => Math.floor((Math.random() * (max - min))) + min
const delay = ms => x => h(push => setTimeout(() => {
  push(null, x)
  push(null, h.nil)
}, ms))

const download = url => h(request(url))
  .flatMap(delay(rnd(0, 2000)))
  .through(zlib.createGunzip())

h(['urlh1hcorrect', 'urlh2hwrong', 'urlh3hcorrect'])
  .map(download).merge()
  // vs .flatMap(download) or .map(download).sequence()
  .errors(err => h.log(err))
  .each(h.log)

2017 年 12 月 3 日更新:当流中遇到错误时,它会结束该流。为避免这种情况,您需要处理错误。您当前errors用于报告错误,但不处理它。您可以执行以下操作以转到流中的下一个值:

.errors((err, push) => {
  console.log(err)
  push(null) // push no error forward
})

原文:不知道are的输入输出类型很难回答toObjParser

因为through将值流传递给提供的函数并期望返回值流,所以您的问题可能在于toObjParser具有类似Stream -> Object或的签名Stream -> Stream Object,其中错误发生在内部流上,直到它不会发出任何错误消耗。

的输出是.each(console.log)什么?如果它正在记录流,那很可能是您的问题。

于 2017-11-30T18:38:57.507 回答