0

我正在尝试使用Maxmind 扫雪机库来提取数据框中每个 IP 上的地理数据。

我们正在使用 Spark SQL(spark 版本 2.1.0),我在以下类中创建了一个 UDF:

class UdfDefinitions @Inject() extends Serializable with StrictLogging {

 sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
 val s3Config = configuration.databases.dataWarehouse.s3
 val lruCacheConst = 20000
 val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName) ),
  ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst)

 def lookupIP(ip: String): LookupIPResult = {
  val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1
  loc match {
    case None => LookupIPResult("", "", "")
    case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""), 
   x.city.getOrElse(""), x.regionName.getOrElse(""))
   }
 }

 val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)

}

目的是在 UDF 外部创建指向文件 (ipLookups) 的指针并在内部使用它,因此不要在每一行上打开文件。这会出现任务未序列化的错误,并且当我们在 UDF 中使用 addFiles 时,会出现打开文件过多的错误(使用大型数据集时,在小型数据集上它确实有效)。

这个线程展示了如何使用 RDD 来解决问题,但我们想使用 Spark SQL。在 spark 序列化中使用 maxmind geoip

有什么想法吗?谢谢

4

1 回答 1

0

这里的问题是 IpLookups 不可序列化。然而,它会从静态文件(我收集的文件)中进行查找,所以你应该能够解决这个问题。我建议您克隆存储库并使 IpLookups 可序列化。然后,为了让它与 spark SQL 一起工作,像你一样将所有东西包装在一个类中。在主要的 spark 作业中,您可以编写如下内容:

val IPResolver = new MySerializableIpResolver()
val resolveIP = udf((ip : String) => IPResolver.resolve(ip))
data.withColumn("Result", resolveIP($"IP"))

如果您没有那么多不同的 IP 地址,还有另一种解决方案:您可以在驱动程序中执行所有操作。

val ipMap = data.select("IP").distinct.collect
    .map(/* calls to the non serializable IpLookups but that's ok, we are in the driver*/)
    .toMap
val resolveIP = udf((ip : String) => ipMap(ip))
data.withColumn("Result", resolveIP($"IP"))
于 2017-12-11T18:07:29.020 回答