我有一个 PySpark 作业,可以更新 HBase 中的一些对象(Spark v1.6.0;happybase v0.9)。
如果我为每一行打开/关闭一个 HBase 连接,它有点工作:
def process_row(row):
conn = happybase.Connection(host=[hbase_master])
# update HBase record with data from row
conn.close()
my_dataframe.foreach(process_row)
在几千次 upsert 之后,我们开始看到如下错误:
TTransportException: Could not connect to [hbase_master]:9090
显然,为每个 upsert 打开/关闭连接是低效的。这个函数实际上只是一个适当解决方案的占位符。
然后我尝试创建一个process_row
使用连接池的函数版本:
pool = happybase.ConnectionPool(size=20, host=[hbase_master])
def process_row(row):
with pool.connection() as conn:
# update HBase record with data from row
由于某种原因,此函数的连接池版本返回错误(请参阅完整的错误消息):
TypeError: can't pickle thread.lock objects
你能看出我做错了什么吗?
更新
我看到这篇文章并怀疑我遇到了同样的问题:Spark 尝试序列化pool
对象并将其分发给每个执行程序,但是这个连接池对象不能在多个执行程序之间共享。
听起来我需要将数据集拆分为多个分区,并为每个分区使用一个连接(请参阅使用 foreachrdd 的设计模式)。我尝试了这个,基于文档中的一个例子:
def persist_to_hbase(dataframe_partition):
hbase_connection = happybase.Connection(host=[hbase_master])
for row in dataframe_partition:
# persist data
hbase_connection.close()
my_dataframe.foreachPartition(lambda dataframe_partition: persist_to_hbase(dataframe_partition))
不幸的是,它仍然返回“无法腌制 thread.lock 对象”错误。