0

使用 Spark 结构化流。

我正在编写一个需要对数据进行大量查找的代码。查找非常复杂,只是不能很好地转换为连接。

例如,在表 B 中查找字段 A 并获取一个值,如果找到则在另一个表中查找该值。如果未找到,则在表 D 中查找其他值 C,然后依此类推。

我设法使用 HBase 编写了这些查找,它在功能上运行良好。我为这些查找中的每一个都编写了 udfs,例如一个非常简单的可能是:

val someColFunc= udf( (code:String) =>
        {
            val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
            if (value != null)
                Bytes.toString(value)
            else
                null
        }
    )

由于 java hbase 客户端是不可序列化的。我正在像这样传递 Hbase 对象

object HbaseObject {
 val table = new HbaseUtilities(zkUrl)
}

HbaseUtilities 是我为简化查找而编写的一个类。它只是创建了一个 java HBase 客户端,并为我需要的那种 get 命令提供了一个包装器。

这使我的代码太慢了,这也没关系。令我困惑的是,增加或减少执行器或核心的数量对我的代码速度没有影响。无论是 1 个执行程序还是 30 个执行程序,它都以完全相同的速度运行。这让我相信缺乏并行性。所以我所有的工人都必须共享同一个 Hbase 对象。他们是我可以在每个工作人员开始执行之前在每个工作人员上实例化一个这样的对象的一种方式吗?我已经尝试过使用lazy val,它没有任何效果

我什至尝试创建一个 sharedSingleton ,如此处所示https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/,它为我解决了一些问题,但不是并行度的损失。

我知道可能有更好的方法来解决问题,并且非常欢迎所有建议,但现在我陷入了一些限制和紧迫的时间表。

4

2 回答 2

1

您需要在执行程序中创建所有不可序列化的对象。您可以使用foreachPartitionmapPartitions在每个执行程序中创建连接。

与此类似的东西(我使用的是 hbase 客户端 2.0.0):

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}


df.foreachPartition(
partition => {
  //foreach executor create the connection and the table
  val config: Configuration = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "zk url")
  val connection: Connection = ConnectionFactory.createConnection(config)
  val table = connection.getTable(TableName.valueOf("tableName"))
  partition.map(
    record => {
      val byteKey = Bytes.toBytes(record.getString(0))
      val get = new Get(byteKey)
      val result = table.get(get)
      //DO YOUR LOGIC HERE FOR EACH RECORD
    }
  ).toList
  table.close()
  connection.close()
}
)

df是您要查找的每条记录的数据框。

您可以为同一连接的每个执行程序创建所需数量的表。

当您在执行程序中创建所有对象时,您不需要处理不可序列化的问题。您可以将它放在像您这样的类HbaseUtilities中使用,但您只需要在 foreach/map 分区内创建一个新实例

于 2021-01-19T14:28:14.800 回答
0

您可以使用 HBase 项目主分支中的 HBase-Spark 连接器来完成您想要做的事情。由于某种原因,连接器似乎没有包含在任何官方 HBase 构建中,但您可以自己构建它并且它工作正常。只需构建 jar 并将其包含在您的 pom.xml 中。

一旦构建,连接器将允许您在 Worker 类中传递 HBase Connection 对象,因此您不必担心序列化连接或构建单例/等。

例如:

JavaSparkContext jSPContext ...; //Create Java Spark Context
HBaseConfiguration hbConf = HBaseConfiguration.create();
hbConf.set("hbase.zookeeper.quorum", zkQuorum);
hbConf.set("hbase.zookeeper.property.clientPort", PORT_NUM);
// this is your key link to HBase from Spark -- use it every time you need to access HBase inside the Spark parallelism:
JavaHBaseContext hBaseContext = new JavaHBaseContext(jSPContext, hbConf);   

// Create an RDD and parallelize it with HBase access:
JavaRDD<String> myRDD = ... //create your RDD
hBaseContext.foreachPartition(myRDD,  new SparkHBaseWorker());
// You can also do other usual Spark tasks, like mapPartitions, forEach, etc.

// The Spark worker class for foreachPartition use-case on RDD of type String would look something like this:
class SparkHBaseWorker implements VoidFunction<Tuple2<Iterator<String>, Connection>>
{
    private static final long serialVersionUID = 1L;
    
    public WorkerIngest()
    {
    }
    
// Put all your HBase logic into this function:
    @Override
    public void call(Tuple2<Iterator<String>, Connection> t) throws Exception
    {           
        // This is your HBase connection object:
        Connection conn = t._2();
        // Now you can do direct access to HBase from this Spark worker node:
        Table hbTable = conn.getTable(TableName.valueOf(MY_TABLE));
        // now do something with the table/etc.
    }
}
于 2020-07-27T08:00:41.143 回答