1

我正在尝试在 b/w 不同的线程周围移动一些数据,但收到 ole Copy trait-not-implemented 错误。这是一些代码:

use std::future::Future;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};

/// Start external crates mocked here

#[derive(Clone, PartialEq, Eq)]
pub struct DecodeError {
    inner: Box<Inner>,
}
#[derive(Clone, PartialEq, Eq)]
struct Inner {}
#[derive(Clone)]
pub struct Connection {}

pub trait Message: core::fmt::Debug + Send + Sync {
    fn decode<B>(mut buf: B) -> Result<Self, DecodeError>
    where
        B: bytes::Buf,
        Self: Default,
    {
        // do stuff
        let mut message = Self::default();
        Ok(message)
    }
}

#[derive(Clone, Debug, Default)]
pub struct Request {}
impl Message for Request {}
#[derive(Clone, Debug, Default)]
pub struct Response {}
impl Message for Response {}

pub struct OtherResponse {}
pub enum ReplyError {
    InvalidData,
}
pub struct EventMessage {
    data: Vec<u8>,
}

pub struct Subscription {}

impl Subscription {
    pub async fn next(&self) -> Option<EventMessage> {
        Some(EventMessage { data: vec![] })
    }
}
/// End external crates mocked here

#[derive(Clone)]
pub struct Publisher<T> {
    connection: Connection,
    subject: String,
    resource_type: PhantomData<*const T>,
}

#[derive(Debug)]
pub enum PublishError {
    SerializeError(String),
    PublishError(String),
}

pub type PublishResult<T> = std::result::Result<T, PublishError>;

impl<T: Message> Publisher<T> {
    pub fn new(connection: Connection, subject: String) -> Self {
        let resource_type = PhantomData;

        Publisher {
            connection: connection,
            subject,
            resource_type,
        }
    }
    pub async fn publish(&self, msg: T) -> PublishResult<()>
    where
        T: Message,
    {
        // do stuff to msg
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let node = Node::new("127.0.0.1", "node".into())
        .await
        .expect("connecting to NATS");
    let p: Publisher<Request> = node.get_publisher("TOPIC".into());
    let _submission_replyer: AsynkReplyer<Request, Response> = node
        .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
            let mut req = req.clone().lock().unwrap();
            p.clone().publish(*req);
            Ok(Response {})
        })
        .await;

    Ok(())
}

pub struct Node {
    name: String,
    connection: Connection,
}

pub type ReplyResult<T> = std::result::Result<T, ReplyError>;

impl Node {
    pub async fn new(_nats_url: &str, name: String) -> std::io::Result<Self> {
        env_logger::init();

        let connection = Connection {};
        Ok(Node { name, connection })
    }

    pub fn get_publisher<T>(&self, subject: String) -> Publisher<T>
    where
        T: Message + Default,
    {
        Publisher::new(self.connection.clone(), subject)
    }

    pub async fn get_replyer<Req, Resp, Fut>(
        &self,
        subject: String,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> AsynkReplyer<Req, Resp>
    where
        Req: Message + Default + 'static,
        Resp: Message + Default,
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        AsynkReplyer::new(&self.connection, subject, callback).await
    }
}

pub struct AsynkReplyer<Req, Resp> {
    request_type: PhantomData<Req>,
    response_type: PhantomData<Resp>,
}

impl<Req: Message + Default + 'static, Resp: Message + Default> AsynkReplyer<Req, Resp> {
    pub async fn new<Fut>(
        connection: &Connection,
        subject: String,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> AsynkReplyer<Req, Resp>
    where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        Self::start_subscription_handler(Subscription {}, callback).await;

        AsynkReplyer {
            request_type: PhantomData,
            response_type: PhantomData,
        }
    }

    pub async fn start_subscription_handler<Fut>(
        subscription: Subscription,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        tokio::spawn(async move {
            loop {
                match subscription.next().await {
                    Some(msg) => {
                        Self::handle_request(msg, callback).await;
                    }
                    None => {
                        break;
                    }
                }
            }
        });
    }

    /// Decodes + spins up another task to handle the request
    pub async fn handle_request<Fut>(
        msg: EventMessage,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> ReplyResult<()>
    where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        let decoded = Req::decode(msg.data.as_slice()).map_err(|_| ReplyError::InvalidData)?;

        tokio::spawn(async move {
            match callback(Arc::new(Mutex::new(decoded))).await {
                Ok(response) => {
                    // do stuff
                }
                Err(e) => {}
            }
        });
        Ok(())
    }
}

错误:

error[E0277]: the trait bound `Publisher<Request>: std::marker::Copy` is not satisfied in `[closure@src/main.rs:93:40: 97:10]`
  --> src/main.rs:93:10
   |
93 |           .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
   |  __________^^^^^^^^^^^___________________-
   | |          |
   | |          within `[closure@src/main.rs:93:40: 97:10]`, the trait `std::marker::Copy` is not implemented for `Publisher<Request>`
