0

我创建了一个应用程序,它读取 mongo 更改流以进行更新和插入,然后我们对更改的数据采取行动。下面是我的代码片段

private void listenChangeStream() {
        Runnable changeListener = new Runnable() {
            @Override
            public void run() {
                String fullDoc = null;
                String updateInfo = null;

                while (cursor.hasNext()) {
                    try {
                        ChangeStreamDocument<Document> next = cursor.next();
                        String id = next.getDocumentKey().getString("id").getValue();
                        LOGGER.debug("Change Stream recived:{}", next);
                        String operationType = next.getOperationType().getValue();
                        if ("insert".equals(operationType) || "replace".equals(operationType)) {
                               fullDoc = next.getFullDocument().toString();
                            if (fullDoc.contains("image_info")) {
                                kafkaProducer
                                        .pushOfflineProcessingData(new DataPackets(Id, OfflineProcessType.IMAGE));
                            }
                        } else if ("update".equals(operationType)) {
                               updateInfo = next.getUpdateDescription().toString();
                            if (updateInfo.contains("image_info"))
                                kafkaProducer
                                        .pushOfflineProcessingData(new DataPackets(Id, OfflineProcessType.IMAGE));
                        } 

                    } catch (Exception ex) {
                        LOGGER.info("Exception has come in cahnge listener::", ex);
                    }
                }

            }
        };
        executor = Executors.newFixedThreadPool(1);
        executor.execute(changeListener);

    }

private MongoCursor<ChangeStreamDocument<Document>> getCursor(MongoCollection<Document> supplierCollection, List<Bson> pipeline) {
        MongoCursor<ChangeStreamDocument<Document>> cursor;     
             cursor = supplierCollection.watch(pipeline).iterator();        
        return cursor;
    }

这工作正常。我面临的问题是,当我启动服务器时,更改流开始读取旧提交的更改。我不想要。我希望在部署之后只选择新的更新。

任何人都可以建议如何做到这一点?

4

1 回答 1

3

任何人都可以建议如何做到这一点?

在带有 MongoDB Java 驱动程序 v3.8 的 MongoDB v4.0 中,您可以为MongoClient.watch()startAtOperationTime指定参数。

更改流将仅提供在指定时间戳之后发生的更改。针对 MongoDB 服务器运行的任何命令都将返回一个操作时间,该操作时间可用作参数的值。默认值是在创建更改流之前从服务器获取的操作时间。

或者,您也可以缓存_id从更改流通知中看到的最后一次。这是一个resumeToken您可以传递给resumeAfter()方法以在resumeToken. 例如:

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();

另请参阅MongoDB 更改流

于 2018-07-12T05:02:32.407 回答