10

我有一个 PySpark 作业,可以更新 HBase 中的一些对象(Spark v1.6.0;happybase v0.9)。

如果我为每一行打开/关闭一个 HBase 连接,它有点工作:

def process_row(row):
    conn = happybase.Connection(host=[hbase_master])
    # update HBase record with data from row
    conn.close()

my_dataframe.foreach(process_row)

在几千次 upsert 之后,我们开始看到如下错误:

TTransportException: Could not connect to [hbase_master]:9090

显然,为每个 upsert 打开/关闭连接是低效的。这个函数实际上只是一个适当解决方案的占位符。

然后我尝试创建一个process_row使用连接池的函数版本:

pool = happybase.ConnectionPool(size=20, host=[hbase_master])

def process_row(row):
    with pool.connection() as conn:
        # update HBase record with data from row

由于某种原因,此函数的连接池版本返回错误(请参阅完整的错误消息):

TypeError: can't pickle thread.lock objects

你能看出我做错了什么吗?

更新

我看到这篇文章并怀疑我遇到了同样的问题:Spark 尝试序列化pool对象并将其分发给每个执行程序,但是这个连接池对象不能在多个执行程序之间共享。

听起来我需要将数据集拆分为多个分区,并为每个分区使用一个连接(请参阅使用 foreachrdd 的设计模式)。我尝试了这个,基于文档中的一个例子:

def persist_to_hbase(dataframe_partition):
    hbase_connection = happybase.Connection(host=[hbase_master])
    for row in dataframe_partition:
        # persist data
    hbase_connection.close()

my_dataframe.foreachPartition(lambda dataframe_partition: persist_to_hbase(dataframe_partition))

不幸的是,它仍然返回“无法腌制 thread.lock 对象”错误。

4

1 回答 1

1

下线happybase连接只是tcp连接,因此它们不能在进程之间共享。连接池主要对多线程应用程序有用,也证明对单线程应用程序有用,可以将池用作具有连接重用的全局“连接工厂”,这可以简化代码,因为不需要传递“连接”对象大约。它还使错误恢复更容易一些。

在任何情况下,一个池(它只是一组连接)都不能在进程之间共享。出于这个原因,尝试对其进行序列化是没有意义的。(池使用导致序列化失败的锁,但这只是一个症状。)

也许您可以使用有条件地创建池(或连接)并将其存储为模块局部变量的助手,而不是在导入时实例化它,例如

_pool = None

def get_pool():
    global _pool
    if _pool is None:
        _pool = happybase.ConnectionPool(size=1, host=[hbase_master])
    return pool

def process(...)
    with get_pool().connection() as connection:
        connection.table(...).put(...)

这会在第一次使用而不是在导入时实例化池/连接。

于 2016-12-14T19:12:52.690 回答