6

我目前的情况:

我有10000 条记录作为 batch 的输入。根据我的理解,批处理仅用于逐记录处理。因此,我在批处理步骤中使用 dataweave 组件转换每条记录(注意:我没有使用任何批处理提交)并将每条记录写入文件。进行逐条记录处理的原因是假设在任何特定记录中,存在无效数据,只有该特定记录失败,其余记录将被正常处理。

但是在我看到的许多博客中,他们都在使用带有 dataweave 组件的批处理提交(带有流)。因此,据我了解,所有记录将一次性提供给 dataweave,如果一条记录包含无效数据,则所有 10000 条记录都将失败(在 dataweave 处)。然后,逐记录处理的点就丢失了。上面的假设是正确的还是我想错了?

这就是我不使用批量提交的原因。

现在,正如我所说,将每条记录发送到文件中。实际上,我确实需要将每条记录发送到5 个不同的 CSV 文件。因此,目前我在BatchStep中使用Scatter-Gather组件将其发送到五个不同的路线。在此处输入图像描述

如,您可以看到图像。输入阶段给出了 10000 条记录的集合。每条记录将使用 Scatter-Gather 发送到 5 个路由。

是,我使用的方法很好,还是可以遵循任何更好的设计?

此外,我创建了第二批步骤,仅捕获FAILEDRECORDS。但是,使用当前的设计,我无法捕获失败的记录。

4

1 回答 1

5

简短的答案

上面的假设是正确的还是我想错了?

简而言之,是的,您的想法是错误的。通过示例阅读我的 loooong 解释以了解原因,希望您会欣赏它。

此外,我创建了第二批步骤,仅捕获 FAILEDRECORDS。但是,使用当前的设计,我无法捕获失败的记录。

您可能忘记max-failed-records = "-1"在批处理作业上设置(无限制)。默认为 0,第一次失败的记录批处理将返回并且不执行后续步骤。

是,我使用的方法很好,还是可以遵循任何更好的设计?

我认为如果性能对您至关重要并且您无法应对按顺序执行此操作所产生的开销,这是有道理的。相反,如果您可以放慢一点速度,分 5 个不同的步骤执行此操作可能是有意义的,您将失去并行性,但您可以更好地控制失败的记录,尤其是在使用批量提交的情况下。

MULE 批量作业在实践中

我认为通过一个例子来解释它是如何工作的最好方法。

考虑以下情况:您的批处理配置为max-failed-records = "-1"(无限制)。

<batch:job name="batch_testBatch" max-failed-records="-1">

在这个过程中,我们输入一个由 6 个字符串组成的集合。

 <batch:input>
            <set-payload value="#[['record1','record2','record3','record4','record5','record6']]" doc:name="Set Payload"/>
 </batch:input>

处理由 3 个步骤组成”第一步只是记录处理,第二步将改为进行记录并在 record3 上抛出异常以模拟失败。

<batch:step name="Batch_Step">
        <logger message="-- processing #[payload] in step 1 --" level="INFO" doc:name="Logger"/>
 </batch:step>
 <batch:step name="Batch_Step2">
     <logger message="-- processing #[payload] in step 2 --" level="INFO" doc:name="Logger"/>
     <scripting:transformer doc:name="Groovy">
         <scripting:script engine="Groovy"><![CDATA[
         if(payload=="record3"){
             throw new java.lang.Exception();
         }
         payload;
         ]]>
         </scripting:script>
     </scripting:transformer>
</batch:step>

第三步将只包含提交计数值为 2 的提交。

<batch:step name="Batch_Step3">
    <batch:commit size="2" doc:name="Batch Commit">
        <logger message="-- committing #[payload] --" level="INFO" doc:name="Logger"/>
    </batch:commit>
</batch:step>

现在你可以跟着我执行这个批处理了:

在此处输入图像描述

开始时,第一步将处理所有 6 条记录,登录控制台将如下所示:

 -- processing record1 in step 1 --
 -- processing record2 in step 1 --
 -- processing record3 in step 1 --
 -- processing record4 in step 1 --
 -- processing record5 in step 1 --
 -- processing record6 in step 1 --
Step Batch_Step finished processing all records for instance d8660590-ca74-11e5-ab57-6cd020524153 of job batch_testBatch

现在在第 2 步会更有趣,记录 3 将失败,因为我们明确地抛出了一个异常,但尽管如此,该步骤将继续处理其他记录,这里是日志的样子。

-- processing record1 in step 2 --
-- processing record2 in step 2 --
-- processing record3 in step 2 --
com.mulesoft.module.batch.DefaultBatchStep: Found exception processing record on step ...
Stacktrace
....
-- processing record4 in step 2 --
-- processing record5 in step 2 --
-- processing record6 in step 2 --
Step Batch_Step2 finished processing all records for instance d8660590-ca74-11e5-ab57-6cd020524153 of job batch_testBatch

此时,尽管此步骤中的记录失败,但批处理仍将继续,因为参数max-failed-records设置为-1(无限制)而不是默认值 0。

此时所有成功的记录都会传给 step3,这是因为默认情况下,accept-policystep 的参数设置为NO_FAILURES。(其他可能的值为ALLONLY_FAILURES)。

现在包含计数等于 2 的提交阶段的 step3 将两个两个提交记录:

-- committing [record1, record2] --
-- committing [record4, record5] --
Step: Step Batch_Step3 finished processing all records for instance d8660590-ca74-11e5-ab57-6cd020524153 of job batch_testBatch
-- committing [record6] --

正如您所看到的,这证实了失败的记录 3 没有传递到下一步,因此没有提交。

从这个例子开始,我认为你可以想象和测试更复杂的场景,例如在提交之后,你可以有另一个步骤,只处理失败的记录,让管理员知道失败的邮件。在您始终可以使用外部存储来存储有关您的记录的更多高级信息之后,您可以在我对其他问题的回答中阅读。

希望这可以帮助

于 2016-02-03T13:38:50.773 回答