0

我正在将 DynamoDB 与 Vertx 一起使用,并且我的一个 Verticle 因错误而超时

等待 30000(ms) 回复后超时。地址

eventBus.send("test", testObject, x -> {
  if (x.succeeded()) {
        log.info("done successfully")
  } else {
        error(“error while completing”)
  }
}
);
public CompletableFuture<Void> process(Object testObject) {
    return CompletableFuture.runAsync(() -> dynamoMapper.save(testObject))
            .thenAcceptAsync(result -> {
        log.info("done successfully")
            }).exceptionally(throwable -> {
                throw new CompletionException(throwable);
            });
}

final void listen(String address) {
    eventBus.consumer("test", x -> process(x).whenCompleteAsync((result, t) -> {
        if (t == null) {
            x.reply(OK);
        } else {
            x.fail(0, errorMessage);
        }
    }));
}

但是当我异步运行 DynamoDB 保存查询时,我没有遇到这个问题。有人可以建议将 DynamoDB 与 vert.x 一起使用的最佳实践吗?

4

1 回答 1

0

在 vert.x 中,默认情况下,您不能通过事件总线 ( https://vertx.io/docs/vertx-core/java/#_types_of_messages )传递 pojo

此外,您不需要使用期货,因为 vertx 可以满足您的大部分需求。

我会使用 aws async 客户端,但目前您不能将映射器与它一起使用。如果您仍然想使用映射器,它应该类似于以下内容:

vertx.eventBus().<String>consumer("dynamodb-example", message -> {
  try {
    CatalogItem catalogItem = Json.decodeValue(message.body(), CatalogItem.class);
    dynamoDbMapper.save(catalogItem);
    message.reply(Json.encode(catalogItem));
  } catch (Exception e) {
    message.fail(500, e.getMessage());
  }
});

由于他们的 SDK 处于阻塞状态,您可以使用 executeBlocking 来避免阻塞事件总线:

vertx.eventBus().<String>consumer("dynamodb-example", message -> {
  vertx.<CatalogItem>executeBlocking(promise -> {
    CatalogItem catalogItem = Json.decodeValue(message.body(), CatalogItem.class);
    dynamoDbMapper.save(catalogItem);
    promise.complete(catalogItem);
  }, asyncResult -> {
    if (asyncResult.succeeded()) {
      message.reply(Json.encode(asyncResult.result()));
    } else {
      message.fail(500, asyncResult.cause().getMessage());
    }
  });
});

然后你可以调用它:

CatalogItem catalogItem = new CatalogItem();
vertx.eventBus().<String>request("dynamodb-example", Json.encode(catalogItem), asyncResult -> {
  if (asyncResult.succeeded()) {
    CatalogItem catalogItemResult = Json.decodeValue(asyncResult.result().body(), CatalogItem.class);
    // do something
  } else {
    // handle exception
  }
});

您还可以使用 JsonObject 而不是字符串,或者定义自己的编解码器以通过事件总线传递 pojo。

于 2020-08-27T14:01:47.327 回答