1

今天我花了很多时间慢慢学习 Postgres,我一直在创建一个代码来处理数据库的一些事情,比如插入、选择等。

我已经意识到我的大部分代码在连接和断开连接时都是复制粘贴,我知道有些人不喜欢它也取决于我在做什么之前人们生我的气,不要把它当作坏代码但我当然想改进的更多<3

到目前为止我所做的是:

import psycopg2
import psycopg2.extras
from loguru import logger

DATABASE_CONNECTION = {
    "host": "TEST",
    "database": "TEST",
    "user": "TEST",
    "password": "TEST"
}

def register_datas(store, data):
    """
    Register a data to database
    :param store:
    :param data:
    :return:
    """

    ps_connection = psycopg2.connect(**DATABASE_CONNECTION)
    ps_cursor = ps_connection.cursor()
    ps_connection.autocommit = True

    sql_update_query = "INSERT INTO public.store_items (store, name) VALUES (%s, %s);"

    try:

        data_tuple = (store, data["name"])

        ps_cursor.execute(sql_update_query, data_tuple)

        has_registered = ps_cursor.rowcount

        ps_cursor.close()
        ps_connection.close()

        return bool(has_registered)

    except (Exception, psycopg2.DatabaseError) as error:
        logger.exception("Error: %s" % error)
        ps_connection.rollback()
        ps_cursor.close()
        ps_connection.close()
        return False

def get_all_keywords(keywords):
    """
    Get all keywords
    :param positive_or_negative:
    :return:
    """

    ps_connection = psycopg2.connect(**DATABASE_CONNECTION)
    ps_cursor = ps_connection.cursor(cursor_factory=psycopg2.extras.DictCursor)

    sql_update_query = "SELECT keyword FROM public.keywords WHERE filter_type = %s;"

    try:

        data_tuple = (keywords,)
        ps_cursor.execute(sql_update_query, data_tuple)

        all_keywords = [keyword["keyword"] for keyword in ps_cursor]

        ps_cursor.close()
        ps_connection.close()

        return all_keywords

    except (Exception, psycopg2.DatabaseError) as error:
        logger.exception("Error: %s" % error)
        ps_connection.rollback()
        ps_cursor.close()
        ps_connection.close()
        return []


def check_if_store_exists(store):
    """
    Check if the store exists in database
    :param store:
    :return:
    """

    ps_connection = psycopg2.connect(**DATABASE_CONNECTION)
    ps_cursor = ps_connection.cursor()

    sql_update_query = "SELECT store FROM public.store_config WHERE store = %s;"

    try:

        data_tuple = (store,)
        ps_cursor.execute(sql_update_query, data_tuple)

        exists = bool(ps_cursor.fetchone())

        ps_cursor.close()
        ps_connection.close()

        return exists

    except (Exception, psycopg2.DatabaseError) as error:
        logger.exception("Error: %s" % error)
        ps_connection.rollback()
        ps_cursor.close()
        ps_connection.close()
        return []

我确实看到我有相同的代码:

ps_connection = psycopg2.connect(**DATABASE_CONNECTION)
ps_cursor = ps_connection.cursor()
...
...
...
...
ps_cursor.close()
ps_connection.close()
return data

为了缩短这段代码,我想知道是否可以执行一个函数,我调用该函数connects -> lets me do the query/execution -> close the connection然后返回我想要返回的数据?

4

1 回答 1

2

上下文管理器

Python 内置了一个模式。它被命名为上下文管理器。它有两个目的:

  • with在块中的某些逻辑之前和之后执行操作。
  • 捕获with块内的错误并允许以自定义方式处理它。

要创建上下文管理器,您可以采用两种方式之一。一个(我更喜欢它)是定义一个满足以下协议的类

  • __enter__(self)
  • __exit__(self, exc_type, exc_value, traceback)

任何满足协议的类都可以在with语句中使用并用作上下文管理器。根据 Python 的鸭子类型原则,解释器“知道”如何在with语句中使用类。

例子:

class QuickConnection:  
    def __init__(self):
        self.ps_connection = psycopg2.connect(**DATABASE_CONNECTION)
        self.ps_cursor = ps_connection.cursor(cursor_factory=psycopg2.extras.DictCursor)

    def __enter__(self):
        return self.ps_cursor

    def __exit__(self, err_type, err_value, traceback):
        if err_type and err_value:
            self.ps_connection.rollback()
        self.ps_cursor.close()
        self.ps_connection.close()
        return False

方法的返回值__exit__确实很重要。如果True返回,则所有错误都发生在被with抑制的块中。如果False返回,则在__exit__执行结束时引发错误。的返回值__exit__最好保持明确,因为特性本身并不那么明显。

或者使用contextlib.contextmanager装饰器

from contextlib import contextmanager

@contextmanager
def quick_connection():
    ps_connection = psycopg2.connect(**DATABASE_CONNECTION)
    ps_cursor = ps_connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
    try:
        yield ps_cursor
    except Exception: # don't do this, catch specific errors instead.
        ps_connection.rollback()
        raise
    finally:
        ps_cursor.close()
        ps_connection.close()

用法

with QuickConnection() as ps_cursor:
    data_tuple = (store, data["name"])
    ps_cursor.execute(sql_update_query, data_tuple)
    has_registered = ps_cursor.rowcount

使用上下文管理器,您可以重用连接,确保它已关闭。此外,您可以在上下文管理器中捕获和处理与数据库操作相关的错误。上下文管理器的使用是紧凑和可读的,我相信最终这就是目标。

于 2020-11-28T20:02:02.063 回答