2

我正在尝试使用 actix-web 和 sqlx 搭建一个 Web 应用程序,希望每个测试都能拥有自己的 Web 服务器和数据库事务。我尝试让创建服务器的函数既能接受数据库(Postgres)连接池,也能接受事务,方法是使用 Executor 特征。但是我在编译应用程序代码和测试时遇到了一些问题:

// main.rs

use std::net::TcpListener;

use actix_web::dev::Server;
use actix_web::{web, App, HttpServer, Responder};
use sqlx::PgPool;

/// Opens a Postgres connection pool against the hard-coded local database URL.
///
/// # Panics
///
/// Panics with "Failed to create pool" when the connection attempt returns an error.
async fn create_pool() -> PgPool {
    let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
    let connection = PgPool::connect(database_url).await;
    connection.expect("Failed to create pool")
}

/// Request handler that always answers with the same fixed greeting string.
async fn index() -> impl Responder {
    let greeting: &'static str = "Hello World!";
    greeting
}

/// Builds an actix-web `HttpServer` bound to `listener`, calls `.run()` on it and
/// returns the resulting `Server`.
///
/// `pool` is registered as application data on every `App` the factory closure
/// creates, and `GET /` is routed to `index`.
///
/// The bounds require `E` to be `'static`, `Copy`, and an `sqlx::Executor` whose
/// database is Postgres.
/// NOTE(review): the compiler output quoted further down reports that an owned
/// `Pool<Postgres>` satisfies neither the `Executor` nor the `Copy` bound.
///
/// # Errors
///
/// Returns the `std::io::Error` produced by `.listen(listener)`.
pub fn create_server<'a, E: 'static>(
    listener: TcpListener,
    pool: E,
) -> Result<Server, std::io::Error>
where
    E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
    // `move` hands `pool` to the factory closure, which passes it to `.data(..)`
    // each time it is invoked.
    let server = HttpServer::new(move || App::new().data(pool).route("/", web::get().to(index)))
        .listen(listener)?
        .run();
    Ok(server)
}

/// Binds a TCP listener on `0.0.0.0:8088`, builds the server with `create_server`
/// and awaits it.
///
/// # Panics
///
/// Panics with "Failed to create listener" if the bind fails, and panics (via
/// `unwrap`) if `create_server` returns an error.
pub async fn server(pool: PgPool) -> std::io::Result<()> {
    const PORT: usize = 8088;
    let listener =
        TcpListener::bind(format!("0.0.0.0:{}", PORT)).expect("Failed to create listener");

    println!("Running on port {}", PORT);

    // This is the call the quoted compiler errors point at: `pool` is an owned `PgPool`.
    create_server(listener, pool).unwrap().await
}

/// Entry point: creates the connection pool, then runs and awaits the web server.
///
/// # Errors
///
/// Returns any `std::io::Error` produced by `server`.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let pool = create_pool().await;
    // Propagate the server's result; previously it was discarded and `Ok(())`
    // was returned unconditionally, hiding any I/O error.
    server(pool).await
}

#[cfg(test)]
pub mod tests {
    use super::*;
    use std::net::TcpListener;

    /// Attempts to start the server with a database transaction, rather than the
    /// pool itself, as its executor.
    #[actix_rt::test]
    async fn test_foo() {
        let pool = create_pool().await;
        // Open a transaction on the pool; this is what gets handed to the server.
        let mut transaction = pool.begin().await.expect("Failed to create transaction");

        // Bind to port 0 rather than the fixed port used by `server`.
        let listener = TcpListener::bind("0.0.0.0:0").expect("Failed to create listener");
        // Pass a mutable borrow of the transaction where `create_server` expects its executor.
        let server = create_server(listener, &mut transaction).expect("Failed to create server");
        // Hand the server future to a spawned task.
        tokio::spawn(server);
    }
}
# Cargo.toml

[package]
name = "sqlx-testing"
version = "0.1.0"
authors = ["Oskar"]
edition = "2018"

[dependencies]
actix-rt = "1.1.1"
actix-web = "3.3.2"
sqlx = { version = "0.4.2", default-features = false, features = ["postgres", "runtime-async-std-native-tls"] }
tokio = "0.2.22"

编译输出

