我有 BatchConfiguration 、 OrderProcesser 和 OrderReader 类文件。当我调用 Spring Boot 休息服务时,它给出了 NullPointerException。但是,当我在 Postman 中运行 SpringBoot 应用程序时,它会显示适当的结果。另外,我想知道哪个步骤使它运行多次,以便 billerOrderId 多次执行。我使用https://spring.io/guides/gs/batch-processing/来编写我的代码。
Here is my osm-billers.csv
BillerOrderId
1233
2344
4566
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private CsvFileToDatabaseJobConfig csvFileToDatabaseJobConfig;
@Autowired
private DatabaseToCsvFileJobConfig databaseToCsvFileJobConfig;
RestTemplate restTemplate;
@Bean
public FlatFileItemReader<Biller> reader(){
try {
FlatFileItemReader<Biller> itemReader = csvFileToDatabaseJobConfig.csvFileItemReader();
return itemReader ;
} catch (UnexpectedInputException e) {
throw new OrderBatchException("Invalid Input..." + e.getMessage());
} catch (ParseException e) {
throw new OrderBatchException("Parsing error..." + e.getMessage());
} catch (NonTransientResourceException e) {
throw new OrderBatchException("NonTransientReasource error..." + e.getMessage());
} catch (Exception e) {
throw new OrderBatchException("Unknown Read error..." + e.getMessage());
}
}
@Bean
public OrderProcessor processor() {
return new OrderProcessor();
}
@Bean
public ItemWriter<Biller> writer() {
try {
ItemWriter<Biller> itemWriter = databaseToCsvFileJobConfig.databaseCsvItemWriter();
return itemWriter;
} catch (Exception e) {
throw new OrderBatchException("Unknown Write error..." + e.getMessage());
}
}
@Bean
public Job importJobOrder(JobCompletionNotificationListner listener, Step step1) {
return jobBuilderFactory.get("importJobOrder")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(ItemWriter<Biller> writer) {
return stepBuilderFactory.get("step1")
.<Biller, Biller> chunk(10)
.reader((ItemReader<? extends Biller>) reader())
.processor(processor())
.writer(writer)
.build();
}
}
package com.spectrum.sci.processor;
import java.util.List;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.client.RestTemplate;
import com.spectrum.sci.model.Biller;
import com.spectrum.tos.model.Order;
import com.spectrum.tos.model.OrderRequest;
import com.spectrum.tos.model.OrderResponse;
public class OrderProcessor implements ItemProcessor<Biller, Biller>{
@Value("${osm.service.url}")
private String orderUrl;
@Value("${osm.service.username}")
private String userName;
@Value("${osm.service.password}")
private String password;
RestTemplate restTemplate;
@Override
public Biller process(Biller biller) throws Exception {
OrderRequest orderRequest = new OrderRequest();
Order order = new Order();
order.setBillerOrderId(biller.getBillerOrderId());
orderRequest.setOrder(order);
OrderReader osmReader = new OrderReader(orderUrl, userName, password, restTemplate, orderRequest);
OrderResponse orderResponse = osmReader.read();
if (orderResponse.getResult().equals("SUCCESS") ) {
return null;
} else {
//Failed transactions
return biller;
}
}
}
public class OrderReader implements ItemReader<OrderResponse>{
private static final Logger log = LoggerFactory.getLogger(OrderReader.class);
private final String apiUrl;
private final RestTemplate restTemplate;
private int nextOrderIndex;
private OrderResponse orderResponse;
@Autowired
private OrderRequest orderRequest;
private String userName;
private String password;
public OrderReader(String apiUrl, String userName, String password, RestTemplate restTemplate, OrderRequest orderRequest) {
this.apiUrl = apiUrl;
this.restTemplate = restTemplate;
this.orderRequest = orderRequest;
this.userName = userName;
this.password = password;
nextOrderIndex = 0;
}
private boolean orderisNotInitialized() {
return this.orderResponse == null;
}
private OrderResponse fetchOrderDataFromApi(OrderRequest orderRequest) {
// log.info("OrderRequest = " + orderRequest.getOrder().getBillerOrderId());
log.info("apiUrl = " + apiUrl);
log.info("userName = " + userName);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
headers.setBasicAuth(userName, password);
HttpEntity<OrderRequest> requestEntity =
new HttpEntity<OrderRequest>(orderRequest, headers);
ResponseEntity<OrderResponse> response =
restTemplate.exchange(apiUrl,HttpMethod.POST, requestEntity,OrderResponse.class);
log.info("response = " + response);
//ResponseEntity<OrderResponse[]> response = restTemplate.postForEntity(apiUrl, orderRequest, OrderResponse[].class);
OrderResponse orderResponse = response.getBody();
//return Arrays.asList(orderResponse);
return orderResponse;
}
@Override
public OrderResponse read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (orderisNotInitialized()) {
orderResponse = fetchOrderDataFromApi(orderRequest);
}
/*
OrderResponse nextOrder = null;
if (nextOrderIndex < orderResponse.size()) {
nextOrder = orderResponse.get(nextOrderIndex);
nextOrderIndex++;
}
return nextOrder; */
return orderResponse;
}
}
这是错误。
main] o.s.batch.core.step.AbstractStep : Encountered an error executing step step1 in job importJobOrder
java.lang.NullPointerException: null
at com.spectrum.sci.processor.OrderReader.fetchOrderDataFromApi(OrderReader.java:70) ~[classes/:na]
at com.spectrum.sci.processor.OrderReader.read(OrderReader.java:84) ~[classes/:na]
at com.spectrum.sci.processor.OrderProcessor.process(OrderProcessor.java:34) ~[classes/:na]
at com.spectrum.sci.processor.OrderProcessor.process(OrderProcessor.java:1) ~[classes/:na]