1

我正在尝试运行 spark (2.2) 作业以使用 GRPC (1.1.2) 客户端调用从服务器获取一些数据。通过 spark 运行此代码时出现此错误。为一小部分运行相同的作业可以正常工作。根据我的研究,我知道 ABORTED 消息是因为一些并发问题,所以我猜这是因为客户端无法创建超过一定数量的存根,但我不知道如何继续。此外,我知道 GRPC 服务器可以很好地处理大量请求,而我远低于它可以处理的请求数量。有任何想法吗?

根据要求添加更多信息:我的客户端 CatalogGrpcClient 有这些方法来处理通道和请求:

private List<ManagedChannel> getChannels() {
    return IntStream.range(0, numChannels).mapToObj(x ->
            ManagedChannelBuilder.forAddress(channelHost, channelPort).usePlaintext(true).build()
    ).collect(Collectors.toList());
}

private ManagedChannel getChannel() {
    return channels.get(ThreadLocalRandom.current().nextInt(channels.size()));
}

private ListingRequest populateRequest(ListingRequest.Builder req, String requestId) {
    return req.setClientSendTs(System.currentTimeMillis())
            .setRequestId(StringUtils.defaultIfBlank(req.getRequestId(), requestId))
            .setSchemaVersion(StringUtils.defaultIfBlank(req.getSchemaVersion(), schema))
            .build();
}

private List<ListingResponse> getGrpcListingWithRetry(ListingRequest.Builder request,
                                                      String requestIdStr,
                                                      int retryLimit,
                                                      int sleepBetweenRetry) {
    int retryCount = 0;
    while (retryCount < retryLimit) {
        try {
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(CatalogServiceGrpc.newBlockingStub(getChannel()).getListings(populateRequest(request, requestIdStr)), Spliterator.ORDERED), false).collect(Collectors.toList());
        } catch (Exception e) {
            System.out.println("Exception " + e.getCause().getMessage());
            retryCount = retryCount + 1;
            try {
                Thread.sleep(sleepBetweenRetry);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }
    throw new StatusRuntimeException(Status.ABORTED);
}

我在方法 extract 中使用 getCatalogListingData 方法,该方法用于映射到 spark 作业中的案例类

def extract(itemIds: List[Long], validAspects: Broadcast[Array[String]]): List[ItemDetailModel] = {
    var itemsDetails = List[ItemDetailModel]()
    val client = new CatalogGrpcClient()
    implicit val formats = DefaultFormats
    val listings = client.getCatalogListingData(itemIds.map(x => x.asInstanceOf[java.lang.Long]).asJava).asScala
    ...
    ...
    itemsDetails
}

这是调用提取的火花代码。itemsMissingDetails 是一个数据框,其中有一列“item”,它是唯一项目 ID 的列表。zipWithIndex 和下面的映射是为了让我在每个请求中将 50 个项目 ID 传递给 GRPC svc。

itemsMissingDetails
  .rdd
  .zipWithIndex
  .map(x => (x._2 / 50, List(x._1.getLong(0))))
  .reduceByKey(_ ++ _)
  .flatMap(items => extract(items._2, validAspects))
  .toDF
  .write
  .format("csv")
  .option("header",true)
  .option("sep", "\t")
  .option("escapeQuotes", false)
  .save(path)

ABORTED 错误实际上是我的客户在很长一段时间(约 30 分钟到 1 小时)后抛出的。当我开始这项工作时,它会从 GRPC svc 获得我需要的信息,用于每个工人的几千个项目。在此之后,作业挂起(在每个工人身上),经过很长时间的等待(约 30 分钟到 1 小时),它会因上述异常而失败或继续进行。我无法始终如一地获得 StatusRuntimeException。

4

0 回答 0