1

我有一个IQStreamable使用.DefineObservableMicrosoft.ComplexEventProcessing.Application

代码看起来很正常,但我不明白的是,当我使用Task.Run()into传入的参数时DefineObservable,出现异常。

但是,当我直接使用该属性而不将其传递给内部方法Task.Run()时,它起作用了。

例外

An unhandled exception of type 'System.InvalidOperationException' occurred in Microsoft.ComplexEventProcessing.Diagnostics.dll

Additional information: Cannot serialize value of type 'System.IObservable'1[ValueObjects.Price]'.

方法

private void Monitor(IObservable<Price> priceObservable)
    {
        const string applicationName = "RealtimeMonitoring";

        Microsoft.ComplexEventProcessing.Application application = PriceObserver.Server.CreateApplication(applicationName);
        IQStreamable<Price> sStreamable = application
            //.DefineObservable<Price>(() => PriceRealtimeProvider.Instance.PriceObservable)
            .DefineObservable<Price>(() => PriceObservable)
            .ToPointStreamable( => PointEvent<Price>.CreateInsert(DateTime.Now, price), AdvanceTimeSettings.IncreasingStartTime);

        var standingQuery = from p in streamable select price ;
        var sink = application.DefineObserver(() => new PriceObserver());

        using (standingQuery.Bind(sink).Run())
        {
            // some code...
        }
    }

来电:

Task.Run(()=>Monitor(PriceRealtimeProvider.Instance.PriceObservable)

问题:

  1. StreamInsight 是否序列化观察者对象?为什么?

  2. 之间有什么不同

.DefineObservable<Price>(() => PriceObservable)

这个 DefineObservable<Price>(() => PriceRealtimeProvider.Instance.PriceObservable)

为什么使用参数会导致问题?

4

1 回答 1

0
  1. 是的,但我仍然不知道最初的设计和原因。
  2. 对于这个电话:.DefineObservable<Price>(() => PriceObservable)。这意味着参数在应用程序内存中。并且参数需要被序列化才能传递给远程服务器。所以在这之后,参数实际上是在 StreamInsight 服务器内存中。由于它是接口类型,因此无法序列化参数。

对于此调用:DefineObservable<Price>(() => PriceRealtimeProvider.Instance.PriceObservable),我猜,这被视为委托调用,因此Instance.PriceObservable尚未实例化,直到 StreamInsight 服务器调用代码。发生这种情况时,一切都在 StreamInsight 服务器内存中。所以不需要序列化。

总之,第二次调用不会发生序列化。

我愿意纠正。

于 2015-02-06T08:11:29.960 回答