使用 Spark 结构化流。
我正在编写一个需要对数据进行大量查找的代码。查找非常复杂,只是不能很好地转换为连接。
例如,在表 B 中查找字段 A 并获取一个值,如果找到则在另一个表中查找该值。如果未找到,则在表 D 中查找其他值 C,然后依此类推。
我设法使用 HBase 编写了这些查找,它在功能上运行良好。我为这些查找中的每一个都编写了 udfs,例如一个非常简单的可能是:
val someColFunc= udf( (code:String) =>
{
val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
if (value != null)
Bytes.toString(value)
else
null
}
)
由于 java hbase 客户端是不可序列化的。我正在像这样传递 Hbase 对象
object HbaseObject {
val table = new HbaseUtilities(zkUrl)
}
HbaseUtilities 是我为简化查找而编写的一个类。它只是创建了一个 java HBase 客户端,并为我需要的那种 get 命令提供了一个包装器。
这使我的代码太慢了,这也没关系。令我困惑的是,增加或减少执行器或核心的数量对我的代码速度没有影响。无论是 1 个执行程序还是 30 个执行程序,它都以完全相同的速度运行。这让我相信缺乏并行性。所以我所有的工人都必须共享同一个 Hbase 对象。他们是我可以在每个工作人员开始执行之前在每个工作人员上实例化一个这样的对象的一种方式吗?我已经尝试过使用lazy val,它没有任何效果
我什至尝试创建一个 sharedSingleton ,如此处所示https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/,它为我解决了一些问题,但不是并行度的损失。
我知道可能有更好的方法来解决问题,并且非常欢迎所有建议,但现在我陷入了一些限制和紧迫的时间表。