2

我一直在使用 StreamInsight v2.3 和它提供的更新的 Rx 功能。我正在研究将 SI 用于事件溯源实施。我已经调整了一些MSDN 示例代码以获得以下内容:

服务器进程的代码:

using (var server = Server.Create("Default"))
{
    var host = new ServiceHost(server.CreateManagementService());
    host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "http://localhost/SIDemo");
    host.Open();

    var myApp = server.CreateApplication("SIDemoApp");
    var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime); 
    mySource.Deploy("demoSource");

    Console.WriteLine("Hit enter to stop.");
    Console.ReadLine();

    host.Close();
}

客户端进程代码:

using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"http://localhost/SIDemo")))
{
    var myApp = server.Applications["SIDemoApp"];
    var mySource = myApp.GetObservable<long>("demoSource");

    using (var mySink = mySource.Subscribe(x => Console.WriteLine("Output - {0}", x)))
    {
        Console.WriteLine("Hit enter to stop.");
        Console.ReadLine(); 
    }
}

尝试运行它会产生以下错误:

不支持从远程 'System.Reactive.Linq.IQbservable`1[System.Int64]' 读取。使用“Microsoft.ComplexEventProcessing.Linq.RemoteProvider.Bind”方法通过远程观察者从源中读取。

我开始使用的示例代码定义了一个观察者和接收器,并将其绑定在 StreamInsight 服务器中。我试图让观察者保持在客户端进程中。有没有办法在客户端应用程序中为远程 StreamInsight 源设置观察者?这是否必须通过客户端观察到的服务器中的 WCF 端点之类的东西来完成?

4

1 回答 1

0

实际上错误是针对解决方案的。您需要“绑定”到源。

请检查以下片段:

//Get SOURCE from server 
var serverSource = myApp.GetStreamable<long>("demoSource");

//SINK for 'demoSource'
var sinkToBind = myApp.DefineObserver<long>( ()=> Observer.Create<long>( value => Console.WriteLine( "From client : " + value)));

//BINDING 
var processForSink = serverSource.Bind(sinkToBind).Run("processForSink");

另请注意,接收器将在服务器上运行,而不是像我一开始猜想它会在客户端上运行。如果您查看服务器和客户端的控制台应用程序,控制台输出正在写入服务器应用程序。

如果即使有一种方法可以在客户端运行接收器,我也不知道,我也想知道。

于 2016-10-26T17:31:27.437 回答