我不知道为什么我的 StreamInsight 引擎不能处理超过 140 个事件/秒,在每秒传递超过 8000 个事件的条件下。我在性能监视器中看到我的查询的事件数。并且在应用程序的控制台中就像引擎避免了很多事件。我有一个 id 为 200 的事件,然后是下一个 id 为 3330 的事件。有人知道问题出在哪里吗?
对于我的测试,我有一系列查询。第一个查询的输出是第二个查询的输入,依此类推,对于这种情况,结果不超过 140 个事件/秒。
现在我用一个简单的查询测试了我的应用程序,它输出从输入流接收到的所有事件,在这种情况下,服务器处理大约 2000 个事件/秒。我有一些带有结果的图像,但不幸的是我还不能放它们。困扰我的是为什么事件/秒的数量突然减少到 0 以及为什么引擎仍然没有考虑所有输入事件。
这是我的服务器配置和我正在使用的查询。
using (var server = Server.Create("SIInstance23"))
{
log.Info("StreamInsight Server started");
Application application = server.CreateApplication("StreamInsight Application Test");
ServiceHost host = new ServiceHost(server.CreateManagementService());
WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message);
binding.HostNameComparisonMode = HostNameComparisonMode.Exact;
host.AddServiceEndpoint(typeof(IManagementService),
binding,
"http://localhost:8081/StreamInsight/SIInstance23");
ScenarioWorkflow.NormalScenarioWorkflow(application, server, host);
}
public static void NormalScenarioWorkflow(this Application application, Server server, ServiceHost host)
{
host.Open();
IQStreamable<SensorDataEvent> inputStream = application.DefineSensorDataEventStream();
var simpleQuery = from e in inputStream
select e;
var simpleQueryConsumer = application.DefineConsumer(simpleQuery);
var simpleQueryBinding = simpleQuery.Bind(simpleQueryConsumer);
using (simpleQueryBinding.Run("process"))
{
Thread.Sleep(1000);
Console.WriteLine(string.Empty);
DiagnosticSettings settings = server.GetDiagnosticSettings(new Uri("cep:/Server/Application/StreamInsight Application Test/Entity/process/Query/StreamableBinding_1"));
settings.Aspects |= DiagnosticAspect.PerformanceCounters;
server.SetDiagnosticSettings(new Uri("cep:/Server/Application/StreamInsight Application Test/Entity/process/Query/StreamableBinding_1"), settings);
Console.WriteLine("***Hit Return to exit after viewing query output***");
Console.WriteLine();
Console.ReadLine();
}
host.Close();
}
接下来,当我尝试运行我的最后一个查询(从链中)时,我得到了大约 1500 个事件/秒。仍然存在事件/秒突然减少的问题。我认为对事件进行的每次转换都会产生一个需要由引擎处理的新转换。因此,来自查询链的事件数应该超过 1500。如果我错了,请纠正我。我是这个领域的新手,欢迎任何建议。
我认为问题出在下面的课程中。我也尝试使用PointEvent<SendorDataEvent>
而不是SensorDataEvent
,并尝试插入 CTI,但没有结果。
public class SocketEventInputAdapter : IObservable<SensorDataEvent>, IDisposable
{
public List<IObserver<SensorDataEvent>> observers { get; set; }
public object sync { get; set; }
public bool done { get; set; }
public SocketEventInputAdapter()
{
this.done = false;
this.observers = new List<IObserver<SensorDataEvent>>();
this.sync = new object();
SocketServer serverSocket = new SocketServer(4444, this);
}
internal void NotifyObservers(SensorDataEvent value)
{
lock (sync)
{
if (!done)
{
foreach (var observer in observers)
{
observer.OnNext(value);
}
}
}
}
public IDisposable Subscribe(IObserver<SensorDataEvent> observer)
{
lock (sync)
{
observers.Add(observer);
}
return new Subscription(this, observer);
}
void IDisposable.Dispose()
{
}
private sealed class Subscription : IDisposable
{
private readonly SocketEventInputAdapter _subject;
private IObserver<SensorDataEvent> _observer;
public Subscription(SocketEventInputAdapter subject, IObserver<SensorDataEvent> observer)
{
_subject = subject;
_observer = observer;
}
public void Dispose()
{
IObserver<SensorDataEvent> observer = _observer;
if (null != observer)
{
lock (_subject.sync)
{
_subject.observers.Remove(observer);
}
_observer = null;
}
}
}
}
谢谢你。