1

我需要每天处理大量用户列表,以便根据某些情况向他们发送电子邮件和 SMS 通知。我为此使用 Java EE 批处理模型。我的工作 xml 如下:

<step id="sendNotification">
    <chunk item-count="10" retry-limit="3">
        <reader ref="myItemReader"></reader>
        <processor ref="myItemProcessor"></processor>
        <writer ref="myItemWriter"></writer>
        <retryable-exception-classes>
            <include class="java.lang.IllegalArgumentException"/>
        </retryable-exception-classes>
    </chunk>
</step>

MyItemReader 的 onOpen 方法从数据库中读取所有用户,而 readItem() 使用列表迭代器一次读取一个用户。在 myItemProcessor 中,实际的电子邮件通知被发送给用户,然后用户被持久保存在 myItemWriter 类中该块的数据库中。

@Named
public class MyItemReader extends AbstractItemReader {

    private Iterator<User> iterator = null;
    private User lastUser;

    @Inject
    private MyService service;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        super.open(checkpoint);

        List<User> users = service.getUsers();
        iterator = users.iterator();

        if(checkpoint != null) {
            User checkpointUser = (User) checkpoint;
            System.out.println("Checkpoint Found: " + checkpointUser.getUserId());
            while(iterator.hasNext() && !iterator.next().getUserId().equals(checkpointUser.getUserId())) {
                System.out.println("skipping already read users ... ");
            }
        }
    }

    @Override
    public Object readItem() throws Exception {

        User user=null;

        if(iterator.hasNext()) {
            user = iterator.next();
            lastUser = user;
        }
        return user;
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return lastUser;
    }
}

我的问题是检查点存储在前一个块中执行的最后一条记录。如果我有一个包含接下来 10 个用户的块,并且在第 5 个用户的 myItemProcessor 中引发异常,那么在重试时,整个块将被执行,所有 10 个用户将被再次处理。我不希望再次向已处理的用户发送通知。

有没有办法处理这个?这应该如何有效地完成?

任何帮助将不胜感激。谢谢。

4

2 回答 2

1

您当前的项目处理器正在执行块事务范围之外的操作,这导致应用程序状态不同步。如果您的要求是仅在块中的所有项目成功完成后才发送电子邮件,那么您可以将电子邮件发送部分移动到ItemWriterListener.afterWrite(items)

于 2017-11-30T02:30:13.940 回答
1

我将以@cheng 的评论为基础。我在此感谢他,希望我的回答能够为有用地组织和展示选项提供额外的价值。

答案:排队等待另一个 MDB 发送邮件

背景:

正如@cheng 指出的那样,失败意味着整个事务被回滚,并且检查点没有前进。

那么如何处理您的块已向某些用户发送电子邮件但不是全部的事实呢?(你可能会说它回滚了,但有“副作用”。)

因此,我们可以将您的问题重述为:如何从批量块步骤发送电子邮件?

好吧,假设您有办法通过事务 API(实现XAResource等)发送电子邮件,您可以使用该 API。

假设您不这样做,我会对 JMS 队列进行事务性写入,然后使用单独的 MDB 发送电子邮件(正如 @cheng 在他的评论中所建议的那样)。

建议的替代方案:使用 ItemWriter 将消息发送到 JMS 队列,然后使用单独的 MDB 实际发送电子邮件

使用这种方法,您仍然可以通过批量处理和更新数据库来提高效率(无论如何您一次只发送一封电子邮件),并且您可以从简单的检查点和重新启动中受益,而无需编写复杂的错误处理。

这也有可能作为一种模式在批处理作业和批处理之外重复使用。

其他选择

其他一些我认为不太好的想法,为了讨论而列出:

添加批处理应用程序逻辑跟踪通过电子邮件发送的用户(使用 ItemProcessListener)

您可以使用ItemProcessListener方法构建自己的成功/失败电子邮件列表: afterProcessonProcessError

然后,在重新启动时,您可以知道当前块中已向哪些用户发送了电子邮件,即使已经发送了一些电子邮件,但由于整个块回滚,我们被重新定位到该块。

这肯定会使您的批处理逻辑复杂化,并且您还必须以某种方式保留此成功或失败列表。此外,这种方法可能对这项工作高度特定(而不是排队等待 MDB 处理)。

但它更简单,因为您只需一个批处理作业,而无需消息提供程序和单独的应用程序组件。

如果您走这条路线,您可能希望同时使用可跳过和“无回滚”可重试异常的组合。

单项块

如果您使用item-count="1"定义您的块,那么您可以避免复杂的检查点和错误处理代码。但是,您会牺牲效率,因此只有在批处理的其他方面非常引人注目时才有意义:例如,通过通用界面调度和管理作业,在作业中的失败步骤重新启动的能力

如果您要走这条路,您可能需要考虑将套接字和超时异常定义为“无回滚”异常(使用) 因为回滚没有任何好处,您可能想重试网络超时问题。

既然你特别提到了效率,我猜这不适合你。

使用事务同步

这也许可行,但批处理 API 并没有特别容易做到这一点,而且您仍然可能遇到块完成但一封或多封电子邮件发送失败的情况。

于 2017-12-04T21:43:13.723 回答