2

我们在 ActiveMQ 中有一个问题,我们有大量的消息没有丢弃主题。主题设置为非持久性、非持久性。我们的 Activemq.xml 文件是

<beans>

  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false" persistent="false">

<!--
    <persistenceAdapter>
      <journaledJDBC journalLogFiles="5" dataDirectory="../data"/>
    </persistenceAdapter>
-->

        <transportConnectors>
            <transportConnector uri="vm://localhost"/>
        </transportConnectors>

  </broker>

</beans>

我们在messaging-config.xml 中的主题定义是

<destination id="traceChannel">

    <properties>

        <network>
        <session-timeout>10</session-timeout>
    </network>

        <server>
            <message-time-to-live>10000</message-time-to-live>
            <durable>false</durable>
            <durable-store-manager>flex.messaging.durability.FileStoreManager</durable-store-manager>
        </server>

        <jms>
            <destination-type>Topic</destination-type>
            <message-type>javax.jms.ObjectMessage</message-type>
            <connection-factory>ConnectionFactory</connection-factory>
            <destination-jndi-name>dynamicTopics/traceTopic</destination-jndi-name>
            <delivery-mode>NON_PERSISTENT</delivery-mode>
            <message-priority>DEFAULT_PRIORITY</message-priority>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <transacted-sessions>false</transacted-sessions>
            <initial-context-environment>
                <property>
                    <name>Context.INITIAL_CONTEXT_FACTORY</name>
                    <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
                </property>
                <property>
                    <name>Context.PROVIDER_URL</name>
                    <value>tcp://localhost:61616</value>
                </property>
            </initial-context-environment>
        </jms>
    </properties>

    <channels>
        <channel ref="rtmps" />
    </channels>

    <adapter ref="trace" />

</destination>

我想要实现的是,任何时候只有最后 10 条消息是关于主题的,因为让它在一夜之间运行会导致超过 150K 条关于该主题的消息,即使它应该只包含非常小的数量。

4

4 回答 4

5

据我所知,应该丢弃发送到没有订阅者的非持久主题的消息。只有当前注册的消费者才能获得消息副本。

您如何检查该主题是否包含这 15 万条消息?通过 JMX?

无论您的非持久主题不应缓存这 150K 消息这一事实,您都可以使用代理策略限制每个消费者存储的消息量:

<broker>
...    
  <pendingMessageLimitStrategy>
    <constantPendingMessageLimitStrategy limit="10"/>
  </pendingMessageLimitStrategy>
...
</broker>
于 2010-03-01T18:55:43.370 回答
2

我不确定您想要的内容是否可以存档。你可以做几件事:

首先,您可以花时间了解您的信息:

public ITopicPublisher CreateTopicPublisher(string selector)
        {
            try
            {
                IMessageProducer producer = m_session.CreateProducer(m_topic);
                ActiveMQPublisher publisher = new ActiveMQPublisher(producer);

                // here we put a time to live to 1min for eg
                TimeSpan messageTTL = TimeSpan.FromMilliseconds(60000);
                publisher.TimeToLive = messageTTL;

                if (!String.IsNullOrEmpty(selector))
                {
                    publisher.IsSelector = true;
                    publisher.Selector = selector;
                }
                return publisher;
            }
            catch (Exception ex)
            {
                Logger.Exception(ex);
                throw;
            }
        }

早上你仍然会收到 150k 条消息,但是一旦有一个消费者连接到你的主题,过期的消息就会消失。

您可能还想看看驱逐策略(Eviction Strategy

编辑:我刚刚意识到一件事。你说“让它在一夜之间运行会导致超过 15 万条关于该主题的消息,即使它应该只包含非常小的数量。” 在主题中,您没有必须接收消息的概念。如果没有人订阅某个主题,则删除那里发送的消息。仍然“排队的消息”将增加一。如果您只想发送“最后 10 条消息”,也许您应该使用队列而不是主题?

于 2010-02-18T09:23:47.173 回答
1

不知道它是否有效,还没有机会测试它,但请看一下这里http://activemq.apache.org/slow-consumer-handling.html中的 constantPendingMessageLimitStrategy

祝克劳迪奥好运

于 2010-02-26T18:22:25.103 回答
-1

这篇文章中的一些事情不太清楚

  • 根据我的解析和尝试添加它的努力..它需要更多的上下文,所以我在下面为那些可能不想经历 ActiveMQ 谷歌斗争的人添加了它..
  • 这就是我的activemq.xml 中的内容,我根本没有看到这项工作,所以我希望我能从其他人那里获得一些见解,他们可能能够为我指明正确的方向。

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">"
                       topicPrefetch="10"/>
          <policyEntry topic=">"
                       producerFlowControl="false"/>
          <policyEntry topic=".>"
                       >
            <messageEvictionStrategy>
              <oldestMessageEvictionStrategy/>
            </messageEvictionStrategy>
    
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="10"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>
    
于 2014-07-10T16:41:12.020 回答