1

我有一个实现 Runnable 的 java 进程,用于向 zeromq 发布者/推送者订阅/拉取/拉取数据,收集一段时间的数据,一旦达到一定数量或记录,将它们插入数据库。一切正常,但是当发布者/推送者停止工作时,处理和收集数据的过程立即终止。发布者重新启动后,一切正常,但不会插入内存中收集的记录,因为该过程会立即停止。这会导致发布者重新启动时丢失数据,这是我的问题。

我已经尝试过检查线程中断,用各种 try/catch 语句等围绕代码。对于我可以尝试实施的其他解决方案,我真的迷失了,有什么建议吗?

这是代码的相关部分:

    String address = "tcp://" + meta.getExporterAddress() + ":" + meta.getExporterPort();
    ZMQ.Context context = ZMQ.context(10);
    ZMQ.Socket socket;

    if (exporterMode.toUpperCase().equals("STREAMER")) {
        socket = context.socket(ZMQ.PULL);
        socket.connect(address);
        this.log.info("connected ZMQJob as PULL: " + exporterMode);
    } else {
        // BROKER
        socket = context.socket(ZMQ.SUB);
        socket.connect(address);
        socket.subscribe(ZMQ.SUBSCRIPTION_ALL);
        this.log.info("connected ZMQJob as SUB: " + exporterMode);
    }

    this.log.info("Started ZMQJob for " + meta.getTargetTableName());

    long startTime = System.currentTimeMillis();
    while (!Thread.currentThread().isInterrupted()) {
        jsonString = Snappy.uncompress(socket.recv(0));
        BufferedReader bufReader = new BufferedReader(new StringReader(new String(jsonString)));

        while ((line = bufReader.readLine()) != null) {
            jsonLines.add(line);
        }

        if (jsonLines.size() >= threshold || System.currentTimeMillis() > startTime + timeout * 1000) {
            //db related code used for inserting here
        }
        }

我想也许对 while 循环使用不同的条件可能是一个解决方案,但我不确定是什么条件。

4

0 回答 0