5

我想在一个项目中使用 Async MongoDB。

不想绕过客户端,因为它需要绕过多个任务和线程。所以我使用惰性静态保持了一个静态客户端。但是我不能在初始化块中使用 await 。

我能做些什么来解决这个问题?

也欢迎提出在没有lazy_static 的情况下完全不同的建议。

use std::env;
use futures::stream::StreamExt;
use mongodb::{
    bson::{doc, Bson},
    options::ClientOptions,
    Client,
};

lazy_static! {
    static ref MONGO: Option<Client> = {
        if let Ok(token) = env::var("MONGO_AUTH") {
            if let Ok(client_options) = ClientOptions::parse(&token).await
                                                                     ^^^^^
            {
                if let Ok(client) = Client::with_options(client_options) {
                    return Some(client);
                }
            }
        }
        return None;
    };
}
4

3 回答 3

3

我根据 rust 论坛中某人的建议采用了这种方法。

static MONGO: OnceCell<Client> = OnceCell::new();
static MONGO_INITIALIZED: OnceCell<tokio::sync::Mutex<bool>> = OnceCell::new();

pub async fn get_mongo() -> Option<&'static Client> {
    // this is racy, but that's OK: it's just a fast case
    let client_option = MONGO.get();
    if let Some(_) = client_option {
        return client_option;
    }
    // it hasn't been initialized yet, so let's grab the lock & try to
    // initialize it
    let initializing_mutex = MONGO_INITIALIZED.get_or_init(|| tokio::sync::Mutex::new(false));

    // this will wait if another task is currently initializing the client
    let mut initialized = initializing_mutex.lock().await;
    // if initialized is true, then someone else initialized it while we waited,
    // and we can just skip this part.
    if !*initialized {
        // no one else has initialized it yet, so

        if let Ok(token) = env::var("MONGO_AUTH") {
            if let Ok(client_options) = ClientOptions::parse(&token).await {
                if let Ok(client) = Client::with_options(client_options) {
                    if let Ok(_) = MONGO.set(client) {
                        *initialized = true;
                    }
                }
            }
        }
    }
    drop(initialized);
    MONGO.get()
}
于 2020-06-13T08:33:53.407 回答
2

创建一个新的运行时tokio::runtime::Runtime并用于block_on阻塞当前线程直到完成。

// database.rs
use tokio::runtime::Runtime;
use mongodb::Client;

pub fn connect_sync() -> Client {
    Runtime::new().unwrap().block_on(async {
        Client::with_uri_str("mongodb://localhost:27017").await.unwrap()
    })
}
// main.rs
mod database;

lazy_static! {
    static ref CLIENT: mongodb::Client =  database::connect_sync();
}

#[actix_web::main]
async fn main() {
   let collection = &CLIENT.database("db_name").collection("coll_name");
   // ...
}
于 2021-05-28T09:15:01.207 回答
1

但是我不能在初始化块中使用 await 。

你可以用futures::executor::block_on

use once_cell::sync::Lazy;
// ...
static PGCLIENT: Lazy<Client> = Lazy::new(|| {
    let client: Client = futures::executor::block_on(async {
        let (client, connection) = tokio_postgres::connect(
            "postgres:///?user=ecarroll&port=5432&host=/run/postgresql",
            NoTls,
        )
        .await
        .unwrap();

        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {}", e);
            }
        });

        client
    });
    client
});

我们所拥有的是一个非异步闭包阻塞在单个线程中,直到未来的解决。

于 2021-05-02T07:41:32.720 回答