我正在尝试实现一个简单的 pub sub 示例,其中我有一个服务器,并且正在向客户端发布有关正常运行时间的定期通知。
这是作为 Windows 服务的一部分运行的 - 与 InnoSetup、launch4j 和 Apache procrun/prunsrv 捆绑在一起。
线程不会超出上下文的创建。可能出了什么问题?
import java.io.IOException;
import java.util.Date;
import org.msgpack.MessagePack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import org.ocpsoft.prettytime.*;
/**
* Notification service for updates to configuration in the logger
* @author Aalhad
*/
public class NotificationServer extends Thread {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private volatile boolean shouldRun;
private PrettyTime upTime;
private PreferenceManager prefMgr = PreferenceManager.getInstance();
public ZMQ.Context context;
public ZMQ.Socket pubSocket;
public NotificationServer() {
log.debug("Entered notification server constructor ......................");
context = ZMQ.context(1);
log.debug("THIS DOES NOT GET PRINTED ... it is as if we are blocking in ZMQ.context!!!");
pubSocket = context.socket(ZMQ.PUB);
pubSocket.bind("tcp://*:"+prefMgr.getNotificationPort());
pubSocket.bind("ipc://powerlogger");
log.debug("NotificationServer created");
}
@Override
public void run() {
log.debug("Entering run loop of Notification Server");
setStarting();
log.debug("Writing to tcp port: {}", prefMgr.getNotificationPort());
upTime = new PrettyTime();
ConfigMessage msg = prefMgr.getConfigMessage();
MessagePack msgPack = new MessagePack();
byte[] sendBytes;
try {
log.debug("Going ahead and sending: {}", msg);
sendBytes = msgPack.write(msg);
pubSocket.send(sendBytes);
log.debug("Finished sending msg");
} catch (IOException ex) {
log.error("Could not send first config notification",ex);
}
//On starts and restarts, we send the current configuration to our
//subscribers
String upSince;
while (shouldRun()) {
log.trace("In the notification loop");
upSince = upTime.format(new Date(0));
log.trace("============================================================== Started: {}", upSince );
ConfigMessage cfgMsg = new ConfigMessage();
cfgMsg.msgType = MessageType.UPSINCE;
cfgMsg.message = upSince;
try {
// ..... code here to write the time into a
// messagepack structure and publishing it
sleep(5000);
log.trace("After sleeping in notification loop");
} catch (InterruptedException ex) {
log.error("Notification thread disturbed when sleeping.");
}
}
}
public synchronized void shutDown() {
shouldRun = false;
log.trace("Set shouldRun to false in discovery server");
try {
if (pubSocket != null) {
pubSocket.close();
context.term();
}
}
catch(Exception e) {
log.error("Interesting situation when trying to close the discovery socket when shutting down",e);
}
}
public synchronized void setStarting() {
shouldRun = true;
}
private synchronized boolean shouldRun() {
return shouldRun;
}
}