0

在 Qubole 笔记本中,我试图从 API 响应中获取某些字符串。对于示例数据,它似乎工作得很好,但是当我使用完整集时失败了。星火版本:2.3.1;斯卡拉版本:2.11;scalaj-http 版本:2.4.2

import scalaj.http._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Sample data to test the code
val locationIds = Seq(
  ("178277"),("178278"),("178279")).toDF("search_destination_id")

def getResponse(request_url: String): String = Http(request_url).asString.body

def getGeoType(col: Column): Column = {
  udf { gaiaId: String =>
    val request_url = "https://...some_url..." + gaiaId + "...some_parameters"
    val response = getResponse(request_url)
    val pattern = "(?<=type\":\").*?(?=\")".r
    pattern.findFirstIn(response).getOrElse("no match")
  }.apply(col)
}

val result = locationIds.withColumn("geo_type", getGeoType($"search_destination_id"))
result.show

我为样本数据获得的预期结果示例:

+---------------------+-------------------+
|search_destination_id|           geo_type|
+---------------------+-------------------+
|               178277|multi_city_vicinity|
|               178278|multi_city_vicinity|
|               178279|multi_city_vicinity|
+---------------------+-------------------+

当我尝试处理我必须处理的所有 search_destination_id 时收到错误消息:

  at java.lang.Class.getSimpleBinaryName(Class.java:1450)
  at java.lang.Class.getSimpleName(Class.java:1309)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage$lzycompute(ScalaUDF.scala:1048)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage(ScalaUDF.scala:1047)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.doGenCode(ScalaUDF.scala:1000)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
  at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
  at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:85)
  at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:206)
  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
  at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
  at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
  at org.apache.spark.sql.execution.BaseLimitExec$class.doProduce(limit.scala:70)
  at org.apache.spark.sql.execution.LocalLimitExec.doProduce(limit.scala:97)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.LocalLimitExec.produce(limit.scala:97)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3280)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3261)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:78)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3260)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2705)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:256)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:684)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:693)
  ... 96 elided
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -21
  at java.lang.String.substring(String.java:1931)
  at java.lang.Class.getSimpleBinaryName(Class.java:1448)
  ... 174 more

请告知导致此错误的原因以及如何避免它。

4

0 回答 0