5

我正在使用 akka 流来处理我的数据。其中我有 1 个由元素 UUID 组成的源。

流程如下:

  1. 正在从某个第三方 HTTP 服务获取元素,该服务返回完整的元素及其属性。
  2. 然后我从该元素中检索所需的数据并将其转换为我的应用程序可以理解的对象。
  3. 然后我将该对象中的数据写入数据库。
  4. 最后,我用流中所有元素的状态更新数据库。

现在我想向这个流程添加重试机制,以便如果流程中的任何阶段失败,它应该重试该阶段一段时间,比如 3 次,如果之后它失败,那么应该发生流的唯一失败。例如,如果第三方服务存在一些问题,例如 HTTP 504 错误,那么大部分时间在重试此元素成功后。那么akka有什么方法可以实现这一点。

目前,我正在维护 1 个列表来存储所有失败的元素 ID,如下所示。

代码 :

List<UUID> failedId = new CopyOnWriteArrayList<>();
Source.from(elementIdToProcess).map(f -> {
            failedId.add(f);
            return f;
        }).via(featureFlow()).via(filterNullFeaturesFlow())
            .via(mapToFeatureFlow()).via(setFeaturesAdditionalInfo())
            .runForeach(features -> {
                features.parallelStream().forEach(feature -> {
                    try {
                        featureCommitter.save(feature);
                        failedId.remove(UUID.fromString(feature.getFeatureId()));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }, materializer);
4

2 回答 2

1

我有一个类似的问题,并使用名为retry的 Future 功能解决了它

根据您的代码,我将以这种方式实现它:

  @BeforeClass
  public static void setUpClass() {
      system = ActorSystem.create();
      mat = ActorMaterializer.create(system);
  }

  @AfterClass
  public static void tearDown() {
      TestKit.shutdownActorSystem(system);
      system = null;
  }

  @Test
  public void shouldRetryArbitraryNumberOfTimes() {
    doThrow(new RuntimeException()).when(featureCommitter).process(anyString(), anyString(), anyString());
    TestSubscriber.Probe<Message> probe = getJmsSource().runWith(TestSink.probe(system), mat);
    Throwable throwable = probe.request(1).expectError();
    verify(featureCommitter, timeout(5000).times(4)).save(any(Feature.class));
  }

  private Source<Feature> getJmsSource() {
    List<UUID> failedId = new CopyOnWriteArrayList<>();
      return Source.from(elementIdToProcess)
              .via(featureFlow())
              .via(filterNullFeaturesFlow())
              .via(mapToFeatureFlow())
              .via(setFeaturesAdditionalInfo())
              .mapAsync(1, features -> {
                      features.parallelStream().forEach(feature -> retry(getCompletionStageCallable(feature),
                                                                         3, 
                                                                         Duration.ofMillis(200),
                                                                         system.scheduler(),
                                                                         system.dispatcher());
                  });
  }

  private Callable<CompletionStage<Feature>> getCompletionStageCallable(Feature feature) {
    () -> {
      return return CompletableFuture.supplyAsync(() -> {
          try {
              featureCommitter.save(feature);
          } catch (Exception e) {
              throw new RuntimeException(e);
          }
          return feature;
      }
  }

这里要考虑的主要事情是,我们不是将重试作为流的一部分来处理,而是作为处理未来的一种方式。如您所见,我将保存从 Sink 移到 amapAsync中,在其中我将并行度设置为 1。这样我就可以使用 aTestSubscriber.Probe来验证流行为。

我希望这有帮助。

问候

于 2018-09-23T12:39:09.923 回答
1

你可以像这样重试

source.recoverWithRetries(attempts = 2, {
  case _: RuntimeException ⇒ source
})

RestartSource或者,您可以使用RestartSink或来制定退避策略RestartFlow。所有这些都记录在Akka 文档中

于 2018-06-23T09:40:29.980 回答