0

我正在使用 Hawtio 浏览我的 ActiveMQ 队列。我还希望能够在将 JMS 消息重新发送到另一个队列之前对其进行编辑。

我看不到如何在 Hawtio 中编辑消息,但这很好,我想直接在代理中修改消息是不合法的。

相反,我虽然会复制邮件正文并发送修改正文的新邮件。现在,我面临的问题是我只能看到消息正文的前 255 个字符。如何在 hawtio 中查看整个 ActiveMQ 消息?不仅仅是前 255 个字符。

4

2 回答 2

2

Hawtio 使用 JMX 接口浏览队列。它调用browse()队列上的方法。它将消息返回为CompositedData[].

当 aActiveMQBytesMessage被转换(检查 class org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory)时,添加了两个字段BodyLengthBodyPreview。这些字段返回以下数据。

  • BodyLength- JMS 消息体的长度
  • BodyPreview- JMS 消息体的前 255 个字节(硬编码的长度,正如Claus Ibsen在他的回答中已经说过的那样;-))

签入类org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory的方法Map<String, Object> getFields(Object o)

Hawtio 使用该字段BodyPreview来显示消息,用于非文本消息。

签入 Hawtio 文件hawtio-web/src/main/webapp/app/activemq/js/browse.ts

function createBodyText(message) {
    if (message.Text) {
        ...
    } else if (message.BodyPreview) {
        ...
        if (code === 1 || code === 2) {
            // bytes and text
            var len = message.BodyPreview.length;
            var lenTxt = "" + textArr.length;
            body = "bytes:\n" + bytesData + "\n\ntext:\n" + textData;
            message.textMode = "bytes (" + len + " bytes) and text (" + lenTxt + " chars)";
        } else {
            // bytes only
            var len = message.BodyPreview.length;
            body = bytesData;
            message.textMode = "bytes (" + len + " bytes)";
        }
        ...
    } else {
        message.textMode = "unsupported";
        ...

如果你想改变它,你必须在ActiveMQHawtio中改变它。

一个冗长而冗长的例子来演示解释。

import static java.lang.System.out;
import java.lang.management.ManagementFactory;
import java.util.Enumeration;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeType;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;

public class BodyPreviewExample {

    public static void main(String[] args) throws Exception {
        String password = "password";
        String user = "user";
        String queueName = "TEST_QUEUE";
        String brokerUrl = "tcp://localhost:61616";

        BrokerService broker = BrokerFactory.createBroker("broker:"+brokerUrl);
        broker.start();
        broker.waitUntilStarted();

        Connection conn = new ActiveMQConnectionFactory(brokerUrl)
                .createConnection(user, password);
        conn.start();

        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue producerQueue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(producerQueue);

        // create a dummy message
        StringBuilder sb = new StringBuilder(1000);
        for (int i = 0; i < 100; i++) {
            sb.append(">CAFEBABE<");
        }

        // create and send a JMSBytesMessage
        BytesMessage bytesMsg = session.createBytesMessage();
        bytesMsg.writeBytes(sb.toString().getBytes());
        producer.send(bytesMsg);

        // create and send a JMSTextMessage
        TextMessage textMsg = session.createTextMessage();
        textMsg.setText(sb.toString());
        producer.send(textMsg);

        producer.close();

        out.printf("%nmessage info via session browser%n");
        String format = "%-20s = %s%n";
        Queue consumerQueue = session.createQueue(queueName);
        QueueBrowser browser = session.createBrowser(consumerQueue);
        for (Enumeration p = browser.getEnumeration(); p.hasMoreElements();) {
            out.println();
            Object next = p.nextElement();
            if (next instanceof ActiveMQBytesMessage) {
                ActiveMQBytesMessage amq = (ActiveMQBytesMessage) next;
                out.printf(format, "JMSMessageID", amq.getJMSMessageID());
                out.printf(format, "JMSDestination", amq.getJMSDestination());
                out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
                out.printf(format, "BodyLength", amq.getBodyLength());
            } else if (next instanceof ActiveMQTextMessage) {
                ActiveMQTextMessage amq = (ActiveMQTextMessage) next;
                out.printf(format, "JMSMessageID", amq.getJMSMessageID());
                out.printf(format, "JMSDestination", amq.getJMSDestination());
                out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
                out.printf(format, "text.length", amq.getText().length());
            } else {
                out.printf("unhandled message type: %s%n", next.getClass());
            }
        }
        session.close();
        conn.close();

        // access the queue via JMX
        out.printf("%nmessage info via JMX browse operation%n");
        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("org.apache.activemq:type=Broker"
                + ",brokerName=localhost"
                + ",destinationType=Queue"
                + ",destinationName=" + queueName);
        QueueViewMBean queue
                = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
                        name, QueueViewMBean.class, true);
        CompositeData[] browse = queue.browse();
        for (CompositeData compositeData : browse) {
            out.println();
            CompositeType compositeType = compositeData.getCompositeType();
            out.printf(format, "CompositeType", compositeType.getTypeName());
            out.printf(format,"JMSMessageID",compositeData.get("JMSMessageID"));
            if (compositeData.containsKey("BodyLength")) {
                // body length of the ActiveMQBytesMessage
                Long bodyLength = (Long) compositeData.get("BodyLength");
                out.printf(format, "BodyLength", bodyLength);
                // the content displayed by hawtio
                Byte[] bodyPreview = (Byte[]) compositeData.get("BodyPreview");
                out.printf(format, "size of BodyPreview", bodyPreview.length);
            } else if (compositeData.containsKey("Text")) {
                String text = (String) compositeData.get("Text");
                out.printf(format, "Text.length()", text.length());
            }
        }
        // uncomment if you want to check with e.g. JConsole
        // TimeUnit.MINUTES.sleep(5);
        broker.stop();
    }
}

示例输出

message info via session browser

JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:1
JMSDestination       = queue://TEST_QUEUE
JMSXMimeType         = jms/bytes-message
BodyLength           = 1000

JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:2
JMSDestination       = queue://TEST_QUEUE
JMSXMimeType         = jms/text-message
text.length          = 1000

message info via JMX browse operation

CompositeType        = org.apache.activemq.command.ActiveMQBytesMessage
JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:1
BodyLength           = 1000
size of BodyPreview  = 255

CompositeType        = org.apache.activemq.command.ActiveMQTextMessage
JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:2
Text.length()        = 1000
于 2016-07-08T11:27:49.337 回答
1

我认为当您使用 hawtio 使用的 JMX API 查询和浏览队列时,ActiveMQ 存在硬编码限制。但不记得它是否只有 255 个字节或更高。

查看hawtio设置,可能有一个ActiveMQ插件设置可以更改255个字符,也不记得了;)

于 2015-09-14T15:02:28.143 回答