有没有人观察到他们的 ActiveMQ KahaDB 或 LevelDB 存储的磁盘空间增长缓慢?
我们正在使用以下内容:
- Windows Server 2008 SP1(用于部署)和 Windows 7 SP1(用于开发)
- Java 1.7
- ActiveMQ 5.11.1 和 5.13.2(尝试了 KahaDB 和 LevelDB)
- ServiceMix 5.4.0,其 Karaf 服务器托管基于 Camel 的 Java 应用程序 bean,其 ActiveMQ 服务器未使用
- 消息速率 = 每秒 200 条 XML 消息传入和大约相同的传出(由 Java 应用程序 bean 转换为 JSON)
- 队列数 = 大约 50 个队列共享上述消息;特别是 2 个队列正在处理多达一半的流量
- 所有消息设置为:
- 交付持久性 = false
- 生存时间 (TTL) = 1 或 2 分钟(最长)
卡哈数据库
将 ActiveMQ 与 KahaDB 一起使用时,我观察到 db*.log 文件最初是从 db-1.log 到 db-2.log,然后到 db-3.log 等等,以及较旧的 db* .log 文件按预期清理(删除)。然后在某个时间点可能是 10 小时后,旧的 db*.log 文件不再被清理;新的 db*.log 文件出现,KahaDB 开始以每天大约 4 GB 的速度扩展。当它最终达到我们配置的 50 GB 上限时,服务器当然会停止工作。
似乎一小部分消息未能过期(尽管被赋予了 TTL)并因此阻止了正常清理。我试图手动删除旧的 db*.log 文件,但我被阻止这样做,并警告这些文件仍在使用中。
KahaDB db*.log 文件在文本编辑器中是“半可读的”(!);即我可以识别应用程序消息,但这并不能告诉我真正发生了什么。
级别数据库
我最近尝试将 LevelDB 与 ActiveMQ 5.13.2 一起使用,尽管存储速度慢得多(每天大约 200 MB),但该存储似乎仍在扩展。
我相信 LevelDB 数据存储使用压缩;大多数文件似乎是二进制文件,因此在没有合适的阅读器/浏览器的情况下非常不透明。
有没有人找到或实现了一个工具来读取/浏览 KahaDB 或 LevelDB 消息存储的非持久消息?
我们的配置
下面是我们的activemq.xml通常的样子:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="XXXX" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<deadLetterStrategy>
<discarding />
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
-->
<managementContext>
<managementContext createConnector="true" connectorPort="1099"/>
</managementContext>
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<levelDB directory="${activemq.data}/leveldb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<networkConnectors>
<networkConnector
uri="XXX"
networkTTL="XXX"
duplex="XXX"
prefetchSize="XXX"
userName="${networkconnector.username}"
password="${networkconnector.password}"
name="XXX" >
<excludedDestinations>
</excludedDestinations>
<staticallyIncludedDestinations>
<queue physicalName="XXX" />
<queue physicalName="XXX" />
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
<transportConnectors>
<transportConnector name="amqp" uri="amqp://0.0.0.0:XXXX?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="openwire" uri="tcp://0.0.0.0:XXXX?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="openwire-outbound" uri="tcp://0.0.0.0:XXXX?maximumConnections=1000&wireformat.maxFrameSize=104857600"/>
</transportConnectors>
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="XXX" password="XXX" groups="XXX" />
</users>
</simpleAuthenticationPlugin>
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry topic=">" read="XXX" write="XXX" admin="XXX" />
<authorizationEntry queue=">" read="XXX" write="XXX" admin="XXX" />
<authorizationEntry queue="XXX" read="XXX" write="XXX" admin="XXX" />
<authorizationEntry queue="ActiveMQ.Advisory.>" read="XXX" write="XXX" admin="XXX" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="XXX" write="XXX" admin="XXX" />
</authorizationEntries>
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins" />
</tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
<runtimeConfigurationPlugin checkPeriod="1000" />
</plugins>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
</beans>