1

让我先解释一下我想要实现的目标。

假设我从事件流中传入以下数据

var data = new string[] { 
                "hello", 
                "Using", 
                "ok:michael", 
                "ok", 
                "begin:events", 
                "1:232", 
                "2:343", 
                "end:events", 
                "error:dfljsdf",
                "fdl", 
                "error:fjkdjslf",
                "ok"  
            };

当我订阅数据源时,我想得到以下结果

"ok:michael"
"ok"
"begin:events 1:232 2:343 end:events"
"error:dfljsdf"
"error:fjkdjslf"
"ok"

基本上,我想获取以ok 或 error开头的任何数据以及 begin 和 end 之间的数据。

到目前为止我已经尝试过了..

var data = new string[] { 
                "hello", 
                "Using", 
                "ok:michael", 
                "ok", 
                "begin:events", 
                "1:232", 
                "2:343", 
                "end:events", 
                "error:dfljsdf",
                "fdl", 
                "error:fjkdjslf",
                "ok"  
            };



            var dataStream = Observable.Generate(
                                data.GetEnumerator(), 
                                e => e.MoveNext(), 
                                e => e, 
                                e => e.Current.ToString(), 
                                e => TimeSpan.FromSeconds(0.1));         

            var onelineStream = from d in dataStream
                                where d.StartsWith("ok") || d.StartsWith("error")
                                select d;

            // ???
            // may be need to buffer? I want to get data like "begin:events 1:232 2:343 end:events"
            // but it is not working...
            var multiLineStream = from list in dataStream.Buffer<string, string, string>(
                                bufferOpenings: dataStream.Where(d => d.StartsWith("begin")),
                                bufferClosingSelector: b => dataStream.Where(d => d.StartsWith("end")))
                              select String.Join(" ", list);

            // merge two stream????
            // but I have no clue how to merge these twos :(

            mergeStream .Subscribe(d =>
            {
                Console.WriteLine(d);
                Console.WriteLine();
            });

由于我对响应式编程非常陌生,因此我无法让自己以响应式方式思考。:(

提前致谢。

4

1 回答 1

6

你是如此,如此接近正确答案!

本质上,您的onelineStream&multiLineStream查询几乎是正确的。

将它们合并在一起非常容易。只需这样做:

onelineStream.Merge(multiLineStream)

但是,您的查询不足之处Observable.Generate在于您用来引入值之间的延迟。这会创建一个 observable,如果您有多个订阅者,则可以将值“扇出”。

给定您的数据和定义,dataStream看看这段代码的行为方式:

dataStream.Select(x => "!" + x).Subscribe(Console.WriteLine);
dataStream.Select(x => "@" + x).Subscribe(Console.WriteLine);

你得到这些值:

!hello
@Using
!ok:michael
@ok
@1:232
!begin:events
@2:343
!end:events
!fdl
@error:dfljsdf
!error:fjkdjslf
@ok

请注意,一些订阅由一个订阅处理,而另一些则由另一个订阅处理。这意味着即使您的onelineStream&multiLineStream查询几乎是正确的,它们也只会看到一些数据,因此不会像您预期的那样运行。

您还可以获得可以跳过和重复值的竞争条件。所以最好避免这种可观察的。

在值之间引入延迟的更好方法是这样做:

var dataStream = data.ToObservable().Do(_ => Thread.Sleep(100));

现在这创建了一个“冷”可观察对象,这意味着每个新订阅者都将获得可观察对象的新订阅,因此从第一个值开始。

您的multiLineStream查询将无法在冷的 observable 上正常工作。

为了使数据流成为“热的”可观察的(在订阅者之间共享值),我们使用Publish操作符。

所以,multiLineStream现在看起来像这样:

var multiLineStream =
    dataStream.Publish(ds =>
        from list in ds.Buffer(
            ds.Where(d => d.StartsWith("begin")),
            b => ds.Where(d => d.StartsWith("end")))
        select String.Join(" ", list));

然后你可以得到你的结果,如下所示:

onelineStream.Merge(multiLineStream).Subscribe(d =>
{
    Console.WriteLine(d);
    Console.WriteLine();
});

这就是我得到的:

ok:michael
ok
begin:events 1:232 2:343 end:events
error:dfljsdf
error:fjkdjslf
ok

让我知道这是否适合你。

于 2012-05-12T01:01:01.753 回答