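I am using Nesper as the CEP engine to process events in my application.

I am trying to model the following rule as an EPL statement:

  • The Value field of each event is averaged over a limited time window.
  • If any of these averages equals the expected value 1, an event is generated.

I modeled it as: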

SELECT (
(AVG(ParameterEvent1.Value) = 1) OR 
(AVG(ParameterEvent2.Value) = 1)
...
(AVG(ParameterEvent50.Value) = 1)
) AS BooleanValue 
FROM 
ParameterEvent(Id = 1).win:time(3 sec) AS ParameterEvent1, 
ParameterEvent(Id = 2).win:time(3 sec) AS ParameterEvent2 
...
ParameterEvent(Id = 50).win:time(3 sec) AS ParameterEvent50

In a separate thread, I feed parameter values to the engine at a steady rate of 50 events/sec.

This setup causes massive CPU and RAM usage and eventually throws com.espertech.esper.client.EPException: ReaderWriterLock timeout expired.

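I would like to know what causes this problem. Interestingly, when I change the window from win:time(3 sec) to std:lastevent(), the problem goes away. However, I need the window to make sure the parameter's value has been 1 for 3 seconds before the event is generated.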

The full demo code follows. You only need to install the Nesper package to run it:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using com.espertech.esper.client;

namespace NesperTester
{
    class Program
    {
        public class ParameterEvent
        {
            public int Id { get; set; }
            public int Value { get; set; }
        }

        static EPServiceProvider engine;

        const int NParameters = 50;

        static void Main(string[] args)
        {
            engine = EPServiceProviderManager.GetDefaultProvider();

            engine.EPAdministrator.Configuration.AddEventType("ParameterEvent", typeof(ParameterEvent));

            CreateEPLStatement();

            Task.Factory.StartNew(() => SimulateParameterChange(), TaskCreationOptions.LongRunning);

            System.Threading.Thread.Sleep(int.MaxValue);
        }

        static void CreateEPLStatement()
        {
            const int windowLength = 3;

            var sb = new StringBuilder();

            for (int i = 1; i <= NParameters; i++)
            {
                sb.AppendFormat("(AVG(ParameterEvent{0}.Value) = 1) OR ", i);
            }

            var rule = sb.ToString();
            rule = rule.Remove(rule.Length - 4, 4);
            sb.Clear();

            for (int paramId = 1; paramId <= NParameters; paramId++)
            {
                sb.AppendFormat("ParameterEvent(Id = {0}).win:time({1} sec) AS ParameterEvent{0}, ", paramId, windowLength);
                //sb.AppendFormat("ParameterEvent(Id = {0}).std:lastevent() AS ParameterEvent{0}, ", paramId, windowLength);
            }

            var selectSources = sb.ToString();
            selectSources = selectSources.Remove(selectSources.Length - 2, 2);

            var eplStr = string.Format("SELECT ({0}) AS BooleanValue FROM {1}", rule, selectSources);

            var statementName = "statement1";

            var statement = engine.EPAdministrator.CreateEPL(eplStr, statementName, null);
            statement.Start();
            statement.Events += Statement_Events;
        }


        static bool PrevState = false;
        static void Statement_Events(object sender, UpdateEventArgs e)
        {
            var underlying = e.NewEvents[0].Underlying as Dictionary<string, object>;

            if (underlying.Last().Value == null)
            {
                Console.WriteLine("Statement value is unknown");
            }
            else
            {
                var statementValue = (bool)underlying.First().Value;

                if (statementValue != PrevState)
                {
                    PrevState = statementValue;
                    Console.WriteLine("Statement is {0}", statementValue);
                }
            }
        }

        static void SimulateParameterChange()
        {
            int counter = 0;

            while (true)
            {
                for (int i = 1; i <= NParameters; i++)
                {
                    var evt = new ParameterEvent();

                    evt.Id = i;

                    if (i == 1 && (counter/4) % 2 == 0)
                    {
                        evt.Value = 1;
                    }
                    else
                    {
                        evt.Value = 0;
                    }

                    engine.EPRuntime.SendEvent(evt);

                    System.Threading.Thread.Sleep(1000 / NParameters);
                }

                counter++;
            }
        }
    }
}

1 Answer


See http://espertech.com/esper/solution_patterns.php#expiry-3 for using a "having" clause to trigger the output.
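
A likely reason for the blow-up: the FROM clause in the question joins 50 time windows with no WHERE clause, so the engine has to work through the cross product of the window contents. At 50 events/sec spread over 50 Ids, each 3-second window holds about 3 events, which puts the join on the order of 3^49 (roughly 2 × 10^23) row combinations per arriving event. With std:lastevent() every stream holds a single row, so the product stays at 1. The statement below is a minimal, untested sketch of a "having"-based alternative that avoids the join by using one window grouped by Id. The AvgValue alias is only an illustrative name, and the property names are taken from the ParameterEvent class in the question.

```sql
// EPL sketch, not verified against a running NEsper engine.
// One time window over all ParameterEvent instances, aggregated per Id,
// so no join between 50 separate streams is needed.
SELECT Id, AVG(Value) AS AvgValue
FROM ParameterEvent.win:time(3 sec)
GROUP BY Id
// Only emit a row when the 3-second average for that Id is exactly 1.
HAVING AVG(Value) = 1
```

Note that the HAVING test passes as soon as every event currently in the window for an Id has Value 1, even if the window holds only one event. If the rule must cover a full 3 seconds of data, an additional condition on the number of events in the window is probably needed.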

answered 2017-06-25T16:54:25.997