我有一个服务器(ROUTER 套接字),它绑定并允许单个客户端(DEALER 套接字)连接到它。然后服务器开始发送数据。
理想情况下,我想知道路由器何时达到其 hwm 设置并开始丢弃消息。我已在路由器上将 ZMQ_ROUTER_MANDATORY 设置为 1,但这也无济于事。即使我故意没有启动客户端,路由器也会继续报告消息已发送(isAlive = false,因此在另一端没有任何东西可以提取这些消息)。
我做错了什么还是 HWM 设置在 ROUTER 套接字上根本不可靠?
我在 Windows 7 64 位上使用 jeromq 版本 0.3.1 和 jdk 1.6.0_32
谢谢
public final class SenderSocket implements Runnable{
private final int total;
private final int sentHwm;
private final String address;
private final Socket sendSocket;
private final ExecutorService executor;
private final static String NAME = SenderSocket.class.getSimpleName( );
private final static Logger LOGGER = LoggerFactory.getLogger( NAME );
public SenderSocket( ZContext context, String address, int sentHwm, int total ){
this.address = address;
this.total = total;
this.sentHwm = sentHwm;
this.sendSocket = context.createSocket( ZMQ.ROUTER );
this.executor = Executors.newSingleThreadExecutor( );
}
public void init( ){
sendSocket.setSndHWM( sentHwm );
sendSocket.setRouterMandatory( true );
sendSocket.bind( address );
executor.execute( this );
LOGGER.info("ROUTER configured with HWM {} bound to {}.", sentHwm, address );
}
@Override
public void run( ){
for( int i =0; i <total; i++ ){
try{
String item = new StringBuilder(8).append(i).toString();
boolean result = sendSocket.send( item );
LOGGER.info("SENT>> [{}] [{}]", result, item );
}catch( ZMQException zmqEx ){
int errorCode = zmqEx.getErrorCode();
if( ZError.EHOSTUNREACH == errorCode ){
LOGGER.warn("Attempted to send message to but dealer is DOWN!");
}
if( ZMQ.Error.ETERM.getCode() == errorCode ){
LOGGER.error("Received error code [{}], terminating.");
stop();
}
LOGGER.error("ZMQException while sending message.", zmqEx);
}catch( Exception ex ){
LOGGER.error("Exception while sending message.", ex );
}
}
stop();
}
public void stop( ){
sendSocket.setLinger( 0 );
}
}
//客户
public class ReceiverSocket implements Runnable{
private final int hwm;
private final String address;
private final Socket recvSocket;
private final ExecutorService executor;
private volatile boolean isAlive;
private final static String NAME = ReceiverSocket.class.getSimpleName( );
private final static Logger LOGGER = LoggerFactory.getLogger( NAME );
public ReceiverSocket( ZContext context, String address, int hwm ){
this.address = address;
this.hwm = hwm;
this.recvSocket = context.createSocket( ZMQ.DEALER );
this.executor = Executors.newSingleThreadExecutor( );
}
public void init( ){
this.isAlive = false;
recvSocket.setRcvHWM( hwm );
recvSocket.connect( address );
executor.execute( this );
LOGGER.info("DEALER configured with HWM {} connected to {}.", hwm, address );
}
@Override
public void run( ){
Poller poller = new Poller( 1 );
poller.register( recvSocket, Poller.POLLIN );
while( isAlive ){
try{
int pollCount = poller.poll( );
if( pollCount == NEGATIVE_ONE ){
LOGGER.warn("ERROR! Was the thread interrupted?", pollCount );
isAlive = false;
return;
}
if( poller.pollin( ZERO ) ){
String data = recvSocket.recvStr( );
LOGGER.info("RECVD >> {} {}", data, NEWLINE );
}
}catch( Exception e ){
LOGGER.error("Exception while receving message.", e);
}
}
}
public void stop( ){
recvSocket.setLinger( 0 );
LOGGER.info("{} Stopped!", NAME );
}
}
//主要的
public static void main( String[ ] args ) throws InterruptedException{
int recvHwm = 5;
int sentHwm = 5;
int totalSent = 5000;
String address = "tcp://*:20000";
ZContext context = new ZContext( 1 );
ReceiverSocket recvr = new ReceiverSocket( context, address, recvHwm );
SenderSocket sender = new SenderSocket( context, address, sentHwm, totalSent );
recvr.init();
Thread.sleep( 1000 );
sender.init();
}