0

我正在尝试利用 Python 的异步功能编写代码。我有一个数据库连接类,其中有用于(断开)连接数据库以及获取数据的代码。现在我想使用基于一个标识符的 fetch data 方法异步获取数据。代码如下图:

import pyexasol
import pandas as pd
import logging
from typing import Iterable
import asyncio
import tqdm


class Exa(object):
    def __init__(self, dsn: str = '1.2.3.4',
                 user: str = os.environ['UID'],
                 password: str = os.environ['PWD']):
        self.__dsn = dsn
        self.__user = user
        self.__password = password
        self.conn = None

    def __connect(self):
        if self.conn is None:
            try:
                self.conn = pyexasol.connect(dsn=self.__dsn, user=self.__user,
                    password=self.__password, encryption=True)
            except Exception as e:
                logging.error(f"Error in connecting with Exasol. Error is: {e}")

    def __disconnect(self):
        if self.conn is not None:
            try:
                self.conn.close()
            except Exception as e:
                logging.error(f"Exception in disconnecting DB. Error is {e}")
            self.conn = None

    def fetch(self, query: str, leave_connection_open: bool = False) -> pd.DataFrame:
        # connect and execute the query
        self.__connect()
        try:
            res = self.conn.export_to_pandas(query)
            res.columns = res.columns.str.lower()
        except Exception as e:
            self.__disconnect()
            return pd.DataFrame()
        if not leave_connection_open:
            self.__disconnect()
        return res

    def fetch_batch(self, pattern: str, replacement: Iterable,
                    query: str, batchsize: int = 5000) -> pd.DataFrame:
        res = asyncio.run(self._fetch_batch(pattern=pattern, replacement=replacement,
                                            query=query, batchsize=batchsize))
        return res

    async def _fetch_batch(self, pattern: str, replacement: Iterable,
                           query: str, batchsize: int = 5000) -> pd.DataFrame:

        replacement = list(replacement)
        # breaking into batches
        if any(isinstance(i, str) for i in replacement):
            batches = ["'" + "','".join(replacement[i:i + batchsize]) + "'"
                       for i in range(0, len(replacement), batchsize)]
        else:
            batches = [",".join(replacement[i:i + batchsize])
                       for i in range(0, len(replacement), batchsize)]
        # connecting and executing query in batches
        nbatches = len(batches)
        self.__connect()
        try:
            tasks = [self.__run_batch_query(query=query.replace(pattern, batches[i]),
                                            i=i, nbatches=nbatches) for i in range(nbatches)]
            # progress bar
            res = [await f for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks))]
        except Exception as e:
            logging.error("Could not fetch batches of data. Error is: %s", e)
        '''finally:
            self.__disconnect()'''
        # dataframe concatenation
        res = pd.concat(res)
        res.columns = res.columns.str.lower()
        return res

    async def __run_batch_query(self, query: str,
                                i: int, nbatches: int) -> pd.DataFrame:
        logging.info("Fetching %d/%d", i + 1, nbatches)
        async with self.fetch(query=query, leave_connection_open=True) as resp:
            raw = await resp
        return raw

我正在运行此代码:

from foo import Exa
db = Exa()

ids = db.fetch('select id from application limit 100')
ids1 = db.fetch_batch(pattern='IDS',
                         replacement=ids['id'],
                         query='select id from application where id in (IDS)',
                         batchsize=25)

但后来我得到如下错误:

ERROR:root:Could not fetch batches of data. Error is: __aexit__
Traceback (most recent call last):
  File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
    async with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __aexit__

此外,如果我将__run_batch_query()方法调用更改为self.fetch()方法而没有async错误更改为:

ERROR:root:Could not fetch batches of data. Error is: __enter__
Traceback (most recent call last):
  File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
    with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __enter__

如果这里有错误,请帮忙指出错误?

4

1 回答 1

1

pyexasol 创造者在这里。

请注意,asyncio 不会为与 Exasol 相关的场景提供任何好处。Ayncio 在单个 CPU 上运行并使用单个网络连接,这会阻止任何有意义的扩展。

从 Exasol 服务器加载数据的最有效方法是:

  • export_to_pandas()export_to_callback()用于单个 Python 进程;
  • export_parallel()+http_transport()用于多个 Python 进程;

请查看HTTP Transport (parallel)手册页以获取解释和示例。这种方法可以线性扩展,您甚至可以在多个服务器上运行计算任务。

对于简单的场景,compression=True如果您通过慢速(例如 WiFi)网络传输大量数据,您可以考虑连接选项。

于 2020-08-09T13:59:37.353 回答