我有一个实现 Runnable 的 java 进程,用于向 zeromq 发布者/推送者订阅/拉取/拉取数据,收集一段时间的数据,一旦达到一定数量或记录,将它们插入数据库。一切正常,但是当发布者/推送者停止工作时,处理和收集数据的过程立即终止。发布者重新启动后,一切正常,但不会插入内存中收集的记录,因为该过程会立即停止。这会导致发布者重新启动时丢失数据,这是我的问题。
我已经尝试过检查线程中断,用各种 try/catch 语句等围绕代码。对于我可以尝试实施的其他解决方案,我真的迷失了,有什么建议吗?
这是代码的相关部分:
String address = "tcp://" + meta.getExporterAddress() + ":" + meta.getExporterPort();
ZMQ.Context context = ZMQ.context(10);
ZMQ.Socket socket;
if (exporterMode.toUpperCase().equals("STREAMER")) {
socket = context.socket(ZMQ.PULL);
socket.connect(address);
this.log.info("connected ZMQJob as PULL: " + exporterMode);
} else {
// BROKER
socket = context.socket(ZMQ.SUB);
socket.connect(address);
socket.subscribe(ZMQ.SUBSCRIPTION_ALL);
this.log.info("connected ZMQJob as SUB: " + exporterMode);
}
this.log.info("Started ZMQJob for " + meta.getTargetTableName());
long startTime = System.currentTimeMillis();
while (!Thread.currentThread().isInterrupted()) {
jsonString = Snappy.uncompress(socket.recv(0));
BufferedReader bufReader = new BufferedReader(new StringReader(new String(jsonString)));
while ((line = bufReader.readLine()) != null) {
jsonLines.add(line);
}
if (jsonLines.size() >= threshold || System.currentTimeMillis() > startTime + timeout * 1000) {
//db related code used for inserting here
}
}
我想也许对 while 循环使用不同的条件可能是一个解决方案,但我不确定是什么条件。