1

我想使用像https://github.com/davidmoten/rtree2这样的 Rtree作为 spark 广播变量。

但是,java.io.NotSerializableException: com.github.davidmoten.rtree2.RTree它不受支持。

有解决方法吗?https://github.com/davidmoten/rtree2建议:

/ deserialize the entries from disk (for example)
List<Entry<Thing, Point> entries = ...
// bulk load
RTree<Thing, Point> tree = RTree.maxChildren(28).star().create(entries); 

但我不知道如何在广播变量的上下文中适应它。即,我当然可以广播条目列表,但我不知道在使用 UDF 时在所有执行程序上初始化 Rtree 的入口点。当然,应该可以通过映射分区实现,但我更喜欢 UDF 方法。

import com.github.davidmoten.grumpy.core.Position
  import com.github.davidmoten.rtree2.geometry.{Geometries, Point}
  import com.github.davidmoten.rtree2.{Entry, Iterables, RTree}

  val sydney = Geometries.point(151.2094, -33.86)
  val canberra = Geometries.point(149.1244, -35.3075)
  val brisbane = Geometries.point(153.0278, -27.4679)
  val bungendore = Geometries.point(149.4500, -35.2500)

  var tree = RTree.star.create[String, Point]
  tree = tree.add("Sydney", sydney)
  tree = tree.add("Brisbane", brisbane)

  val broadcastVar = spark.sparkContext.broadcast(tree)

这失败了,上述例外。

顺便说一句,这也适用于:

  • https://github.com/davidmoten/rtree

    RTree 树 = ...; 输出流操作系统 = ...; 序列化器 serializer = Serializers.flatBuffers().utf8(); 序列化器.write(树,操作系统);显然这应该可以工作,但至少对于火花来说,它不会在相同的例外情况下工作

编辑 2

解决方法:

使用正确支持序列化的https://github.com/plokhotnyuk/rtree2d之类的东西。尽管如此,如何将其改装为第一个示例会很有趣。

4

0 回答 0