1

我有一个sourceStreamBaseData对象组成的。

我想把这个流分成n不同的流,然后BaseData根据自己的喜好过滤和转换每个对象。

最后,我希望n流只包含特定类型,并且分叉的流的长度可能会有所不同,因为将来可能会删除或添加数据。

我想我可以通过以下方式进行设置fork

import * as _ from 'highland';

interface BaseData {
    id: string;
    data: string;
}

const sourceStream = _([
    {id: 'foo', data: 'poit'},
    {id: 'foo', data: 'fnord'},
    {id: 'bar', data: 'narf'}]);

const partners = [
    'foo',
    'bar',
];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream.fork();

    partnerStream.filter((baseData: BaseData) => {
        return baseData.id === partner;
    });

    partnerStream.each(console.log);
});

我希望现在有两个流,并且foo-stream 包含两个元素:

{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }

bar-stream 包含一个元素:

{ id: 'bar', data: 'narf' }

然而我得到了一个错误:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
        throw new Error(
        ^

Error: Stream already being consumed, you must either fork() or observe()
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
    at Array.forEach (native)
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
    at Module._compile (module.js:570:32)
    at Object.Module._extensions..js (module.js:579:10)
    at Module.load (module.js:487:32)
    at tryModuleLoad (module.js:446:12)

如何将一个流分成多个流?


我也尝试链接调用,但我只得到一个流的结果:

partners.forEach((partner: string) => {
    console.log(partner);
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
        });

    partnerStream.each((item: BaseData) => {
        console.log(item);
    });
});

仅打印:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar

而不是预期的:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}

也可能是我误解的情况fork。根据其文档条目

Stream.fork() 分叉一个流,允许您添加具有共享背压的其他消费者。分叉给多个消费者的流只会以最慢的消费者可以处理它们的速度从其源中提取值。

注意:不要依赖分叉之间一致的执行顺序。此转换仅保证所有分叉将在处理第二个值 bar 之前处理一个值 foo。它不保证分叉处理 foo 的顺序。

提示:在分叉中修改流值时要小心(或使用这样做的库)。由于相同的值将传递给每个分叉,因此在一个分叉中所做的更改将在它之后执行的任何分叉中可见。再加上不一致的执行顺序,最终可能会出现细微的数据损坏错误。如果您需要修改任何值,则应制作副本并改为修改副本。

弃用警告:目前可以在消费流之后分叉(例如,通过转换)。这在下一个主要版本中将不再可能。如果您要分叉一个流,请始终在其上调用 fork。

因此,而不是“如何分叉流?” 我的实际问题可能是:如何将高原溪流即时复制到不同的溪流中?

4

2 回答 2

1

partnerStream.filter()返回一个的流。然后,您partnerStream再次使用消费partnerStream.each(),而无需调用fork()observe()。因此,要么链接partnerStream.filter().each()调用,要么将返回值分配给partnerStream.filter()变量并调用.each()它。

于 2017-02-07T08:44:42.263 回答
0

必须记住,在创建所有分叉之前不要使用分叉流。就好像一个人消费了一个分叉的流一样,它和它的“父级”都会被消费,从而使任何后续的分叉都从一个空流中分叉出来。

const partnerStreams: Array<Stream<BaseData>> = [];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
         }
    );

    partnerStreams.push(partnerStream);
});

partnerStreams.forEach((stream, index) => {
    console.log(index, stream);
    stream.toArray((foo) => {
        console.log(index, foo);
    });
});

它打印:

0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ]
1 [ { id: 'bar', data: 'narf' } ]
于 2017-02-07T09:59:28.673 回答