4

我们在 CEP 上看到内存不足错误。线程转储显示大约有 32000 个线程在监视器上休眠。此外,即使 CEP JVM 选项指定在内存不足时生成 HeapDump,我们也没有看到任何堆转储生成。请建议。(CEP JVM -Xms256m -Xmx1536m)

1) Cassandra 在这个 CEP 上被禁用
2) CEP 版本是 2.1.0
3) CEP 前面是 WSO2 ESB(使用 BAM 中介)。
4) 除了向 CEP 发送实际的有效载荷数据外,ESB 还向 CEP 发送周期性的心跳(每 15 ec)。

5)我们还在 ESB 上配置了 JMX 代理,它每 15 分钟监视 CEP(cpu/内存线程)
6)即使指定了 -XX:HeapDumpPath= 参数,也没有找到堆转储

  • 在此 OOM 之前,CEP 连续运行了 7 天。重新启动后,我们观察到线程数以每天大约 4000-5000 个线程的速度稳步增加

CEP 日志..

[2013-06-10 05:31:49,040] ERROR -  Thread Thread[ActiveMQ InactivityMonitor  WriteCheckTimer,5,main] died {org.apache.zookeeper.server.NIOServerCnxn}
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:640)
at  java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(ThreadPoolExecutor.java:727)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:657)
at org.apache.activemq.transport.AbstractInactivityMonitor.writeCheck(AbstractInactivityMonitor.java:153)
at org.apache.activemq.transport.AbstractInactivityMonitor$2.run(AbstractInactivityMonitor.java:117)
at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
at java.util.TimerThread.mainLoop(Timer.java:512)
at java.util.TimerThread.run(Timer.java:462)
[2013-06-10 05:31:49,040] ERROR -  Thread Thread[ActiveMQ InactivityMonitor WriteCheckTimer,5,main] died {org.apache.zookeeper.server.NIOServerCnxn}
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:640)
at java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(ThreadPoolExecutor.java:727)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:657)
at org.apache.activemq.transport.AbstractInactivityMonitor.writeCheck(AbstractInactivityMonitor.java:153)
at org.apache.activemq.transport.AbstractInactivityMonitor$2.run(AbstractInactivityMonitor.java:117)
at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
at java.util.TimerThread.mainLoop(Timer.java:512)
at java.util.TimerThread.run(Timer.java:462)

CEP 中配置的一些查询

<cep:query name="xxxBuildUpQuery">
<cep:expression><![CDATA[from  xxxCEPIntgDataStream[interfaceInformationInterfaceName=='xxx-xxx' or interfaceInformationInterfaceName=='xxx-xxx'or 
                               interfaceInformationInterfaceName=='xxx-xxx' or interfaceInformationInterfaceName=='xxx-xxx' or 
                               interfaceInformationInterfaceName=='xxx-xxx' or interfaceInformationInterfaceName=='xxxx-xxx' or 
                               interfaceInformationInterfaceName=='xxx-xxx' or 
                               interfaceInformationInterfaceName=='xxx-xxx' ]#window.time(60000)
insert into buildUpStream interfaceInformationInterfaceName, count(interfaceInformationxxxId) as noOfInflowMsgs group by interfaceInformationInterfaceName]]></cep:expression>
<cep:output brokerName="activemqJmsBroker" topic="xxxBuildUpInfoTopic">
  <cep:xmlMapping>
    <xxxAnalytics>
      <buildUpInfo>
        <interfaceName>{interfaceInformationInterfaceName}</interfaceName>
        <buildUpPerMin>{noOfInflowMsgs}</buildUpPerMin>
      </buildUpInfo>
    </xxxAnalytics>
  </cep:xmlMapping>
</cep:output>
</cep:query>
<cep:query name="xxxQueueDepthQuery">
<cep:expression><![CDATA[from xxxIntgrQueueDepthData_v1
insert into xxxIntgrQueueDepthStream flowName,appName, queueDepth]]>  </cep:expression>
<cep:output brokerName="activemqJmsBroker" topic="xxxIntgrQueueDepthTopic">
  <cep:xmlMapping>
    <xxxAnalytics>
      <queueDepthInfo>
        <flowName>{flowName}</flowName> 
        <appName>{appName}</appName>
        <depth>{queueDepth}</depth>
      </queueDepthInfo>
    </xxxAnalytics>
  </cep:xmlMapping>
</cep:output>
</cep:query>
<cep:query name="xxxClockDataQuery">
  <cep:expression><![CDATA[from testStream
insert into testOutClockDataStream AEDateTime]]></cep:expression>
  <cep:output brokerName="activemqJmsBroker" topic="xxxClockDataTopic">
    <cep:xmlMapping>
      <xxxClockFeed>
        <data>
          <XXDateTime>{XXDateTime}</XXDateTime>
        </data>
      </xxxClockFeed>
    </cep:xmlMapping>
  </cep:output>
 </cep:query>
 <cep:query name="xxxSimltrPaymntAvgQuery_1">
  <cep:expression><![CDATA[from xxxCEPIntgDataStream#window.time(15000)
    insert into xxxSimltrPymntAvgData avg(amount) as avgAmount, currency group by currency]]></cep:expression>
  <cep:output brokerName="activemqJmsBroker" topic="xxxAvgPaymntDetails">
    <cep:xmlMapping>
      <xxxAnalytics>
        <avgPaymentData>
          <avgAmount>{avgAmount}</avgAmount>
          <currency>{currency}</currency>
        </avgPaymentData>
      </xxxAnalytics>
    </cep:xmlMapping>
  </cep:output>

谢谢拉吉夫帕蒂尔

4

2 回答 2

1

我发现 Siddhi Manager 以 Integer.MAX_VALUE 作为核心池大小启动了一个计划线程池。这意味着每个请求都会创建一个新线程,没有超时策略。(参考:ThreadPoolExecutor

在 WSO2 修复此问题之前,您可以更改此线程池的大小。Pe,在 org.wso2.siddhi.core.SiddhiManager 类中更改以下行:

this.siddhiContext.setScheduledExecutorService(Executors.newScheduledThreadPool(Integer.MAX_VALUE));

SiddhiManager 版本1.1.0-wso2v1 中的第 77 行)

对此:

this.siddhiContext.setScheduledExecutorService(Executors.newScheduledThreadPool(100));

此更改将创建 100 的核心池大小,最大池大小为 Integer.MAX_VALUE,并且空闲线程(超过核心池大小)将在完成后立即删除。

于 2013-06-21T10:23:19.137 回答
0

看起来 timeBatch 窗口是问题所在,我们有几个使用它的查询......一旦删除线程加速就被逮捕了。可能 CEP 的 timeBatch 窗口功能需要更多测试。

于 2013-06-21T05:43:05.010 回答