1

我在 Eclipse 中使用 jeromq-0.3.2.jar 库运行这个官方 Java 示例:直接“运行”(Run)时它无法正常工作,只有在设置若干断点并以“调试”(Debug)方式运行时才能正常工作。

消息似乎丢失了。我自己使用 ROUTER-REQ 模式的应用程序也有同样的问题。

这是他们的官方示例;如果连它都无法正常运行,那还有什么办法?有人可以试一下并找出原因吗?

http://zguide.zeromq.org/java:rtreq

或代码在这里:

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

import java.util.Random;

/**
 * ROUTER-TO-REQ example: a ROUTER broker hands out work to a pool of REQ
 * workers for five seconds and then tells each of them to stop.
 */
public class rtreq
{
    //  Shared by all worker threads to pick the simulated work duration
    private static Random rand = new Random();
    //  Number of worker threads main starts, and number of "Fired!" replies
    //  the broker sends before it leaves its loop
    private static final int NBR_WORKERS = 10;

    /**
     * Worker thread: connects a REQ socket to the broker and keeps asking
     * for work until the broker answers "Fired!".
     */
    private static class Worker extends Thread {

        @Override
        public void run() {

            //  Each worker creates its own context and REQ socket
            Context context = ZMQ.context(1);
            Socket worker = context.socket(ZMQ.REQ);
            //  NOTE(review): ZHelper is not defined in this listing. The broker
            //  addresses every reply with the identity frame it received, so
            //  identities presumably have to be unique per worker -- confirm
            //  what ZHelper.setId actually generates.
            ZHelper.setId (worker);  //  Set a printable identity

            worker.connect("tcp://localhost:5671");

            //  Number of work items received before being fired
            int total = 0;
            while (true) {
                //  Tell the broker we're ready for work
                worker.send ("Hi Boss");

                //  Get workload from broker, until finished
                String workload = worker.recvStr ();
                //  NOTE(review): this line throws NullPointerException if
                //  recvStr() ever returns null -- confirm whether it can.
                boolean finished = workload.equals ("Fired!");
                if (finished) {
                    System.out.printf ("Completed: %d tasks\n", total);
                    break;
                }
                total++;

                //  Do some random work: sleep between 1 and 500 ms
                try {
                    Thread.sleep (rand.nextInt (500) + 1);
                } catch (InterruptedException e) {
                    //  Interruption is silently ignored; the loop carries on
                }
            }
            //  Only reached after "Fired!"; an exception above skips this cleanup
            worker.close();
            context.term();
        }
    }

    /**
     * While this example runs in a single process, that is just to make
     * it easier to start and stop the example. Each thread has its own
     * context and conceptually acts as a separate process.
     *
     * Binds the ROUTER socket, starts NBR_WORKERS worker threads, answers
     * every request with "Work harder" for five seconds, then answers
     * "Fired!" until NBR_WORKERS such replies have been sent.
     *
     * @param args command-line arguments (not used)
     * @throws Exception declared by the original example; nothing is caught here
     */
    public static void main (String[] args) throws Exception {
        Context context = ZMQ.context(1);
        Socket broker = context.socket(ZMQ.ROUTER);
        broker.bind("tcp://*:5671");

        //  Start all workers; each one connects back to the port bound above
        for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
        {
            Thread worker = new Worker ();
            worker.start ();
        }

        //  Run for five seconds and then tell workers to end
        long endTime = System.currentTimeMillis () + 5000;
        int workersFired = 0;
        while (true) {
            //  Next message gives us least recently used worker
            //  The first frame read is kept as the worker identity and is
            //  immediately queued as the first frame of the reply
            String identity = broker.recvStr ();
            broker.sendMore (identity);
            broker.recvStr ();     //  Envelope delimiter
            broker.recvStr ();     //  Response from worker
            //  Empty frame sent between the identity and the reply body
            broker.sendMore ("");

            //  Encourage workers until it's time to fire them
            if (System.currentTimeMillis () < endTime)
                broker.send ("Work harder");
            else {
                broker.send ("Fired!");
                //  Stop once one "Fired!" has been sent per started worker
                if (++workersFired == NBR_WORKERS)
                    break;
            }
        }

        broker.close();
        context.term();
    }
}
4

1 回答 1

0

要让这个示例正常工作,需要确保每个 worker 都有唯一的标识(identity)。

例如,我删除了

ZHelper.setId (worker);  //  Set a printable identity

并为 Worker 类添加了一个接收 workerId 字符串的构造函数,然后在 run() 中通过 worker.setIdentity(...) 把它设置为套接字的标识。

结果:

import org.zeromq.ZMQ;

import java.nio.charset.StandardCharsets;
import java.util.Random;