error[E0277]: the trait bound `Pool<Postgres>: Executor<'_>` is not satisfied
  --> src\main.rs:37:29
   |
17 | pub fn create_server<'a, E: 'static>(
   |        ------------- required by a bound in this
...
22 |     E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
   |        --------------------------------------------- required by this bound in `create_server`
...
37 |     create_server(listener, pool).unwrap().await
   |                             ^^^^ the trait `Executor<'_>` is not implemented for `Pool<Postgres>`
   |
   = help: the following implementations were found:
             <&Pool<DB> as Executor<'p>>

error[E0277]: the trait bound `Pool<Postgres>: Copy` is not satisfied
  --> src\main.rs:37:29
   |
17 | pub fn create_server<'a, E: 'static>(
   |        ------------- required by a bound in this
...
22 |     E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
   |                                                        ---- required by this bound in `create_server`
...
37 |     create_server(listener, pool).unwrap().await
   |                             ^^^^ the trait `Copy` is not implemented for `Pool<Postgres>`
4

2 回答 2

8

试图对 Executor 特征进行泛型化有点矫枉过正。您可能只需要在测试中使用大小为 1 的连接池,并手动执行 BEGIN 和 ROLLBACK:

/// Template test: serves the app from a one-connection pool on which `BEGIN` has
/// been executed, and executes `ROLLBACK` on the same pool at the end.
#[actix_rt::test]
async fn test_endpoint() {
    // Cap the pool at a single connection.
    let options = PgPoolOptions::new().max_connections(1);
    let pool = options
        .connect("postgres://postgres:postgres@localhost:5432/postgres")
        .await
        .expect("pool failed");

    let begin = sqlx::query("BEGIN").execute(&pool).await;
    begin.expect("BEGIN failed");

    // `pool` moves into the app factory below; keep a clone for the final ROLLBACK.
    let rollback_pool = pool.clone();

    let listener = TcpListener::bind("0.0.0.0:0").expect("Failed to create listener");
    let app_factory = move || App::new().data(pool.clone()).service(one);
    let server = HttpServer::new(app_factory)
        .listen(listener)
        .expect("fail to bind")
        .run();
    tokio::spawn(server);

    // your test

    let rollback = sqlx::query("ROLLBACK").execute(&rollback_pool).await;
    rollback.expect("ROLLBACK failed");
}

这样您就不必更改代码来处理您的测试

// main.rs
use actix_web::{get, web, App, HttpServer, Responder};
use sqlx::{postgres::PgPool, Row};
use std::net::TcpListener;

/// `GET /one`: runs `select 1 as id` on the shared pool and returns the `id`
/// column rendered as text.
///
/// # Panics
///
/// Panics if the query fails or if the `id` column cannot be read as an `i32`.
#[get("/one")]
async fn one(pool: web::Data<PgPool>) -> impl Responder {
    let row = sqlx::query("select 1 as id")
        .fetch_one(pool.get_ref())
        .await
        .expect("query `select 1 as id` failed");
    // Named `id` so the local no longer shadows the handler's own name.
    let id: i32 = row
        .try_get("id")
        .expect("column `id` is missing or is not an i32");
    // `to_string()` yields the same text as `format!("{:?}", ..)` for an `i32`
    // and states the intent directly.
    id.to_string()
}

/// Entry point: connects the pool, binds `0.0.0.0:8088`, and serves the `one` handler.
///
/// # Panics
///
/// Panics if the pool cannot be created or the listener cannot be bound.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    const PORT: usize = 8088;

    let connect_result =
        PgPool::connect("postgres://postgres:postgres@localhost:5432/postgres").await;
    let pool = connect_result.expect("Failed to create pool");

    let address = format!("0.0.0.0:{}", PORT);
    let listener = TcpListener::bind(address).expect("Failed to create listener");

    println!("Running on port {}", PORT);

    // Each invocation of the factory registers its own clone of the pool.
    let app_factory = move || App::new().data(pool.clone()).service(one);
    let server = HttpServer::new(app_factory).listen(listener)?.run();
    server.await
}

#[cfg(test)]
pub mod tests {
    use super::*;
    use sqlx::postgres::PgPoolOptions;

    /// Builds a pool capped at one connection.
    ///
    /// Every test relies on this cap: a manually executed `BEGIN` / `ROLLBACK`
    /// is meant to apply to all other queries issued through the same pool.
    ///
    /// # Panics
    ///
    /// Panics with "pool failed" if the connection cannot be established.
    async fn single_connection_pool() -> PgPool {
        PgPoolOptions::new()
            .max_connections(1)
            .connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("pool failed")
    }

    /// Template: runs the HTTP server on the one-connection pool between a
    /// manual `BEGIN` and `ROLLBACK`.
    #[actix_rt::test]
    async fn test_endpoint() {
        let pool = single_connection_pool().await;

        sqlx::query("BEGIN")
            .execute(&pool)
            .await
            .expect("BEGIN failed");

        // `pool` moves into the app factory below; keep a clone for the ROLLBACK.
        let saved_pool = pool.clone();

        let listener = TcpListener::bind("0.0.0.0:0").expect("Failed to create listener");
        let server = HttpServer::new(move || App::new().data(pool.clone()).service(one))
            .listen(listener)
            .expect("fail to bind")
            .run();
        tokio::spawn(server);

        // your test

        sqlx::query("ROLLBACK")
            .execute(&saved_pool)
            .await
            .expect("ROLLBACK failed");
    }

    /// Creates a table and inserts one row between `BEGIN` and `ROLLBACK`, and
    /// checks that the row count seen before the rollback is 1.
    #[actix_rt::test]
    async fn test_rollback() {
        let pool = single_connection_pool().await;

        sqlx::query("BEGIN")
            .execute(&pool)
            .await
            .expect("BEGIN failed");

        sqlx::query("CREATE TABLE  IF NOT EXISTS test (id SERIAL, name TEXT)")
            .execute(&pool)
            .await
            .expect("CREATE TABLE test failed");

        sqlx::query("INSERT INTO test (name) VALUES ('bob')")
            .execute(&pool)
            .await
            .expect("INSERT test failed");

        let count: i64 = sqlx::query("SELECT COUNT(id) as count from test")
            .fetch_one(&pool)
            .await
            .expect("SELECT COUNT test failed")
            .try_get("count")
            .unwrap();

        // Roll back before asserting so the ROLLBACK is issued even when the
        // assertion below fails.
        sqlx::query("ROLLBACK")
            .execute(&pool)
            .await
            .expect("ROLLBACK failed");

        assert_eq!(count, 1);
    }

    /// Counter-example: the same steps against `test2` without `BEGIN` / `ROLLBACK`.
    #[actix_rt::test]
    async fn test_no_rollback() {
        let pool = single_connection_pool().await;

        sqlx::query("CREATE TABLE  IF NOT EXISTS test2 (id SERIAL, name TEXT)")
            .execute(&pool)
            .await
            .expect("CREATE TABLE test failed");

        sqlx::query("INSERT INTO test2 (name) VALUES ('bob')")
            .execute(&pool)
            .await
            .expect("INSERT test failed");

        let count: i64 = sqlx::query("SELECT COUNT(id) as count from test2")
            .fetch_one(&pool)
            .await
            .expect("SELECT COUNT failed")
            .try_get("count")
            .unwrap();

        // This will fail the second time you run the test: nothing is rolled
        // back here, so every run adds another row to `test2`.
        assert_eq!(count, 1);
    }
}
于 2020-12-22T01:18:58.297 回答
5

我尝试创建一个可以同时接受 PgPool 和 &mut Transaction 的通用请求处理程序,但事实证明这太有挑战性了。幸运的是,您可以将 PgPool 实例限制为 1 个连接,并在将其传递给任何处理程序之前执行 BEGIN 查询,从而使该 PgPool 实例表现得好像它是一个事务:

/// Returns a pool limited to one connection on which `BEGIN` has already been executed.
///
/// # Panics
///
/// Panics if connecting fails or if the `BEGIN` statement fails.
async fn get_transaction_pool() -> PgPool {
    let options = PgPoolOptions::new().max_connections(1);
    let connect_result = options
        .connect("postgres://postgres:postgres@localhost:5432/postgres")
        .await;
    let pool = connect_result.expect("Failed to create test pool.");

    let begin_result = sqlx::query("BEGIN").execute(&pool).await;
    begin_result.expect("Failed to BEGIN transaction.");

    pool
}

我发现将上述内容抽象为自己的TestTransaction结构很有用,如下所示:

/// Test helper wrapping a one-connection pool on which `BEGIN` has been executed.
struct TestTransaction {
    /// The pool, wrapped in `web::Data` so it can be passed to handlers as-is.
    pool: web::Data<PgPool>,
}

impl TestTransaction {
    /// Connects a pool limited to one connection and executes `BEGIN` on it.
    ///
    /// # Panics
    ///
    /// Panics if connecting fails or if the `BEGIN` statement fails.
    async fn begin() -> Self {
        let options = PgPoolOptions::new().max_connections(1);
        let pg_pool = options
            .connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("Failed to connect to test pool.");

        let begun = sqlx::query("BEGIN").execute(&pg_pool).await;
        begun.expect("Failed to BEGIN transaction.");

        Self {
            pool: web::Data::new(pg_pool),
        }
    }

    /// Returns a clone of the wrapped pool handle.
    fn get_pool(&self) -> web::Data<PgPool> {
        web::Data::clone(&self.pool)
    }

    /// Executes `ROLLBACK` on the wrapped pool.
    ///
    /// # Panics
    ///
    /// Panics if the `ROLLBACK` statement fails.
    async fn rollback(&self) {
        let pg_pool: &PgPool = self.pool.as_ref();
        let rolled_back = sqlx::query("ROLLBACK").execute(pg_pool).await;
        rolled_back.expect("Failed to ROLLBACK transaction.");
    }
}

此外,您不需要在每个单元测试中都启动 HttpServer,可以直接按照这个简单的模板测试处理程序:

/// Template: call a handler function directly with the transaction's pool, check
/// its return value, then roll the transaction back.
#[actix_rt::test]
async fn test_case() {
    let transaction = TestTransaction::begin().await;

    let pool = transaction.get_pool();
    let actual = request_handler_func(pool).await;
    assert_eq!(actual, "some expected value here");

    transaction.rollback().await;
}

下面是带有一些注释的完整 main.rs:

use actix_web::{web, App, HttpServer};
use sqlx::{PgPool, Row};
use std::net::TcpListener;

/// Inserts one row labelled `'label text'` into `items` and reports the id the
/// statement returned.
///
/// # Panics
///
/// Panics with "Failed to create item." if the query fails.
async fn create_item(pool: web::Data<PgPool>) -> String {
    let insert = sqlx::query("INSERT INTO items (label) VALUES ('label text') RETURNING id");
    let row = insert
        .fetch_one(pool.as_ref())
        .await
        .expect("Failed to create item.");
    let id: i64 = row.get("id");
    format!("created item with id {}", id)
}

/// Counts the rows in `items` and reports the total as a sentence.
///
/// # Panics
///
/// Panics with "Failed to fetch item count." if the query fails.
async fn count_items(pool: web::Data<PgPool>) -> String {
    let select = sqlx::query("SELECT count(*) FROM items");
    let row = select
        .fetch_one(pool.as_ref())
        .await
        .expect("Failed to fetch item count.");
    let total: i64 = row.get("count");
    format!("{} items in db", total)
}

/// Entry point: connects the pool, makes sure the `items` table exists, then
/// serves `/create-item` and `/count-items` on `0.0.0.0:8080`.
///
/// # Panics
///
/// Panics if the pool cannot be created, the table cannot be created, or the
/// listener cannot be bound.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let connect_result =
        PgPool::connect("postgres://postgres:postgres@localhost:5432/postgres").await;
    let pool = connect_result.expect("Failed to create pool.");

    let create_table =
        sqlx::query("CREATE TABLE IF NOT EXISTS items (id BIGSERIAL PRIMARY KEY, label TEXT)");
    create_table
        .execute(&pool)
        .await
        .expect("Failed to create items table.");

    let listener = TcpListener::bind("0.0.0.0:8080").expect("Failed to create listener");

    println!("Listening on http://localhost:8080");
    println!("Try endpoints GET /create-item & GET /count-items");

    // Each invocation of the factory registers its own clone of the pool.
    let app_factory = move || {
        App::new()
            .data(pool.clone())
            .route("/create-item", web::get().to(create_item))
            .route("/count-items", web::get().to(count_items))
    };
    let server = HttpServer::new(app_factory).listen(listener)?.run();
    server.await
}

