1

我试图找出如何使用 JMSQueueAppender 但没有关于如何做同样事情的正确说明。经过反复试验,我能够使用 log4j 配置 JMSQueueAppender(使用我自己的 JMSQueueAppender 类)。

下面是 log4j.properties 文件中的配置。

log4j.rootLogger=INFO, stdout, jms

## Be sure that ActiveMQ messages are not logged to 'jms' appender
log4j.logger.org.apache.activemq=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %-5p %c - %m%n

## Configure 'jms' appender. You'll also need jndi.properties file in order to make it work
log4j.appender.jms=com.util.JMSQueueAppender
log4j.appender.jms.initialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
log4j.appender.jms.providerUrl=tcp://localhost:61616
log4j.appender.jms.queueBindingName=myQueue
log4j.appender.jms.queueConnectionFactoryBindingName=QueueConnectionFactory

jndi.properties 文件应如下所示:(请注意,它应该放在 classpath 中,在项目目录中,它应该放在 src 目录中)

queue.myQueue=myQueue

JMSQueueAppender 类如下所示:(在此代码中,它位于包结构 com.util 中)

public class JMSQueueAppender extends AppenderSkeleton {

protected QueueConnection queueConnection;
protected QueueSession queueSession;
protected QueueSender queueSender;
protected Queue queue;

String initialContextFactory;
String providerUrl;
String queueBindingName;
String queueConnectionFactoryBindingName;

public 
JMSQueueAppender() {
}


/**
 * The <b>InitialContextFactory</b> option takes a string value.
 * Its value, along with the <b>ProviderUrl</b> option will be used
 * to get the InitialContext.
 */
public void setInitialContextFactory(String initialContextFactory) {
this.initialContextFactory = initialContextFactory;
}

/**
 * Returns the value of the <b>InitialContextFactory</b> option.
 */
public String getInitialContextFactory() {
return initialContextFactory;
}

/**
 * The <b>ProviderUrl</b> option takes a string value.
 * Its value, along with the <b>InitialContextFactory</b> option will be used
 * to get the InitialContext.
 */
public void setProviderUrl(String providerUrl) {
this.providerUrl = providerUrl;
}

/**
 * Returns the value of the <b>ProviderUrl</b> option.
 */
public String getProviderUrl() {
return providerUrl;
}

/**
 * The <b>QueueConnectionFactoryBindingName</b> option takes a
 * string value. Its value will be used to lookup the appropriate
 * <code>QueueConnectionFactory</code> from the JNDI context.
 */
public void setQueueConnectionFactoryBindingName(String queueConnectionFactoryBindingName) {
this.queueConnectionFactoryBindingName = queueConnectionFactoryBindingName;
}

/**
 * Returns the value of the <b>QueueConnectionFactoryBindingName</b> option.
 */
public String getQueueConnectionFactoryBindingName() {
return queueConnectionFactoryBindingName;
}

/**
 * The <b>QueueBindingName</b> option takes a
 * string value. Its value will be used to lookup the appropriate
 * destination <code>Queue</code> from the JNDI context.
 */
public void setQueueBindingName(String queueBindingName) {
this.queueBindingName = queueBindingName;
}

/**
   Returns the value of the <b>QueueBindingName</b> option.
*/
public String getQueueBindingName() {
return queueBindingName;
}


/**
 * Overriding this method to activate the options for this class
 * i.e. Looking up the Connection factory ...
 */
public void activateOptions() {

QueueConnectionFactory queueConnectionFactory;

try {

    Context ctx = getInitialContext();      
    queueConnectionFactory = (QueueConnectionFactory) ctx.lookup(queueConnectionFactoryBindingName);
    queueConnection = queueConnectionFactory.createQueueConnection();

    queueSession = queueConnection.createQueueSession(false,
                              Session.AUTO_ACKNOWLEDGE);

    Queue queue = (Queue) ctx.lookup(queueBindingName);
    queueSender = queueSession.createSender(queue);

    queueConnection.start();

    ctx.close();      

} catch(Exception e) {
    errorHandler.error("Error while activating options for appender named ["+name+
               "].", e, ErrorCode.GENERIC_FAILURE);
}
}

protected InitialContext getInitialContext() throws NamingException {
try {
    Hashtable ht = new Hashtable();

    //Populate property hashtable with data to retrieve the context.
    ht.put(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
    ht.put(Context.PROVIDER_URL, providerUrl);

    return (new InitialContext(ht));

} catch (NamingException ne) {
    LogLog.error("Could not get initial context with ["+initialContextFactory + "] and [" + providerUrl + "]."); 
    throw ne;
}
}


protected boolean checkEntryConditions() {

String fail = null;

if(this.queueConnection == null) {
    fail = "No QueueConnection";
} else if(this.queueSession == null) {
    fail = "No QueueSession";
} else if(this.queueSender == null) {
    fail = "No QueueSender";
} 

if(fail != null) {
    errorHandler.error(fail +" for JMSQueueAppender named ["+name+"].");      
    return false;
} else {
    return true;
}
}

/**
* Close this JMSQueueAppender. Closing releases all resources used by the
* appender. A closed appender cannot be re-opened. 
*/
public synchronized // avoid concurrent append and close operations
void close() {

if(this.closed) 
    return;

LogLog.debug("Closing appender ["+name+"].");
this.closed = true;    

try {
    if(queueSession != null) 
    queueSession.close();   
    if(queueConnection != null) 
    queueConnection.close();
} catch(Exception e) {
    LogLog.error("Error while closing JMSQueueAppender ["+name+"].", e);    
}   

// Help garbage collection
queueSender = null;
queueSession = null;
queueConnection = null;
}

/**
 * This method called by {@link AppenderSkeleton#doAppend} method to
 * do most of the real appending work.  The LoggingEvent will be
 * be wrapped in an ObjectMessage to be put on the JMS queue.
 */
public void append(LoggingEvent event) {

if(!checkEntryConditions()) {
    return;
}

try {

    ObjectMessage msg = queueSession.createObjectMessage();
    msg.setObject(event);
    queueSender.send(msg);

} catch(Exception e) {
    errorHandler.error("Could not send message in JMSQueueAppender ["+name+"].", e, 
               ErrorCode.GENERIC_FAILURE);
}
}

public boolean requiresLayout() {
return false;
}  
}

Ans 队列侦听器类如下所示:

public class MyLogQueueListener  implements MessageListener  {
public MyLogQueueListener() throws Exception {        
    // create a logTopic topic consumer        
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");        
    Connection conn = factory.createConnection();        
    Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        
    conn.start(); 

     Destination destination = sess.createQueue("myQueue");                
     // Create a MessageConsumer from the Session to the Topic or Queue                
     MessageConsumer consumer = sess.createConsumer(destination); 
    consumer.setMessageListener(this);        
    // log a message        
    Logger log = Logger.getLogger(MyLogQueueListener.class);        
    log.info("Test log");

    // clean up        
    Thread.sleep(1000000);        
    consumer.close();        
    sess.close();        
    conn.close();        
    System.exit(1);    
}         

public static void main(String[] args) throws Exception {        
    new MyLogQueueListener();    

}     

public void onMessage(Message message) {        
    try {            
        // receive log event in your consumer            
        LoggingEvent event = (LoggingEvent)((ActiveMQObjectMessage)message).getObject();            
        System.out.println("Queue: Received log [" + event.getLevel() + "]: "+ event.getMessage());        
    } catch (Exception e) {            
        e.printStackTrace();        
    }    
}
}

我花了 2 天时间来设置上述内容并使其正常工作。希望这对打算使用 JMSQueueAppender 的人有所帮助。(特别感谢在网上发布 JMSQueueAppender 代码的人)。

4

0 回答 0