我正在尝试使用 BulkWriteOperation 更新数百万个数据,但是当我的查询条件不满足但具有该 ID 的文档可用时,我的代码会给出异常。这是我的代码:-
if(provisionSubscriberList.size()>0){
Map<String, Object> map = new HashMap<String, Object>();
map.put("id", campaignTO.getId());
map.put("testSample", false);
map.put("status", "Active");
map.put("controlGroup", false);
try{
WriteConcern wc = WriteConcern.ACKNOWLEDGED;
BulkWriteOperation bulk = mongoTemplate.getCollection("provisionSubscriber").initializeOrderedBulkOperation();
for (ProvisionSubscriberEntity provisionalSubscriber : provisionSubscriberList) {
Query queryForAddSubscriber = new Query();
Update updateFieldsForAddSubscriber = new Update();
updateFieldsForAddSubscriber.set("msisdn", provisionalSubscriber.getMsisdn());
updateFieldsForAddSubscriber.set("deviceType", provisionalSubscriber.getDeviceType());
updateFieldsForAddSubscriber.addToSet("campaignIdList", map);
List<DBObject> criteria = new ArrayList<DBObject>();
criteria.add(new BasicDBObject("_id",new ObjectId(provisionalSubscriber.getId())));
criteria.add(new BasicDBObject("campaignIdList.id", new BasicDBObject("$ne", campaignTO.getId())));
criteria.add(new BasicDBObject("campaignIdList.controlGroup", new BasicDBObject("$ne", true)));
criteria.add(new BasicDBObject("campaignIdList.status", new BasicDBObject("$ne", "Active")));
BasicDBObject queryCriteria = new BasicDBObject("$and", criteria);
bulk.find(queryCriteria).upsert().updateOne(updateFieldsForAddSubscriber.getUpdateObject());
}
BulkWriteResult results =bulk.execute(wc);
System.out.println(results);
for (BulkWriteUpsert up : results.getUpserts()) {
System.out.println(up.getId());
}
这是我得到的例外: -
com.mongodb.BulkWriteException: Bulk write operation error on server 192.168.1.113:27017. Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key error index: jmailer_digiengage.provisionSubscriber.$_id_ dup key: { : ObjectId('58c8f33301de9614143f5812') }', details={ }}].
at com.mongodb.BulkWriteHelper.translateBulkWriteException(BulkWriteHelper.java:56)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2310)
at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:136)
at com.lumatadigital.digiengage.daoImpl.ProvisioningDaoImpl.provisionOnCampaign(ProvisioningDaoImpl.java:120)
at com.lumatadigital.digiengage.schedular.service.SchedularJobConfig.provisioningJob(SchedularJobConfig.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:269)
at org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean$MethodInvokingJob.executeInternal(MethodInvokingJobDetailFactoryBean.java:257)
at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:75)
at org.quartz.core.JobRunShell.run(JobRunShell.java:213)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:557)
编辑:基本上,如果文档不可用,我想插入数据,或者如果文档可用并且我的查询对该文档满意,我想更新数据,否则跳过该文档。另外,我想跟踪插入的文档。