#[cfg(test)]
pub mod tests {
    use super::*;
    use sqlx::postgres::PgPoolOptions;

    /// Test helper wrapping a one-connection pool on which `BEGIN` has been executed.
    struct TestTransaction {
        /// The pool, wrapped in `web::Data` so it can be passed to handlers as-is.
        pool: web::Data<PgPool>,
    }

    impl TestTransaction {
        /// Connects a one-connection pool, executes `BEGIN`, then drops and
        /// recreates the `items` table.
        ///
        /// # Panics
        ///
        /// Panics if connecting or any of the setup statements fails.
        async fn begin() -> Self {
            let options = PgPoolOptions::new().max_connections(1);
            let pg_pool = options
                .connect("postgres://postgres:postgres@localhost:5432/postgres")
                .await
                .expect("Failed to create test pool.");

            // Executed in this order. The DROP + CREATE pair after BEGIN puts
            // every test in the same starting state: an empty `items` table.
            let setup = [
                ("BEGIN", "Failed to BEGIN transaction."),
                (
                    "DROP TABLE IF EXISTS items",
                    "Failed to drop test items table.",
                ),
                (
                    "CREATE TABLE IF NOT EXISTS items (id BIGSERIAL PRIMARY KEY, label TEXT)",
                    "Failed to create test items table.",
                ),
            ];
            for &(statement, failure) in setup.iter() {
                sqlx::query(statement)
                    .execute(&pg_pool)
                    .await
                    .expect(failure);
            }

            Self {
                pool: web::Data::new(pg_pool),
            }
        }

