我需要每天处理大量用户列表,以便根据某些情况向他们发送电子邮件和 SMS 通知。我为此使用 Java EE 批处理模型。我的工作 xml 如下:
<step id="sendNotification">
<chunk item-count="10" retry-limit="3">
<reader ref="myItemReader"></reader>
<processor ref="myItemProcessor"></processor>
<writer ref="myItemWriter"></writer>
<retryable-exception-classes>
<include class="java.lang.IllegalArgumentException"/>
</retryable-exception-classes>
</chunk>
</step>
MyItemReader 的 onOpen 方法从数据库中读取所有用户,而 readItem() 使用列表迭代器一次读取一个用户。在 myItemProcessor 中,实际的电子邮件通知被发送给用户,然后用户被持久保存在 myItemWriter 类中该块的数据库中。
@Named
public class MyItemReader extends AbstractItemReader {
private Iterator<User> iterator = null;
private User lastUser;
@Inject
private MyService service;
@Override
public void open(Serializable checkpoint) throws Exception {
super.open(checkpoint);
List<User> users = service.getUsers();
iterator = users.iterator();
if(checkpoint != null) {
User checkpointUser = (User) checkpoint;
System.out.println("Checkpoint Found: " + checkpointUser.getUserId());
while(iterator.hasNext() && !iterator.next().getUserId().equals(checkpointUser.getUserId())) {
System.out.println("skipping already read users ... ");
}
}
}
@Override
public Object readItem() throws Exception {
User user=null;
if(iterator.hasNext()) {
user = iterator.next();
lastUser = user;
}
return user;
}
@Override
public Serializable checkpointInfo() throws Exception {
return lastUser;
}
}
我的问题是检查点存储在前一个块中执行的最后一条记录。如果我有一个包含接下来 10 个用户的块,并且在第 5 个用户的 myItemProcessor 中引发异常,那么在重试时,整个块将被执行,所有 10 个用户将被再次处理。我不希望再次向已处理的用户发送通知。
有没有办法处理这个?这应该如何有效地完成?
任何帮助将不胜感激。谢谢。