1

这里是 rust 和异步编程的初学者。

我有一个功能可以在数据库中下载并存储一堆推文:

pub async fn process_user_timeline(config: &Settings, pool: &PgPool, user_object: &Value) {
    // get timeline
    if let Ok((user_timeline, _)) =
        get_user_timeline(config, user_object["id"].as_str().unwrap()).await
    {
        // store tweets
        if let Some(tweets) = user_timeline["data"].as_array() {
            for tweet in tweets.iter() {
                store_tweet(pool, &tweet, &user_timeline, "normal")
                    .await
                    .unwrap_or_else(|e| {
                        println!(
                            ">>>X>>> failed to store tweet {}: {:?}",
                            tweet["id"].as_str().unwrap(),
                            e
                        )
                    });
            }
        }
    }
}

它被另一个函数在异步循环中调用:

pub async fn loop_until_hit_rate_limit<'a, T, Fut>(
    object_arr: &'a [T],
    settings: &'a Settings,
    pool: &'a PgPool,
    f: impl Fn(&'a Settings, &'a PgPool, &'a T) -> Fut + Copy,
    rate_limit: usize,
) where
    Fut: Future,
{
    let total = object_arr.len();
    let capped_total = min(total, rate_limit);

    let mut futs = vec![];
    for (i, object) in object_arr[..capped_total].iter().enumerate() {
        futs.push(async move {
            println!(">>> PROCESSING {}/{}", i + 1, total);
            f(settings, pool, object).await;
        });
    }
    futures::future::join_all(futs).await;
}

有时两个异步任务会尝试同时插入相同的推文,从而产生以下错误:

failed to store tweet 1398307091442409475: Database(PgDatabaseError { severity: Error, code: "23505", message: "duplicate key value violates unique constraint \"tweets_tweet_id_key\"", detail: Some("Key (tweet_id)=(1398307091442409475) already exists."), hint: None, position: None, where: None, schema: Some("public"), table: Some("tweets"), column: None, data_type: None, constraint: Some("tweets_tweet_id_key"), file: Some("nbtinsert.c"), line: Some(656), routine: Some("_bt_check_unique") })

请注意,代码在插入之前已经检查是否存在推文,因此这只发生在以下情况下:从任务 1 读取 > 从任务 2 读取 > 从任务 1 写入(成功)> 从任务 2 写入(错误)。

为了解决这个问题,到目前为止,我最好的尝试是放置一个unwrap_or_else()子句,让其中一个任务失败,而不会在整个执行过程中惊慌失措。我知道至少有一个缺点——有时两项任务都会退出,推文永远不会被写出来。它发生在<1%的情况下,但它确实发生了。

我不知道的方法还有其他缺点吗?

处理这个问题的正确方法是什么?我讨厌丢失数据,更糟糕的是,这样做是不确定的。

PS我正在使用actix websqlx作为我的网络服务器/数据库库。

4

1 回答 1

1

通常对于可能由多个线程/进程编写的任何内容,任何逻辑,如

if (!exists) {
  writeValue()
}

需要被某种锁保护,或者代码需要更改为原子写入,写入可能会失败,因为已经写入了其他内容。

对于 Rust 中的内存数据,您将使用它Mutex来确保您可以在其他任何内容读取数据之前读取然后将数据写回,或者Atomic以这样一种方式修改数据,即如果已经写入了数据,您可以检测到它。

在数据库中,对于可能与同时发生的其他查询发生冲突的任何查询,您需要ON CONFLICT在查询中使用一个子句,以便数据库本身知道当它尝试写入数据并且它已经存在时该做什么。

对于您的情况,因为我猜推文是不可变的,您可能想要这样做ON CONFLICT tweet_id DO NOTHING(或任何您的 ID 列),在这种情况下,INSERT如果已经有一条带有您正在插入的 ID 的推文,则将跳过插入,并且它不会抛出错误。

于 2021-06-08T16:47:13.967 回答