/**
 * ROUTER-TO-REQ example.
 *
 * <p>A ROUTER broker hands out work to a pool of REQ workers for five
 * seconds and then tells each of them to stop. Every worker sets an explicit
 * identity ("worker-N"), which the broker echoes back as the first frame of
 * its reply.
 */
public class rtreq {
    //  Shared by all worker threads to pick the simulated work duration
    private static final Random rand = new Random();
    //  Number of worker threads to start, and to fire at the end
    private static final int NBR_WORKERS = 10;

    /**
     * Worker thread: connects a REQ socket to the broker and keeps asking
     * for work until the broker answers "Fired!".
     */
    private static class Worker extends Thread {

        private final String workerId;

        /**
         * @param workerId identity set on this worker's socket and printed
         *                 in its final report
         */
        Worker(String workerId) {
            this.workerId = workerId;
        }

        @Override
        public void run() {
            ZMQ.Context context = ZMQ.context(1);
            try {
                ZMQ.Socket worker = context.socket(ZMQ.REQ);
                try {
                    //  Explicit charset so the identity bytes do not depend
                    //  on the platform default encoding
                    worker.setIdentity(workerId.getBytes(StandardCharsets.UTF_8));
                    worker.connect("tcp://localhost:5671");
                    requestWorkUntilFired(worker);
                } finally {
                    //  Release the socket even if the loop ends with an exception
                    worker.close();
                }
            } finally {
                context.term();
            }
        }

        /**
         * Runs the request/reply loop: announce readiness, read the reply,
         * simulate work, and stop on "Fired!".
         */
        private void requestWorkUntilFired(ZMQ.Socket worker) {
            int total = 0;
            while (true) {
                //  Tell the broker we're ready for work
                worker.send("Hi Boss");

                //  Get workload from broker, until finished
                String workload = worker.recvStr();
                if (workload == null) {
                    //  No reply could be read; stop instead of failing with a
                    //  NullPointerException on the comparison below
                    break;
                }
                if (workload.equals("Fired!")) {
                    //  The id is passed as an argument so a '%' in it cannot
                    //  be misread as a format specifier
                    System.out.printf("%s completed: %d tasks\n", workerId, total);
                    break;
                }
                total++;

                //  Do some random work: sleep between 1 and 500 ms
                try {
                    Thread.sleep(rand.nextInt(500) + 1);
                } catch (InterruptedException ignored) {
                    //  Deliberately ignored: the worker has to keep asking
                    //  until it is fired, otherwise the broker would never
                    //  reach NBR_WORKERS fired workers. An interrupt only
                    //  cuts the simulated work short.
                }
            }
        }
    }

    /**
     * While this example runs in a single process, that is just to make
     * it easier to start and stop the example. Each thread has its own
     * context and conceptually acts as a separate process.
     *
     * <p>Binds the ROUTER socket, starts NBR_WORKERS workers, answers every
     * request with "Work harder" for five seconds, then answers "Fired!"
     * until every worker has been fired.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if anything in the broker fails; nothing is caught here
     */
    public static void main(String[] args) throws Exception {
        ZMQ.Context context = ZMQ.context(1);
        try {
            ZMQ.Socket broker = context.socket(ZMQ.ROUTER);
            try {
                broker.bind("tcp://*:5671");

                //  Start all workers, each with its own identity
                for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++) {
                    Thread worker = new Worker("worker-" + workerNbr);
                    worker.start();
                }

                //  Run for five seconds and then tell workers to end
                long endTime = System.currentTimeMillis() + 5000;
                int workersFired = 0;
                while (workersFired < NBR_WORKERS) {
                    //  Read the whole request first: identity, delimiter, body
                    String identity = broker.recvStr();
                    broker.recvStr();     //  Envelope delimiter
                    broker.recvStr();     //  Response from worker

                    //  Reply envelope: identity frame, then an empty frame
                    broker.sendMore(identity);
                    broker.sendMore("");

                    //  Encourage workers until it's time to fire them
                    if (System.currentTimeMillis() < endTime) {
                        broker.send("Work harder");
                    } else {
                        broker.send("Fired!");
                        workersFired++;
                    }
                }
            } finally {
                //  Release the socket even if the loop ends with an exception
                broker.close();
            }
        } finally {
            context.term();
        }
    }
}

系统输出:

worker-2 completed: 27 tasks
worker-3 completed: 20 tasks
worker-8 completed: 24 tasks
worker-6 completed: 23 tasks
worker-0 completed: 20 tasks
worker-9 completed: 21 tasks
worker-7 completed: 20 tasks
worker-5 completed: 21 tasks
worker-4 completed: 19 tasks
worker-1 completed: 25 tasks
于 2017-03-04T08:55:52.330 回答