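I'm writing a Spring Boot application that starts up, gathers millions of database entries, converts them into a new, streamlined JSON format, and then sends them all to a GCP PubSub topic. I'm trying to use Spring Batch for this, but I'm having trouble implementing fault tolerance for my process. The database is rife with data quality issues, and sometimes my JSON conversions fail. When a failure occurs, I don't want the job to quit immediately. I want it to keep processing as many records as it can and, before it completes, report exactly which records failed so that my team and I can examine the problematic database entries.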
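To do this, I've tried to use Spring Batch's SkipListener interface. But I'm also using an AsyncItemProcessor and an AsyncItemWriter in my process, and even though the exceptions occur during processing, it is the SkipListener's onSkipInWrite() method that receives them, not the onSkipInProcess() method. Unfortunately, onSkipInWrite() doesn't have access to the original database entity, so I can't store its ID in my list of problematic DB entries.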
Have I misconfigured something? Is there any other way to get access to the object from the reader that failed the AsyncItemProcessor's processing step?
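For what it's worth, my understanding is that this may simply follow from how a Future behaves: the work is handed off during the process phase, and an exception thrown by it only surfaces later, when something calls get(), which in my step would be the writer. The sketch below reproduces that with plain java.util.concurrent and no Spring Batch at all. The names process, write, and run are hypothetical stand-ins for the two phases, not Spring Batch APIs.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DeferredFailureDemo {

    // Stand-in for the "process" phase: submit the work and return immediately.
    static Future<String> process(ExecutorService pool, String input) {
        return pool.submit(() -> {
            if (input.isEmpty()) {
                throw new IllegalArgumentException("bad record");
            }
            return input.toUpperCase();
        });
    }

    // Stand-in for the "write" phase: unwrapping the Future is where the failure appears.
    static String write(Future<String> future) {
        try {
            return "wrote " + future.get();
        } catch (ExecutionException e) {
            // The original exception is wrapped; the input item is not part of it.
            return "failed in write phase: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static String run(String input) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = process(pool, input); // nothing is thrown here
            return write(future);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("ok"));
        System.out.println(run(""));
    }
}
```

If that is what is happening in my step, it would explain both symptoms: the skip is attributed to the write phase, and the only thing available there is the Future, not the item that produced it.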
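Here's what I've tried...
I have a singleton Spring component where I store how many DB entries I've successfully processed, along with up to 20 problematic database entries.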
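import java.util.ArrayList;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.stereotype.Component;

@Component
@Getter // Lombok generates the getters
public class ProcessStatus {
    private int processed;
    private int failureCount;
    private final List<UnexpectedFailure> unexpectedFailures = new ArrayList<>();

    public void incrementProgress() { processed++; }

    public void logUnexpectedFailure(UnexpectedFailure failure) {
        failureCount++;
        unexpectedFailures.add(failure);
    }

    // Pairs the error with the database row that caused it
    @Getter
    @AllArgsConstructor
    public static class UnexpectedFailure {
        private Throwable error;
        private DbProjection dbData;
    }
}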
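Note that the component above does not itself cap the list at 20 entries (that limit comes from the skip policy), and its plain int counters are only safe if everything is recorded from a single thread. In case failures ever need to be recorded from the processor's worker threads, here is a minimal sketch of a capped, thread-safe variant. CappedFailureLog and its method names are hypothetical and not part of my application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class CappedFailureLog<T> {
    private final int maxStored;
    private final AtomicInteger processed = new AtomicInteger();
    private final AtomicInteger failureCount = new AtomicInteger();
    private final List<T> failures = new ArrayList<>();

    CappedFailureLog(int maxStored) {
        this.maxStored = maxStored;
    }

    void incrementProgress() {
        processed.incrementAndGet();
    }

    // Counts every failure, but keeps details for only the first maxStored of them.
    void logFailure(T failure) {
        failureCount.incrementAndGet();
        synchronized (failures) {
            if (failures.size() < maxStored) {
                failures.add(failure);
            }
        }
    }

    int getProcessed() {
        return processed.get();
    }

    int getFailureCount() {
        return failureCount.get();
    }

    // Returns a snapshot so callers cannot mutate the internal list.
    List<T> getFailures() {
        synchronized (failures) {
            return new ArrayList<>(failures);
        }
    }

    public static void main(String[] args) {
        CappedFailureLog<String> log = new CappedFailureLog<>(2);
        log.logFailure("row-1");
        log.logFailure("row-2");
        log.logFailure("row-3");
        System.out.println(log.getFailureCount() + " failures, stored " + log.getFailures());
    }
}
```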
I have a Spring Batch SkipListener that is supposed to catch failures and update my status component accordingly:
import java.util.concurrent.Future;

import lombok.AllArgsConstructor;
import org.springframework.batch.core.SkipListener;

@AllArgsConstructor
public class ConversionSkipListener implements SkipListener<DbProjection, Future<JsonMessage>> {
    private ProcessStatus processStatus;

    @Override
    public void onSkipInRead(Throwable error) {}

    @Override
    public void onSkipInProcess(DbProjection dbData, Throwable error) {
        processStatus.logUnexpectedFailure(new ProcessStatus.UnexpectedFailure(error, dbData));
    }

    @Override
    public void onSkipInWrite(Future<JsonMessage> messageFuture, Throwable error) {
        // This is getting called instead!! Even though the exception happened during processing :(
        // But I have no access to the original DbProjection data here, and messageFuture.get() gives me null.
    }
}
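One direction I'm considering, sketched below under stated assumptions, is to wrap the delegate processor so that the failure is caught while the original item is still in scope, instead of relying on the skip listener. Processor is a hypothetical stand-in for Spring Batch's ItemProcessor so that the sketch compiles on its own, and FailureRecordingProcessor is likewise a made-up name. Returning null relies on the convention that a null result means "filter this item"; whether the asynchronous writer tolerates a Future that resolves to null is an assumption I would need to verify for my Spring Batch version.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Spring Batch's ItemProcessor.
interface Processor<I, O> {
    O process(I item) throws Exception;
}

class FailureRecordingProcessor<I, O> implements Processor<I, O> {
    private final Processor<I, O> delegate;
    // Synchronized because the wrapper may be invoked from several worker threads.
    private final List<I> failedItems = Collections.synchronizedList(new ArrayList<>());

    FailureRecordingProcessor(Processor<I, O> delegate) {
        this.delegate = delegate;
    }

    @Override
    public O process(I item) {
        try {
            return delegate.process(item);
        } catch (Exception e) {
            // The original input is still available here, unlike in onSkipInWrite().
            failedItems.add(item);
            return null; // assumption: null is treated as "filtered out"
        }
    }

    // Returns a snapshot of the inputs that failed so far.
    List<I> getFailedItems() {
        return new ArrayList<>(failedItems);
    }

    public static void main(String[] args) {
        Processor<Integer, String> risky = n -> {
            if (n < 0) {
                throw new IllegalStateException("negative id");
            }
            return "#" + n;
        };
        FailureRecordingProcessor<Integer, String> safe = new FailureRecordingProcessor<>(risky);
        for (int id : new int[] {1, -2, 3}) {
            System.out.println(id + " -> " + safe.process(id));
        }
        System.out.println("failed inputs: " + safe.getFailedItems());
    }
}
```

The trade-off is that swallowed exceptions never reach the skip policy, so a failure limit would have to be enforced inside the wrapper or checked after the step.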
Then I've configured my job like this:
import java.util.Collections;
import java.util.concurrent.Future;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Sort;

@Configuration
public class ConversionBatchJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private TaskExecutor processThreadPool;

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:100}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize,
            @Value("#{jobParameters[limit]}") Integer limit) {
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); // A native query
        myDomainRepositoryReader.setArguments(Collections.singletonList("ACTIVE"));
        myDomainRepositoryReader.setSort(Collections.singletonMap("update_date", Sort.Direction.ASC));
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        // myDomainRepositoryReader.setSaveState(false); <== haven't figured out what this does yet
        return myDomainRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
        // Sometimes throws exceptions when DB data is exceptionally weird, bad, or missing
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    }

    // Runs the converter on the thread pool; each item becomes a Future<JsonMessage>
    @Bean
    @StepScope
    public AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter(
            ItemProcessor<DbProjection, JsonMessage> dataConverter) throws Exception {
        AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter = new AsyncItemProcessor<>();
        asyncDataConverter.setDelegate(dataConverter);
        asyncDataConverter.setTaskExecutor(processThreadPool);
        asyncDataConverter.afterPropertiesSet();
        return asyncDataConverter;
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService);
    }

    // Unwraps each Future before handing the messages to the real writer
    @Bean
    @StepScope
    public AsyncItemWriter<JsonMessage> asyncJsonPublisher(ItemWriter<JsonMessage> jsonPublisher) throws Exception {
        AsyncItemWriter<JsonMessage> asyncJsonPublisher = new AsyncItemWriter<>();
        asyncJsonPublisher.setDelegate(jsonPublisher);
        asyncJsonPublisher.afterPropertiesSet();
        return asyncJsonPublisher;
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter,
                                  AsyncItemWriter<JsonMessage> asyncJsonPublisher,
                                  ProcessStatus processStatus,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, Future<JsonMessage>>chunk(processChunkSize)
                .reader(dbReader)
                .processor(asyncDataConverter)
                .writer(asyncJsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                // ^ for now this returns true for everything until 20 failures
                .listener(new ConversionSkipListener(processStatus))
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}
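For completeness, the skip policy mentioned in the step definition behaves roughly like the sketch below. This is a simplified, dependency-free illustration and not my actual MyCustomConversionSkipPolicy: FailureLimitSkipPolicy is a hypothetical name, and its shouldSkip method only mirrors the shape of Spring Batch's SkipPolicy without implementing that interface.

```java
class FailureLimitSkipPolicy {
    private final int maximumFailures;

    FailureLimitSkipPolicy(int maximumFailures) {
        if (maximumFailures < 0) {
            throw new IllegalArgumentException("maximumFailures must not be negative");
        }
        this.maximumFailures = maximumFailures;
    }

    // skipCount is the number of items already skipped before this failure.
    // Every kind of error is skippable until the limit is reached.
    boolean shouldSkip(Throwable error, long skipCount) {
        return skipCount < maximumFailures;
    }

    public static void main(String[] args) {
        FailureLimitSkipPolicy policy = new FailureLimitSkipPolicy(20);
        Throwable error = new IllegalStateException("bad row");
        System.out.println(policy.shouldSkip(error, 0));
        System.out.println(policy.shouldSkip(error, 20));
    }
}
```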