94 | |             let mut req = req.clone().lock().unwrap();
95 | |             p.clone().publish(*req);
96 | |             Ok(Response {})
97 | |         })
   | |_________- within this `[closure@src/main.rs:93:40: 97:10]`
   |
   = note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`

error[E0277]: `*const Request` cannot be sent between threads safely
  --> src/main.rs:93:10
   |
93 |           .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
   |  __________^^^^^^^^^^^___________________-
   | |          |
   | |          `*const Request` cannot be sent between threads safely
94 | |             let mut req = req.clone().lock().unwrap();
95 | |             p.clone().publish(*req);
96 | |             Ok(Response {})
97 | |         })
   | |_________- within this `[closure@src/main.rs:93:40: 97:10]`
   |
   = help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Send` is not implemented for `*const Request`
   = note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
  --> src/main.rs:53:12
   |
53 | pub struct Publisher<T> {
   |            ^^^^^^^^^
   = note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`

error[E0277]: `*const Request` cannot be shared between threads safely
  --> src/main.rs:93:10
   |
93 |           .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
   |  __________^^^^^^^^^^^___________________-
   | |          |
   | |          `*const Request` cannot be shared between threads safely
94 | |             let mut req = req.clone().lock().unwrap();
95 | |             p.clone().publish(*req);
96 | |             Ok(Response {})
97 | |         })
   | |_________- within this `[closure@src/main.rs:93:40: 97:10]`
   |
   = help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Sync` is not implemented for `*const Request`
   = note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
  --> src/main.rs:53:12
   |
53 | pub struct Publisher<T> {
   |            ^^^^^^^^^
   = note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`

我不能(或者我可以)Copy在结构上添加属性,Publisher但这不起作用,因为并非所有字段都实现了Copy. 尽管如此,我已经注释掉了Publisher不隐含的字段Copy并将属性添加到它只是为了查看,并且通过这种方法我得到:

the trait `std::marker::Copy` is not implemented for `Request`

Request 是protobuf使用 prost lib 编译的基于结构的结构。我无法向其中添加Copy属性,因为它的某些字段没有实现Copy,例如Stringand Timestamp

我想知道这里的设计是否天生就很糟糕,或者是否有一个简单的修复方法。

4

2 回答 2

0

帮助您获取这些信息确实非常困难。完整的错误代码可能会很有用。

无论如何,在“impl Node ... get_replyer()”中,你会看到回调应该返回一些实现 Copy

pub async fn get_replyer<Req, Resp, Fut>(
    &self,
    subject: String,
    callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    //-------------------------------------------------------------------^
) -> AsynkReplyer<Req, Resp>

在主要

let _submission_replyer: AsynkReplyer<Resuest, Response> = node
    .get_replyer(
    "request".into(),
        move |req: Arc<Mutex<Request>>| async {
            
            let mut req = req.lock().unwrap();
            p.publish(*req);
            
            Ok(Response {
                header: None,
                response: Some(OtherResponse {
                    request: None,
                    status: 0,
                }),
            })
//-----------^------------------------------- Return a enum std::result::Result
        },
    )
    .await;
    

std::result::Result 实现复制但关于响应?此时显示错误?

于 2021-10-29T14:07:49.863 回答
0

在我看来,你已经限制了FnisCopy因为你将它传递给多个tokio::spawn调用。你发现这Copy是非常严格的,但Clone不是。您应该改用它,并.clone()在处理新请求时简单地调用:

Self::handle_request(msg, callback.clone()).await;

那么唯一的错误是'*const Request' cannot be sent between threads safely. 编译器不会自动实现Sendor Syncfor 指针,因为它不知道这是否安全,但您Fn需要从不同的线程调用。幸运的是,您不必担心这一点。无论您PhantomData<*const T>只是为了满足编译器还是强制执行特定的差异,您都可以得到相同的结果,如下所示:

resource_type: PhantomData<fn() -> *const T>

然后,既然我们已经修复了类型约束错误,编译器现在会产生关于生命周期的错误:

  • req.clone().lock().unwrap()不起作用,因为 的结果与.lock()from 的值相关联req.clone(),但它会立即被删除。修复是.clone()不必要的,可以删除。

  • p.clone().publish(*req)不起作用,因为取消引用 aMutexLockGuard不能提供拥有的值,只能提供引用。您可以通过添加 a.clone()来解决此问题。相反,如果您认为该Arc参数是独占的,则可以按照以下建议获得所有权:如何从 Arc<Mutex<T>> 获取 T 的所有权?

  • 最后一个生命周期错误有点模糊,因为它与返回Future的与req参数相关联的生命周期有关。这可以通过 using 来解决,async move { }但随后p会从闭包中移出到未来,这意味着它不再是Fn. 你想要的是移动 req移动. p你可以这样做:

    move |req: Arc<Mutex<Request>>| {
        let p = p.clone();
        async move {
            // ...
        }
    }
    
  • “现在我正在尝试解决与await-ing相关的错误p.publish - 该错误与现在在一个await点上持续存在的锁有关,但由于互斥锁未实现Send,因此Future不能Send。您可以通过一步锁定和克隆来解决此问题,因此不会保持锁定:

    let req = req.lock().unwrap().clone();
    p.publish(req);
    Ok(Response {})
    

在操场上看到这个编译。仍有许多警告需要解决(未使用Result的),但我希望这能让您走上正确的道路。

于 2021-10-30T04:22:26.267 回答