0

I'm working on porting over some Reactive Extension queries to StreamInsight but have run into an issue with an overlapping window query.

I have a source setup in my StreamInsight server and I'm trying to write an overlapping window query like this:

var source = streamInsightServer.GetObservable<EventPattern<MyEventArg>>("EventSource");

var query = source.Window(new TimeSpan(0, 0, 1), new TimeSpan(0, 0, 0, 250));

where source is IQbservable<EventPattern<MyEventArg>> and query is then IQbservable<IObserverable<EventPattern<MyEventArg>>>

With Reactive the observer was created as follows:

_observer = query.Subscribe(evts =>
            {
                evts.Count().Subscribe(c =>
                {
                    //push output here
                });
            });

How can I attach an observer to retrieve the equivalent output from StreamInsight?

4

2 回答 2

0

好的,所以我设法创建了两个接收器,它们实现了与 Rx 订阅相同的输出,如下所示:

var query = source.Window(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(250));
query.Deploy("windowQuery");
var innerSink = applicationStatus.DefineObserver(() => Observer.Create<int>(c => /*output here*/));
innerSink.Deploy("innerSink");
var sink = applicationStatus.DefineObserver(() => Observer.Create<IObservable<EventPattern<MyEventArg>>>(e => e.Count().AsQbservable().Bind(innerSink).Run().Void()));
sink.Deploy("mySink");
ProcessBinding = query.Bind(sink).Run("processBindingName");

需要注意的是,尽管该.Void()方法只是一个扩展方法,它什么都不做并返回 void,这会导致表达式被解释为 anAction而不是 aFunc以匹配所需的签名。此外,通过运行内部接收器创建的所有绑定都不会被处理和建立;这些可以在 StreamInsight 事件流调试器中看到。

虽然这实现了相同的结果,但由于扩展方法 hack 并且没有处理内部进程绑定,我不确定它是否是一个好的解决方案。仍在寻找是否有人知道如何在没有这些问题的情况下编写此代码!

于 2015-02-18T14:22:35.523 回答
0

如果您的目标是共享数据源,那么您需要使用.With().

所以你的过程应该是:

var process = query.Bind(innerSink).With(query.Bind(sink).Run("processBindingName");

当您完成该过程或想要关闭应用程序时,您只需调用.Dispose().

就您的窗口而言,您应该像这样使用 HoppingWindow:

var query = from win in source.HoppingWindow(new TimeSpan(0, 0, 1), new TimeSpan(0, 0, 0, 250))
select win.Count();
于 2015-02-19T01:24:50.820 回答