8

背景:

我无法sqlxjuniper订阅集成。

Pin<Box<dyn Stream<Item = Result<User, sqlx::Error>> + 'e + Send>>sqlx::query::QueryAs::fetch().

juniper需要订阅返回为Pin<Box<dyn Stream<Item = Result<User, juniper::FieldError>> + Send>>.

注意从Result<User, sqlx::Error>到的变化Result<User, juniper::FieldError>。使用map_err()fromfutures::TryStreamExt,我创建了以下代码来执行查询并转换错误类型。

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        let sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&context.pool)
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}

这在编译时失败并出现以下错误:

error[E0759]: `executor` has lifetime `'ref_e` but it needs to satisfy a `'static` lifetime requirement
  --> server/src/graphql/subscription.rs:27:1
   |
27 |   #[juniper::graphql_subscription(Context = Context)]
   |   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |   |
   |   this data with lifetime `'ref_e`...
   |   ...is captured here...
...
63 | /         sqlx::query_as!(User, "SELECT * FROM users")
64 | |             .fetch(&context.pool)
65 | |             .map_err(|e| {
66 | |                 FieldError::new(
...  |
69 | |             })
70 | |             .boxed()
   | |____________________- ...and is required to live as long as `'static` here
   |
   = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

error: aborting due to previous error

我对Streams 或生命周期不够熟悉,无法理解此错误的含义。在进一步研究之后,这似乎ref_e是订阅对juniper's的引用的生命周期Executor

尝试:

版本:

  • sqlx-0.4.1
  • junipercd66bdb固定承诺master
4

2 回答 2

0

Mathieu的回答是正确的。因此,我将解释错误背后的原因。

这里的根本问题是,当您返回时,UsersStream您将数据移出函数fn users(..)。现在函数的调用者users(..)拥有返回的数据,并且(理论上)可以做任何它想做的事情,包括在应用程序的生命周期内保留数据,即赋予数据“静态生命周期”。但是如果UsersStream引用了一个生命周期有限的数据( ),那么当数据的所有者丢弃数据context.pool时会发生什么?pool当数据被删除时,它将引用空指针。所以编译器会抛出错误来防止这种情况发生。

所以你可以在这里做的是以某种方式传递拥有的数据(pool)而不是引用,确保它与现在拥有的数据pool具有相同的生命周期。正是这样做,它创建了一个拥有的数据副本(引用计数或字节副本)。UsersStreamclone()

于 2021-11-05T13:00:31.793 回答
0

您的代码与我的不完全一样,但我认为该解决方案也可以在这里应用,请在使用之前尝试克隆池:

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        let pool = context.pool.clone();
        let sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&pool)
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}
于 2021-11-05T10:25:19.810 回答