-2

我正在对某些功能进行测试。我有一个使用数据库查询的函数。所以,我浏览了一些博客和文档,这些博客和文档说我们必须在内存或测试数据库中创建才能使用这些功能。下面是我的功能,

def already_exists(story_data,c):
    # TODO(salmanhaseeb): Implement de-dupe functionality by checking if it already
    # exists in the DB.
    c.execute("""SELECT COUNT(*) from posts where post_id = ?""", (story_data.post_id,))
    (number_of_rows,)=c.fetchone()
    if number_of_rows > 0:
        return True
    return False

此函数命中生产数据库。我的问题是,在测试时,我创建了一个内存数据库并在那里填充我的值,我将查询该数据库(测试数据库)。但我想测试我的already_exists()函数,在从测试中调用我的already_exists函数后,我的生产数据库将被命中。测试此功能时如何使我的测试数据库命中?

4

2 回答 2

3

您可以采取两种方法来解决此问题:

  1. 进行集成测试而不是单元测试,并使用真实数据库的副本
  2. 为方法提供一个假而不是实际的连接对象。

你应该做哪一个取决于你想要达到的目标。

如果您想测试查询本身是否有效,那么您应该使用集成测试。句号。确保查询按预期进行的唯一方法是使用已在数据库副本中的测试数据运行它。针对不同的运行它数据库技术(例如,当您的生产数据库在 PostgreSQL 中时针对 SQLite 运行)将不能确保它在生产中工作。需要数据库的副本意味着您将需要一些自动化的部署过程,可以轻松地针对单独的数据库调用。无论如何,您应该拥有这样一个自动化过程,因为它有助于确保跨环境的部署是一致的,允许您在发布之前对其进行测试,并“记录”升级数据库的过程。对此的标准解决方案是使用您的编程语言(如albemic )编写的迁移工具或执行原始 SQL 的工具(如yoyoFlyway ). 您需要在运行测试之前调用部署并用测试数据填充它,然后运行测试并断言您希望返回的输出。

如果您想测试查询周围的代码而不是查询本身,那么您可以使用假的连接对象。最常见的解决方案是mock。模拟提供了可以配置为接受函数调用和输入并返回一些输出来代替真实对象的替身。这将允许您测试方法的逻辑是否正常工作,假设查询返回您期望的结果。对于您的方法,这样的测试可能如下所示:

from unittest.mock import Mock

...

def test_already_exists_returns_true_for_positive_count():
    mockConn = Mock(
        execute=Mock(),
        fetchone=Mock(return_value=(5,)),
    )
    story = Story(post_id=10) # Making some assumptions about what your object might look like.

    result = already_exists(story, mockConn)

    assert result

    # Possibly assert calls on the mock. Value of these asserts is debatable.
    mockConn.execute.assert_called("""SELECT COUNT(*) from posts where post_id = ?""", (story.post_id,))
    mockConn.fetchone.assert_called()
于 2018-09-16T19:04:17.347 回答
-1

问题在于确保您的代码始终使用相同的数据库连接。然后,您可以将其设置为适合当前环境的任何内容。

与其在方法之间传递数据库连接,不如将其设为单例可能更有意义。

def already_exists(story_data):
    # Here `connection` is a singleton which returns the database connection.
    connection.execute("""SELECT COUNT(*) from posts where post_id = ?""", (story_data.post_id,))
    (number_of_rows,) = connection.fetchone()
    if number_of_rows > 0:
        return True
    return False

或者connection在每个类上做一个方法,然后already_exists变成一个方法。无论如何,它可能应该是一种方法。

def already_exists(self):
    # Here the connection is associated with the object.
    self.connection.execute("""SELECT COUNT(*) from posts where post_id = ?""", (self.post_id,))
    (number_of_rows,) = self.connection.fetchone()
    if number_of_rows > 0:
        return True
    return False

但实际上你不应该自己滚动这段代码。相反,您应该使用诸如SQLAlchemy之类的ORM ,它会为您处理类似这样的基本查询和连接管理。它有一个连接,即“会话”

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from sqlalchemy_declarative import Address, Base, Person

engine = create_engine('sqlite:///sqlalchemy_example.db')
Base.metadata.bind = engine

DBSession = sessionmaker(bind=engine)
session = DBSession()

然后你用它来进行查询。例如,它有一个exists方法

session.query(Post.id).filter(q.exists()).scalar()

使用 ORM 将大大简化您的代码。这是一个简短的基础教程,还有一个更长更完整的教程

于 2018-09-12T17:32:04.220 回答