        /// Returns a clone of the wrapped pool handle.
        fn get_pool(&self) -> web::Data<PgPool> {
            web::Data::clone(&self.pool)
        }

        /// Executes `ROLLBACK` on the wrapped pool.
        async fn rollback(&self) {
            let pg_pool: &PgPool = self.pool.as_ref();
            let rolled_back = sqlx::query("ROLLBACK").execute(pg_pool).await;
            rolled_back.expect("Failed to ROLLBACK transaction.");
        }
    }

    // Each test below opens its own `TestTransaction` and rolls it back at the end.

    /// One insert yields id 1 and a count of 1.
    #[actix_rt::test]
    async fn create_and_count_1_items() {
        let tx = TestTransaction::begin().await;

        for expected_id in 1..=1 {
            let created = create_item(tx.get_pool()).await;
            assert_eq!(created, format!("created item with id {}", expected_id));
        }

        let counted = count_items(tx.get_pool()).await;
        assert_eq!(counted, "1 items in db");

        tx.rollback().await;
    }

    /// Two inserts yield ids 1 and 2 and a count of 2.
    #[actix_rt::test]
    async fn create_and_count_2_items() {
        let tx = TestTransaction::begin().await;

        for expected_id in 1..=2 {
            let created = create_item(tx.get_pool()).await;
            assert_eq!(created, format!("created item with id {}", expected_id));
        }

        let counted = count_items(tx.get_pool()).await;
        assert_eq!(counted, "2 items in db");

        tx.rollback().await;
    }

    /// Three inserts yield ids 1 through 3 and a count of 3.
    #[actix_rt::test]
    async fn create_and_count_3_items() {
        let tx = TestTransaction::begin().await;

        for expected_id in 1..=3 {
            let created = create_item(tx.get_pool()).await;
            assert_eq!(created, format!("created item with id {}", expected_id));
        }

        let counted = count_items(tx.get_pool()).await;
        assert_eq!(counted, "3 items in db");

        tx.rollback().await;
    }
}

您当然可以用 cargo test 运行测试,但也可以执行 cargo run,然后在浏览器中访问这些端点:

  • http://localhost:8080/create-item
  • http://localhost:8080/count-items

尽管这些端点修改了数据库,但如果您关闭服务器并再次运行 cargo test,测试仍然会通过!这是因为 TestTransaction 结构体在 begin 函数中删除并重建了 items 表,相当于将其清空,这使得所有单元测试都可以重现,而不管数据库中实际存在什么;并且这一切都在一个最终会被回滚的事务中安全地执行,因此数据库本身的数据不会被修改。

于 2020-12-22T22:24:16.750 回答