1

我正在 Apache Ignite 中尝试容错

我不知道如何在任何节点上重试失败的作业。我有一个用例,我的工作将通过进程构建器调用第三方工具作为系统进程来进行一些计算。在某些情况下,该工具可能会失败,但在大多数情况下,可以在任何节点上重试该作业 - 包括之前失败的那个节点。

目前,Ignite 似乎将作业重新路由到之前没有此作业的另一个节点。因此,过了一会儿,所有节点都消失了,任务失败了。

我正在寻找的是如何在任何节点上重试作业。

这是一个测试来证明我的问题。

这是我随机失败的工作:

public static class RandomlyFailingComputeJob implements ComputeJob {
    private static final long serialVersionUID = -8351095134107406874L;
    private final String data;

    public RandomlyFailingComputeJob(String data) {
        Validate.notNull(data);
        this.data = data;
    }

    public void cancel() {
    }

    public Object execute() throws IgniteException {
        final double random = Math.random();
        if (random > 0.5) {
            throw new IgniteException();
        } else {
            return StringUtils.reverse(data);
        }
    }
}

下面是任务:

public static class RandomlyFailingComputeTask extends
        ComputeTaskSplitAdapter<String, String> {
    private static final long serialVersionUID = 6756691331287458885L;

    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res,
            List<ComputeJobResult> rcvd) throws IgniteException {
        if (res.getException() != null) {
            return ComputeJobResultPolicy.FAILOVER;
        }
        return ComputeJobResultPolicy.WAIT;
    }

    public String reduce(List<ComputeJobResult> results)
            throws IgniteException {
        final Collection<String> reducedResults = new ArrayList<String>(
                results.size());
        for (ComputeJobResult result : results) {
            reducedResults.add(result.<String> getData());
        }
        return StringUtils.join(reducedResults, ' ');
    }

    @Override
    protected Collection<? extends ComputeJob> split(int gridSize,
            String arg) throws IgniteException {
        final String[] args = StringUtils.split(arg, ' ');
        final Collection<ComputeJob> computeJobs = new ArrayList<ComputeJob>(
                args.length);
        for (String data : args) {
            computeJobs.add(new RandomlyFailingComputeJob(data));
        }
        return computeJobs;
    }

}

测试代码:

    final Ignite ignite = Ignition.start();
    final String original = "The quick brown fox jumps over the lazy dog";

    final String reversed = StringUtils.join(
            ignite.compute().execute(new RandomlyFailingComputeTask(),
                    original), ' ');

如您所见,应始终进行故障转移。由于失败的概率!= 1,我希望任务在某个时候成功终止。

使用 0.5 的概率阈值和总共 3 个节点,这几乎不会发生。我遇到了一个例外,例如class org.apache.ignite.cluster.ClusterTopologyException: Failed to failover a job to another node (failover SPI returned null). 经过一些调试,我发现这是因为我最终用完了节点。所有的都没有了。

我知道我可以自己写FailoverSpi来处理这个问题。

但这只是感觉不对。

首先,这样做似乎有点矫枉过正。
但是SPI是一种全球性的东西。我想决定每项工作是否应该重试或故障转移。例如,这可能取决于我正在调用的第三方工具的退出代码。所以在全局 SPI 上配置故障转移是不对的。

4

1 回答 1

2

如果 AlwaysFailoverSpi 的当前实现(这是默认实现)已经尝试了特定作业的所有节点,则不会进行故障转移。我相信它可以是一个配置选项,但现在您必须实现自己的故障转移 SPI(它应该非常简单 - 每次作业尝试故障转移时只需从拓扑中选择一个随机节点)。

至于 SPI 的全局性质,你是对的,但它的 failover() 采用 FailoverContext,其中包含有关失败作业的信息(任务名称、属性、异常等),因此您可以根据这些信息做出决定。

于 2015-06-29T21:15:11.980 回答