5

我想Pub/Sub在项目中实现域。基本上我不是 java 开发人员,使用谷歌帮助。我读了这个链接。我开始实现以下结构。 在此处输入图像描述

我写了 Java 应用程序名称,MessageConsumer.java用于从 AMQ 代理接收消息并放置在 Webserver(Apache Tomcat)中。

消息消费者代码:

 package PackageName;
 import java.io.IOException;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.jms.*;
 import org.apache.activemq.ActiveMQConnectionFactory;
 public class Consumer extends HttpServlet {
 @Override
 protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
    throws ServletException, IOException {
try {
//creating connectionfactory object for way
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61617");
//establishing the connection b/w this Application and Activemq
Connection connection=connectionFactory.createConnection();
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic queue=session.createTopic("MessageTesting");
javax.jms.MessageConsumer consumer=session.createConsumer(queue);
//fetching queues from Activemq
MessageListener listener = new MyListener();
consumer.setMessageListener(listener);
connection.start();
}
catch (Exception e) {
// TODO: handle exception
}
}

}

另外,我编写了另一个用于处理消息的 Java 应用程序(MyListener.java)。

MyListener.java 代码:

package PackageName;
import java.io.*;
import java.net.*;
import javax.jms.*;
public class MyListener implements MessageListener {
public void onMessage(Message msg) {
    TextMessage msg1=(TextMessage)msg;
    //just for your understanding I mention dummy code
    //System.out.println(msg1.getText());
    MyListener ml=new MyListener();
    try {

      ml.executeHttp("http://localhost:8080/ExecutableFileProcess/ClassName");
        System.out.println(msg1.getText());
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}}

两个 Java 应用程序都在网络服务器(Apache Tomcat)中。到目前为止,我们按照以下方式进行操作。

  1. 在向 Topic 发送消息之前,我们通过浏览器上的 HTTP 触发 MessageConsumer.java。

知道,我们正在尝试什么。最初我们不想触发MessageConsumer.java

意思是,假设MessageConsumer.java在 Webserver 中。最初如果 AMQ 从任何地方获取消息,我们的 MessageConsumer.java 应该处理自己的逻辑。

我希望,你们明白我们正在尝试什么。

我从来没有工作过Apache Camel,你能解释清楚吗?

谢谢。

4

2 回答 2

10

为什么要MessageConsumer手动触发 .java 作为调用Subscriber是 ActiveMQ 在您的情况下的责任。

从您的主题将您的消息发布到 ActiveMQ 服务器,所有订阅该主题的订阅者都将收到您的消息,而无需手动触发它。

将此称为您的初始 POC http://activemq.apache.org/hello-world.html

您可以使用下面的 java 代码订阅client2client3的主题

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the topic from which we will receive messages from = " testt"

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("testt");

        MessageConsumer consumer = session.createConsumer(topic);

        MessageListener listner = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message"
                                + textMessage.getText() + "'");
                    }
                } catch (JMSException e) {
                    System.out.println("Caught:" + e);
                    e.printStackTrace();
                }
            }
        };
        consumer.setMessageListener(listner);

        try {
              System.in.read();
         } catch (IOException e) {
             e.printStackTrace();
         }
    connection.close();

}
}    
于 2013-09-07T10:44:54.037 回答
1

你检查过 Apache Camel 吗? http://camel.apache.org/

您可以使用骆驼定义路由以从 java 代码发布和订阅代理上的主题(例如与 spring bean 集成)。有很多示例,包括与 activemq 消息代理的交互。

于 2013-09-07T10:34:19.523 回答