我正在尝试运行 spark (2.2) 作业以使用 GRPC (1.1.2) 客户端调用从服务器获取一些数据。通过 spark 运行此代码时出现此错误。为一小部分运行相同的作业可以正常工作。根据我的研究,我知道 ABORTED 消息是因为一些并发问题,所以我猜这是因为客户端无法创建超过一定数量的存根,但我不知道如何继续。此外,我知道 GRPC 服务器可以很好地处理大量请求,而我远低于它可以处理的请求数量。有任何想法吗?
根据要求添加更多信息:我的客户端 CatalogGrpcClient 有这些方法来处理通道和请求:
private List<ManagedChannel> getChannels() {
return IntStream.range(0, numChannels).mapToObj(x ->
ManagedChannelBuilder.forAddress(channelHost, channelPort).usePlaintext(true).build()
).collect(Collectors.toList());
}
private ManagedChannel getChannel() {
return channels.get(ThreadLocalRandom.current().nextInt(channels.size()));
}
private ListingRequest populateRequest(ListingRequest.Builder req, String requestId) {
return req.setClientSendTs(System.currentTimeMillis())
.setRequestId(StringUtils.defaultIfBlank(req.getRequestId(), requestId))
.setSchemaVersion(StringUtils.defaultIfBlank(req.getSchemaVersion(), schema))
.build();
}
private List<ListingResponse> getGrpcListingWithRetry(ListingRequest.Builder request,
String requestIdStr,
int retryLimit,
int sleepBetweenRetry) {
int retryCount = 0;
while (retryCount < retryLimit) {
try {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(CatalogServiceGrpc.newBlockingStub(getChannel()).getListings(populateRequest(request, requestIdStr)), Spliterator.ORDERED), false).collect(Collectors.toList());
} catch (Exception e) {
System.out.println("Exception " + e.getCause().getMessage());
retryCount = retryCount + 1;
try {
Thread.sleep(sleepBetweenRetry);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
throw new StatusRuntimeException(Status.ABORTED);
}
我在方法 extract 中使用 getCatalogListingData 方法,该方法用于映射到 spark 作业中的案例类
def extract(itemIds: List[Long], validAspects: Broadcast[Array[String]]): List[ItemDetailModel] = {
var itemsDetails = List[ItemDetailModel]()
val client = new CatalogGrpcClient()
implicit val formats = DefaultFormats
val listings = client.getCatalogListingData(itemIds.map(x => x.asInstanceOf[java.lang.Long]).asJava).asScala
...
...
itemsDetails
}
这是调用提取的火花代码。itemsMissingDetails 是一个数据框,其中有一列“item”,它是唯一项目 ID 的列表。zipWithIndex 和下面的映射是为了让我在每个请求中将 50 个项目 ID 传递给 GRPC svc。
itemsMissingDetails
.rdd
.zipWithIndex
.map(x => (x._2 / 50, List(x._1.getLong(0))))
.reduceByKey(_ ++ _)
.flatMap(items => extract(items._2, validAspects))
.toDF
.write
.format("csv")
.option("header",true)
.option("sep", "\t")
.option("escapeQuotes", false)
.save(path)
ABORTED 错误实际上是我的客户在很长一段时间(约 30 分钟到 1 小时)后抛出的。当我开始这项工作时,它会从 GRPC svc 获得我需要的信息,用于每个工人的几千个项目。在此之后,作业挂起(在每个工人身上),经过很长时间的等待(约 30 分钟到 1 小时),它会因上述异常而失败或继续进行。我无法始终如一地获得 StatusRuntimeException。