我正在使用 akka 流来处理我的数据。其中我有 1 个由元素 UUID 组成的源。
流程如下:
- 正在从某个第三方 HTTP 服务获取元素,该服务返回完整的元素及其属性。
- 然后我从该元素中检索所需的数据并将其转换为我的应用程序可以理解的对象。
- 然后我将该对象中的数据写入数据库。
- 最后,我用流中所有元素的状态更新数据库。
现在我想向这个流程添加重试机制,以便如果流程中的任何阶段失败,它应该重试该阶段一段时间,比如 3 次,如果之后它失败,那么应该发生流的唯一失败。例如,如果第三方服务存在一些问题,例如 HTTP 504 错误,那么大部分时间在重试此元素成功后。那么akka有什么方法可以实现这一点。
目前,我正在维护 1 个列表来存储所有失败的元素 ID,如下所示。
代码 :
List<UUID> failedId = new CopyOnWriteArrayList<>();
Source.from(elementIdToProcess).map(f -> {
failedId.add(f);
return f;
}).via(featureFlow()).via(filterNullFeaturesFlow())
.via(mapToFeatureFlow()).via(setFeaturesAdditionalInfo())
.runForeach(features -> {
features.parallelStream().forEach(feature -> {
try {
featureCommitter.save(feature);
failedId.remove(UUID.fromString(feature.getFeatureId()));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}, materializer);