4

我有一个服务器(ROUTER 套接字),它绑定并允许单个客户端(DEALER 套接字)连接到它。然后服务器开始发送数据。

理想情况下,我想知道路由器何时达到其 hwm 设置并开始丢弃消息。我已在路由器上将 ZMQ_ROUTER_MANDATORY 设置为 1,但这也无济于事。即使我故意没有启动客户端,路由器也会继续报告消息已发送(isAlive = false,因此在另一端没有任何东西可以提取这些消息)。

我做错了什么还是 HWM 设置在 ROUTER 套接字上根本不可靠?

我在 Windows 7 64 位上使用 jeromq 版本 0.3.1 和 jdk 1.6.0_32

谢谢

public final class SenderSocket implements Runnable{

    private final int total;
    private final int sentHwm;
    private final String address;
    private final Socket sendSocket;
    private final ExecutorService executor;

    private final static String NAME        = SenderSocket.class.getSimpleName( );
    private final static Logger LOGGER      = LoggerFactory.getLogger( NAME );


    public SenderSocket( ZContext context, String address, int sentHwm, int total ){
        this.address        = address;
        this.total          = total;
        this.sentHwm        = sentHwm;
        this.sendSocket     = context.createSocket( ZMQ.ROUTER );
        this.executor       = Executors.newSingleThreadExecutor( );
    }


    public void init( ){

        sendSocket.setSndHWM( sentHwm );
        sendSocket.setRouterMandatory( true );
        sendSocket.bind( address );

        executor.execute( this );
        LOGGER.info("ROUTER configured with HWM {} bound to {}.", sentHwm, address );

    }



    @Override
    public void run( ){         

        for( int i =0; i <total; i++ ){      

            try{

                String item     = new StringBuilder(8).append(i).toString();
                boolean result  = sendSocket.send( item );

                LOGGER.info("SENT>> [{}] [{}]", result, item );

            }catch( ZMQException zmqEx ){

                int errorCode = zmqEx.getErrorCode();

                if( ZError.EHOSTUNREACH == errorCode ){
                    LOGGER.warn("Attempted to send message to but dealer is DOWN!");
                }

                if( ZMQ.Error.ETERM.getCode() == errorCode ){
                    LOGGER.error("Received error code [{}], terminating.");
                    stop();
                }

                LOGGER.error("ZMQException while sending message.", zmqEx);

            }catch( Exception ex ){
                LOGGER.error("Exception while sending message.", ex );
            }

        }

        stop();

    }


    public void stop( ){
        sendSocket.setLinger( 0 );
    }


}

//客户

    public class ReceiverSocket implements Runnable{

        private final int hwm;
        private final String address;
        private final Socket recvSocket;
        private final ExecutorService executor;

        private volatile boolean isAlive;

        private final static String NAME        = ReceiverSocket.class.getSimpleName( );
        private final static Logger LOGGER      = LoggerFactory.getLogger( NAME );


        public ReceiverSocket( ZContext context, String address, int hwm ){
            this.address        = address;
            this.hwm            = hwm;
            this.recvSocket     = context.createSocket( ZMQ.DEALER );
            this.executor       = Executors.newSingleThreadExecutor( );
        }


        public void init( ){

            this.isAlive = false;

            recvSocket.setRcvHWM( hwm );
            recvSocket.connect( address );
            executor.execute( this );

            LOGGER.info("DEALER configured with HWM {} connected to {}.", hwm, address );

        }



        @Override
        public void run( ){         

            Poller poller       = new Poller( 1 );
            poller.register( recvSocket, Poller.POLLIN );

            while(  isAlive ){      

                try{

                    int pollCount       = poller.poll( );

                    if( pollCount == NEGATIVE_ONE ){
                        LOGGER.warn("ERROR! Was the thread interrupted?", pollCount );
                        isAlive = false;
                        return;
                    }

                    if( poller.pollin( ZERO ) ){
                        String data = recvSocket.recvStr( );
                        LOGGER.info("RECVD >> {} {}", data, NEWLINE );
                    }

                }catch( Exception e ){
                    LOGGER.error("Exception while receving message.", e);
                }

            }

        }


        public void stop( ){
            recvSocket.setLinger( 0 );
            LOGGER.info("{} Stopped!", NAME );
        }
}

//主要的

public static void main( String[ ] args ) throws InterruptedException{

        int recvHwm          = 5;
        int sentHwm          = 5;
        int totalSent        = 5000;
        String address       = "tcp://*:20000";
        ZContext context     = new ZContext( 1 );

        ReceiverSocket recvr = new ReceiverSocket( context, address, recvHwm );
        SenderSocket sender  = new SenderSocket( context, address, sentHwm, totalSent );

        recvr.init();
        Thread.sleep( 1000 );

        sender.init();

    }
4

2 回答 2

3

路由器强制和高水位线无关。

我已在路由器上将 ZMQ_ROUTER_MANDATORY 设置为 1,但这也无济于事。即使我故意没有启动客户端,路由器仍继续报告消息已发送

即使没有对等点连接到路由器,路由器也不会引发异常,除非您为特定客户端 ID 处理消息。

//#1 no exception raised here, message dropped silently
rtr.setRouterMandatory(true)
rtr.bind("tcp://*:20000")
rtr.send("omg!")

//#2 exception raised here
rtr.setRouterMandatory(true)
rtr.bind("tcp://*:20000")
rtr.sendMore("client1")
rtr.sendMore("")
rtr.send("omg!")

代码示例 #2 引发异常,因为您告诉路由器将“omg”发送到具有 identity 的对等方client1。路由器套接字通过为每个连接的对等方分配一个随机标识来跟踪所有连接。如果路由器没有连接client1,或者,如果client1之前断开连接,路由器将在这两种情况下引发异常。

您可以在客户端分配一个身份来覆盖路由器的随机身份分配:

client.setIdentity("client1".getBytes())
client.connect("tcp://*:20000")

上面的代码阻止路由器套接字在示例 #2 中抛出异常

我建议阅读这个,它解释了消息寻址和封装;了解它的工作原理对于使用路由器套接字至关重要。

于 2013-11-22T19:17:58.860 回答
0

在 python 中,这使得路由器在这种情况下提高 zmq.Again:

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.ROUTER)

    # make sure router doesn't just drop unroutable message
    self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1)

    # make sure when there is unroutable message to raise exception after timeout
    timeout_sec = 2.5
    timeout_milisec = int(timeout_sec * 1000)
    self.socket.setsockopt(zmq.SNDTIMEO, timeout_milisec)

http://grokbase.com/t/zeromq/zeromq-dev/12aje3ya9t/zmq-router-mandatory-was-zmq-router-behavior

于 2016-05-17T07:32:24.927 回答