我正在将 Spring Batch 与 Mongo 数据库一起使用。
我需要根据状态(status= PENDING)获取文档,写入 Kafka 队列并使用新状态(status= FILLED)更新文档字段。
所以我使用MongoItemReader
, CompositeItemWriter
( KafkaItemWriter
, MongoItemWriter
) 写入 Kafka 队列并更新它。但是当我运行作业时,我可以看到一些文档被跳过,并且跳过的文档数量等于块大小。
例如,我的集合有 15 个文档,块大小等于 5,MongoItemReader
读取第 1、2、3、4、5 行,然后读取第 11、12、13、14、15 行(跳过第 6、7、8、9 行,10)。
我ItemProcessor
对 POJO 实体进行修改。由于 MongoItemReader 在每次读取之间进行了刷新,因此更新了实体。但似乎游标分页也增加了(从日志中可以看出:行 ID 6、7、8、9 和 10 已被跳过)。我已经看到它通过使用“ SqlPagingQueryProviderFactoryBean ”在关系数据库中进行了工作,但它不适合 NoSQL DB。我已尽一切努力寻找解决方案,但没有任何帮助。
那么,我该如何处理这个问题呢?