1

我正在尝试HashMap使用该Arc<Mutex<T>>模式作为受Rust 食谱启发的网站抓取练习的一部分来写信。

第一部分使用tokio运行时。我无法完成正在完成的任务并返回它,HashMap因为它只是挂起。

type Db = Arc<Mutex<HashMap<String, bool>>>;

pub async fn handle_async_tasks(db: Db) -> BoxResult<HashMap<String, bool>> {
    let links = NodeUrl::new("https://www.inverness-courier.co.uk/")
        .await
        .unwrap();

    let arc = db.clone();

    let mut handles = Vec::new();

    for link in links.links_with_paths {
        let x = arc.clone();
        handles.push(tokio::spawn(async move {
            process(x, link).await;
        }));
    }

    //  for handle in handles {
    //     handle.await.expect("Task panicked!");
    //  } < I tried this as well>

    futures::future::join_all(handles).await;

    let readables = arc.lock().await;

    for (key, value) in readables.clone().into_iter() {
        println!("Checking db: k, v ==>{} / {}", key, value);
    }

    let clone_db = readables.clone();

    return Ok(clone_db);
}

async fn process(db: Db, url: Url) {
    let mut db = db.lock().await;
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
        db.insert(url.to_string(), true);
    } else {
        db.insert(url.to_string(), false);
    }
}

async fn check_link(url: &Url) -> BoxResult<bool> {
    let res = reqwest::get(url.as_ref()).await?;
    Ok(res.status() != StatusCode::NOT_FOUND)
}

pub struct NodeUrl {
    domain: String,
    pub links_with_paths: Vec<Url>,
}

#[tokio::main]
async fn main() {
    let db: Db = Arc::new(Mutex::new(HashMap::new()));

    let db = futures::executor::block_on(task::handle_async_tasks(db));
}

我想返回HashMapmain()主线程被阻塞的地方。如何等待所有异步线程进程完成并返回HashMap

4

2 回答 2

2
let links = NodeUrl::new("https://www.some-site.com/.co.uk/").await.unwrap();

这对我来说似乎不是一个有效的 URL。

async fn process(db: Db, url: Url) {
    let mut db = db.lock().await;
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
         db.insert(url.to_string(), true);
    } else {
         db.insert(url.to_string(), false);
    }
}

这是非常有问题的。在整个请求期间,您持有数据库的排他锁。这使您的应用程序有效地串行。默认超时时间reqwest为 30 秒。因此,如果服务器没有响应并且您有很多链接要通过该程序可能看起来只是“挂起”。

您应该只获得尽可能短的数据库锁定 - 只是为了插入:

async fn process(db: Db, url: Url) {
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
         let mut db = db.lock().await;
         db.insert(url.to_string(), true);
    } else {
         let mut db = db.lock().await;
         db.insert(url.to_string(), false);
    }
}

或者更好的是,消除无用的情况:

async fn process(db: Db, url: Url) {
    println!("checking {}", url);
    let valid = check_link(&url).await.is_ok();
    db.lock().await.insert(url.to_string(), valid);
}

最后你没有展示你的main函数,你调用handle_async_tasks或运行其他东西的方式可能有问题。

于 2021-11-29T12:11:07.983 回答
1

我的主要问题是如何处理MutexGuard- 我最终通过使用clone和返回内部值来完成。

不需要使用futures::executor in main:因为我们在 tokio 运行时中,调用.await足以同步访问最终值。

克隆Arc一次就足够了;在将它传递到多线程上下文之前,我已经克隆了它两次。

感谢@orlp 指出与check_link函数有关的错误逻辑。

pub async fn handle_async_tasks() -> BoxResult<HashMap<String, bool>> {
    let get_links = NodeUrl::new("https://www.invernesscourier.co.uk/")
        .await
        .unwrap();

    let db: Db = Arc::new(Mutex::new(HashMap::new()));
    let mut handles = Vec::new();

    for link in get_links.links_with_paths {
        let x = db.clone();

        handles.push(tokio::spawn(async move {
            process(x, link).await;
        }));
    }

    futures::future::join_all(handles).await;

    let guard = db.lock().await;
    let cloned = guard.clone();

    Ok(cloned)
}

#[tokio::main]
async fn main() {
    let db = task::handle_async_tasks().await.unwrap();
    for (key, value) in db.into_iter() {
        println!("Checking db: {} / {}", key, value);
    }
}

这绝不是最好的 Rust 代码,但我想分享我最终是如何解决问题的。

于 2021-11-29T17:55:34.250 回答