0

安卓代码

 public class androidconn extends Activity {
    private rabbitmqclient mConsumer;
    private TextView mOutput;

    /** Called when the activity is first created. */
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.conn);

        //The output TextView we'll use to display messages
        mOutput =  (TextView) findViewById(R.id.moutput);

        //Create the consumer
        mConsumer = new rabbitmqclient("10.0.2.2:5672",
                "logs",
                "fanout");

        //Connect to broker
        mConsumer.connectToRabbitMQ();

        //register for messages
        mConsumer.setOnReceiveMessageHandler(new OnReceiveMessageHandler(){

            public void onReceiveMessage(byte[] message) {
                String text = "";
                try {
                    text = new String(message, "UTF");
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }

                mOutput.append("\n"+text);
            }
        });

    }

    @Override
    protected void onResume() {
        super.onPause();
        mConsumer.connectToRabbitMQ();
    }

    @Override
    protected void onPause() {
        super.onPause();
        mConsumer.dispose();
    }
}

兔子MQ代码

   public abstract class rabbitmq {
      public String mServer;
      public String mExchange;

      protected com.rabbitmq.client.Channel mModel = null;
      protected Connection  mConnection;

      protected boolean Running ;

      protected  String MyExchangeType ;

      /**
       *
       * @param server The server address
       * @param exchange The named exchange
       * @param exchangeType The exchange type name
     * @return 
       */
      public  rabbitmq(String server, String exchange, String exchangeType)
      {
          mServer = server;
          mExchange = exchange;
          MyExchangeType = exchangeType;
      }

      public void Dispose() throws SQLException
      {
          Running = false;

            try {
                if (mConnection!=null)
                    mConnection.close();
                if (mModel != null)
                      mModel.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

      }

      /**          * Connect to the broker and create the exchange
       * @return success
       */
      public boolean connectToRabbitMQ()
      {
          if(mModel!= null && mModel.isOpen() )//already declared
              return true;
          try
          {
              ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setHost(mServer);
              connectionFactory.setPort(5672);

            mConnection = (Connection) connectionFactory.newConnection();
              mModel = ((com.rabbitmq.client.Connection) mConnection).createChannel();
              mModel.exchangeDeclare(mExchange, MyExchangeType, true);

              return true;
          }
          catch (Exception e)
          {
              e.printStackTrace();
              return false;
          }
  }
}

rabbitmq客户端代码

     public class rabbitmqclient extends  rabbitmq{

    public rabbitmqclient(  
 String server,String exchange,String exchangeType) {

        super(server,exchange,exchangeType);
  }

    //The Queue name for this consumer
    private String mQueue;
    private QueueingConsumer MySubscription;

    //last message to post back
    private byte[] mLastMessage;

    // An interface to be implemented by an object that is interested in messages(listener)
     public interface OnReceiveMessageHandler{
         public void onReceiveMessage(byte[] message);
    };

    //A reference to the listener, we can only have one at a time(for now)
     private OnReceiveMessageHandler mOnReceiveMessageHandler;

    /**
     *
     * Set the callback for received messages
     * @param handler The callback
     */     public void setOnReceiveMessageHandler(OnReceiveMessageHandler handler)
 {
        mOnReceiveMessageHandler = handler;
    };

    private Handler mMessageHandler = new Handler();
    private Handler mConsumeHandler = new Handler();

    // Create runnable for posting back to main thread
    final Runnable mReturnMessage = new Runnable() {
        public void run() {
            mOnReceiveMessageHandler.onReceiveMessage(mLastMessage);
        }
    };

    final Runnable mConsumeRunner = new Runnable() {
        public void run() {
            Consume();
        }
    };

    /**
     * Create Exchange and then start consuming. A binding needs to be added before any messages will be delivered
 */
    @Override
    public boolean connectToRabbitMQ()
    {
       if(super.connectToRabbitMQ())
       {

           try {
               mQueue =  mModel.queueDeclare().getQueue();
               MySubscription = new QueueingConsumer(mModel);
               mModel.basicConsume(mQueue, false, MySubscription);
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
             if (MyExchangeType == "fanout")
                   AddBinding("");//fanout has default binding

            Running = true;
            mConsumeHandler.post(mConsumeRunner);

           return true;
       }
       return false;
    }

    /**
     * Add a binding between this consumers Queue and the Exchange with routingKey
     * @param routingKey the binding key eg GOOG
     */
    public void AddBinding(String routingKey)
    {
        try {
            mModel.queueBind(mQueue, mExchange, routingKey);
    } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * Remove binding between this consumers Queue and the Exchange with routingKey
     * @param routingKey the binding key eg GOOG
     */
    public void RemoveBinding(String routingKey)
    {
        try {
            mModel.queueUnbind(mQueue, mExchange, routingKey);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private void Consume()
    {
        Thread thread = new Thread()
        {

             @Override
                public void run() {
                 while(Running){
                    QueueingConsumer.Delivery delivery;
                    try {                           
 delivery = MySubscription.nextDelivery();
                    mLastMessage = delivery.getBody();
                        mMessageHandler.post(mReturnMessage);
                        try {
                                     mModel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException ie) {
                        ie.printStackTrace();
                    }
                 }
             }
             };
        thread.start();

    }

    public void dispose(){
        Running = false;
    }
}

并且 logcat 是 07-24 22:57:45.412: D/SntpClient(59): request time failed: java.net.SocketException: Address family not supported by protocol 请告诉我错误

4

1 回答 1

0

确保rabbitmq 服务器没有阻塞连接并且提供的连接信息是正确的。您也可以将其添加到 android 清单文件中:

<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> 

捕获 SocketException 并确保您可以连接到目标 IP 地址/端口号。

于 2012-07-24T17:54:35.640 回答