1

语境

我使用 Python (3.7) 在 Hadoop 服务器上运行多个查询。

经过多次测试,我认为 Impala 是查询数据库效率最高的引擎。所以我使用 Ibis 框架设置了一个连接,以强制使用 Impala(默认使用 Hive)。

考虑到查询的数量,我试图同时运行这些查询。

我想我已经接近了,但是在尝试使用 Ibis 共享与服务器的连接以及我启动的多个进程时,我遇到了一个问题。

我对 Python 很陌生,但我会尽我所能清楚地解释我的问题,并使用正确的词汇。如有任何错误,请提前原谅我......!

如何提交查询

对于提交我的查询,代码如下所示:

  • 连接到数据库:

hdfs = ibis.hdfs_connect(host='XXXX', port=Y) client = ibis.impala.connect(host='XXXX',port=Y,hdfs_client=hdfs)

  • 创建查询(多次完成):

查询=“选择...从...在哪里...”

  • 发送查询并检索结果(为每个查询完成):

查询 = self.client.sql(query) 数据 = query.execute(limit = None)

为同时运行这些查询所做的工作

现在,我已经使用多处理创建了一个 Process 类,并且我正在向它传递可以启用连接的客户端参数(至少,我认为),以及一个包含配置要运行的查询所需的信息的列表服务器:

import multiprocessing

class ParallelDataRetrieving(multiprocessing.Process):

    """Process in charge of retrieving the data."""
    
    def __init__(self,client,someInformations):
    
    multiprocessing.Process.__init__(self)
    
    self.client = client
    self.someInformations = someInformations
    
def run(self):
    
    """Code to run during the execution of the process."""
    
    cpt1 = 0
    
    while cpt1 < len(someInformations):
        
        query = Use someInformations[cpt1] to create the query.
    
    query = self.client.sql(query)
    data = query.execute(limit = None)

    Some work on the data...
    
    return 0

然后,从主脚本中,我(尝试)建立连接,并使用此连接启动几个进程:

hdfs = ibis.hdfs_connect(host='X.X.X.X', port=Y)
client = ibis.impala.connect(host='X.X.X.X',port=Y,hdfs_client=hdfs)

process_1 = ParallelDataRetrieving(client,someInformations)
process_1.start()
process_2 = ...

但是这段代码不起作用。我收到错误“TypeError:无法腌制 _thread.lock 对象”。

据我了解,这是因为多处理使用 Pickle 来“封装”参数,并将它们传输到进程(其内存在 Windows 上单独运行)。而且似乎不可能腌制“客户端”参数。

然后我在互联网上找到了几个试图解决这个问题的想法,但它们似乎都不适用于我的特殊情况(宜必思,黑斑羚......):

  • 我试图直接在 Process 对象的 run 方法中创建连接(这意味着每个进程一个连接):这导致“BrokenPipeError:[Errno 32] Broken pipe”

  • 我尝试使用multiprocessing.sharedctypes.RawValue,但如果这是正确的解决方案,我不太有信心在我的代码中正确实现它......

这几乎是我目前的情况。我将继续尝试解决这个问题,但作为 Python 的一种“新人”,以及数据库查询的多处理,我认为更高级的用户可能会帮助我!

预先感谢您花时间处理此请求!

4

0 回答 0