1

我尝试使用 zeromq 在 android 中实现一个简单的发布者和订阅者。当我尝试调试它时,它会在订阅者 recv 中循环。我不知道我哪里出错了。我认为它无法从发布者那里获得任何数据。

下面是代码:订阅者

package com.example.jeromqps;
        import java.util.*;

import org.jeromq.ZMQ;

import android.os.AsyncTask;
import android.os.Message;
import android.os.Messenger;
import android.os.RemoteException;
public class client implements Runnable {

    Messenger messenger;
    public client()
    {
        System.out.println("client started");
    }
    @Override
    public void run()
    { 
        ZMQ.Context context=ZMQ.context(1);     
        System.out.println("collecting data from server");
        ZMQ.Socket subscriber=context.socket(ZMQ.SUB);
        subscriber.connect("tcp://localhost:4444");
        String code="10001";
        subscriber.subscribe(code.getBytes());
        int totalvalue=0;
        //store the data in a data structure
        for(int i=0;i<10;i++)
        { 
           byte[] msg = subscriber.recv(0);
            String string=new String(subscriber.recv(0));
            StringTokenizer sscanf=new StringTokenizer(string," ");
            int value=Integer.valueOf(sscanf.nextToken());
            String string= new String(msg);
           System.out.println();
            totalvalue+=value;
        }
        int avg=totalvalue;
        Message msg1=Message.obtain();
        msg1.obj=string;
        try {
             messenger.send(msg1);
             System.out.println("sent to main");
             } catch (RemoteException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            }

        subscriber.close();
        context.term();

    }
}

发布者代码如下

    package com.example.jeromqps;
import java.util.*;
import org.jeromq.*;
public class server implements Runnable{

    public server()
    {
        System.out.println("server started");
    }
    @Override
    public void run()
    {
        ZMQ.Context context=ZMQ.context(1);
        ZMQ.Socket publisher=context.socket(ZMQ.PUB);
        publisher.bind("tcp://*:4444");

        Random srandom=new Random(System.currentTimeMillis());
         System.out.println("in server");
        while(!Thread.currentThread().isInterrupted())
        { //System.out.println("in while")
            int zipcode =1000 +srandom.nextInt(10000);
            int temperature = srandom.nextInt(215) - 80 + 1;
                String update = String.format("%05d %d", zipcode, temperature);
            String update="publisher";
            publisher.send(update.getBytes(),0);
        }
        publisher.close();
        context.term();
    }

}

主要内容如下:

      package com.example.jeromqps;

import android.os.Bundle;
import android.app.Activity;
import android.view.Menu;
import android.app.Activity;
import android.app.AlertDialog;
import android.os.Bundle;
import android.os.Message;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;
import android.os.Handler;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;






public class MainActivity extends Activity implements Handler.Callback {
    private TextView textView;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        new Thread(new server()).start();
        new Thread(new client()).start();
    }


    @Override
    public boolean handleMessage(Message arg0)
    {
        String str = new String((byte[]) arg0.obj);
        System.out.println("****");
        System.out.println(str);
        //new AlertDialog.Builder(this).setMessage(str).show();
        System.out.println("****");
        textView.append(str+ "\n");
        return false;
    }

}

在 byte[] 的程序循环中 msg =subscriber.recv(0); 在订阅者类中。我哪里错了?我错过了什么吗?

4

2 回答 2

0

首先,您的代码中有一些错误:

在发布者中,update定义了两次

  String update = String.format("%05d %d", zipcode, temperature);
  String update= "publisher";

您在订阅者代码中有类似的问题,string定义了两次......

String string = new String(subscriber.recv(0));
String string = new String(msg);

在订阅者中,您在同一迭代中收到两次消息..

    byte[] msg = subscriber.recv(0);
    String string = new String(subscriber.recv(0));

...您只需要在循环中接收...

String string = new String(subscriber.recv(0));

尝试解决这些问题,看看你能走多远......

于 2013-06-20T00:57:46.567 回答
0

这不是此处发布的问题的解决方案,但阅读此问题时我注意到它0被指定为方法中的第二个参数,该send(...)参数随后与方法中设置的参数匹配recv(...)

我设置了一个简单的发布/订阅系统,但无法弄清楚为什么没有发送消息。我正在使用recv(0)但在send(...)方法中指定了一些随机标志。更改值以0解决我的问题。

我想我会在这里发布这个,因为它是通过阅读我碰巧有这个想法的问题中的代码。所以也许这会在未来帮助别人。

于 2017-07-29T23:07:02.810 回答