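I added a step to my application that persists files via GridFS, along with a metadata field named "processed" that serves as a flag for a scheduled task, which retrieves new files and sends them off for processing. Since the Java driver for GridFS has no method for updating metadata, I use a MongoCollection to update the "fs.files" collection and set "metadata.processed" to true.
I fetch new files for processing with GridFSBucket.find(eq("metadata.processed", false)), then update metadata.processed to true once processing completes. This works if I add new files while the application is running. However, if there is an existing file with "metadata.processed" set to false when I start the application, the find call above returns no results. Likewise, if I take a file that has already been processed and set its "metadata.processed" field back to false, the find call above also stops returning it.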
    // MessageFormat templates; braces are wrapped in single quotes so they are emitted literally.
    private static final String FILTER_STR = "'{'\"filename\" : \"{0}\"'}'";
    private static final String UPDATE_STR =
        "'{'\"$set\": '{'\"metadata.processed\": \"{0}\"'}}'";

    @Autowired
    private GridFSBucketFactory gridFSBucketFactory;

    @Autowired
    private MongoCollectionFactory mongoCollectionFactory;

    public void storeFile(String filename, DateTime publishTime,
            InputStream inputStream) {
        if (fileExists(filename)) {
            LOGGER.info("File named {} already exists.", filename);
        } else {
            uploadToGridFS(filename, publishTime, inputStream);
            LOGGER.info("Stored file named {}.", filename);
        }
    }

    public GridFSDownloadStream getFile(BsonValue id) {
        return gridFSBucketFactory.getGridFSBucket().openDownloadStream(id);
    }

    public GridFSDownloadStream getFile(String filename) {
        final GridFSFile file = getGridFSFile(filename);
        return file == null ? null : getFile(file.getId());
    }

    // Returns the files whose "processed" flag is still false.
    public GridFSFindIterable getUnprocessedFiles() {
        return gridFSBucketFactory.getGridFSBucket()
            .find(eq("metadata.processed", false));
    }

    // Updates the flag directly in fs.files, since GridFSBucket offers no metadata update.
    public void setProcessed(String filename, boolean isProcessed) {
        final BasicDBObject filter =
            BasicDBObject.parse(format(FILTER_STR, filename));
        final BasicDBObject update =
            BasicDBObject.parse(format(UPDATE_STR, isProcessed));
        if (updateOne(filter, update)) {
            LOGGER.info("Set metadata for {} to {}", filename, isProcessed);
        }
    }

    private void uploadToGridFS(String filename, DateTime publishTime,
            InputStream inputStream) {
        gridFSBucketFactory.getGridFSBucket().uploadFromStream(filename,
            inputStream, createMetadata(publishTime));
    }

    private GridFSUploadOptions createMetadata(DateTime publishTime) {
        final Document metadata = new Document();
        metadata.put("processed", false);
        // metadata.put("publishTime", publishTime.toString());
        return new GridFSUploadOptions().metadata(metadata);
    }

    private boolean fileExists(String filename) {
        return getGridFSFile(filename) != null;
    }

    private GridFSFile getGridFSFile(String filename) {
        return gridFSBucketFactory.getGridFSBucket()
            .find(eq("filename", filename)).first();
    }

    private boolean updateOne(BasicDBObject filter, BasicDBObject update) {
        try {
            mongoCollectionFactory.getFsFilesCollection().updateOne(filter,
                update, new UpdateOptions().upsert(true));
        } catch (final MongoException e) {
            // Uses {} placeholders like the other log calls; the trailing exception is logged with its stack trace.
            LOGGER.error(
                "The following failed to update, filter:{} update:{}",
                filter, update, e);
            return false;
        }
        return true;
    }
Any idea what I can do to make sure that
GridFSBucket.find(eq("metadata.processed", false))
returns the correct results for existing files and/or files whose metadata has been changed?
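
One thing that may be worth checking is the type of the value that setProcessed actually writes. The sketch below uses only java.text.MessageFormat (assuming that is the format call statically imported in the code above) to render the same update template as UPDATE_STR, next to a hypothetical variant without the JSON quotes around {0}. The class name UpdateTemplateDemo and the constants QUOTED and UNQUOTED are made up for illustration and are not part of the original code.

```java
import java.text.MessageFormat;

public class UpdateTemplateDemo {
    // Same template as UPDATE_STR in the question: {0} sits inside JSON quotes.
    static final String QUOTED =
        "'{'\"$set\": '{'\"metadata.processed\": \"{0}\"'}}'";

    // Hypothetical variant without the JSON quotes around {0}.
    static final String UNQUOTED =
        "'{'\"$set\": '{'\"metadata.processed\": {0}'}}'";

    // Renders a template the same way setProcessed does before parsing it.
    static String render(String template, boolean value) {
        return MessageFormat.format(template, value);
    }

    public static void main(String[] args) {
        // prints {"$set": {"metadata.processed": "false"}}  (a JSON string)
        System.out.println(render(QUOTED, false));
        // prints {"$set": {"metadata.processed": false}}    (a JSON boolean)
        System.out.println(render(UNQUOTED, false));
    }
}
```

With the quoted template the stored value is the string "false", which eq("metadata.processed", false) would not match, because that filter compares against a boolean. That would be consistent with freshly uploaded files (written as a boolean by createMetadata) being found, while files touched by setProcessed are not. If this turns out to be the cause, dropping the quotes or building the update with the driver's typed helper Updates.set("metadata.processed", isProcessed) might be options, and documents that already hold a string value would need a one-time correction. It may also be worth checking whether upsert(true) is wanted here, since a filter that matches no file would insert a new document into fs.files.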