3

我的问题是在将 actix-web 与 Rust 一起使用的背景下。

不幸的是,如果没有一个庞大的代码示例,我无法解释这一点,所以让我从这个开始。

struct MyWs {
    game: Arc<RwLock<Game>>,
}

impl Actor for MyWs {
    type Context = ws::WebsocketContext<Self>;
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Text(text)) => {
                debug!("Echoing text with {:?}", text);
                self.game.write().unwrap().some_method();
                ctx.text(text)
            },
            _ => (),
        }
    }
}

struct Game {
    websockets: Vec<Arc<RwLock<MyWs>>>,
}

impl Game {
    pub fn new() -> GameWrapper {
        GameWrapper {
            websockets: vec![],
        }
    }

    pub fn add_websocket(&mut self, my_ws: Arc<RwLock<MyWs>>) {
        self.websockets.push(my_ws);
    }

    pub fn some_method(&mut self) {
        // Do something to influence internal state.
        self.push_state();
    }

    pub fn push_state(&self) {
        for w in self.websockets {
            // I'm not sure about this part, idk how to access the
            // WebsocketContext with which I can send stuff back to the client.
            let game_state = get_game_state_or_something();
            w.write().unwrap().ctx.text(self.game_state);
        }
    }
}

struct GameWrapper {
    pub game: Arc<RwLock<Game>>,
}

impl GameWrapper {
    pub fn new(game: Arc<RwLock<Game>>) -> GameWrapper {
        GameWrapper { game }
    }
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let game = Arc::new(RwLock::new(Game::new()));
    let game_wrapper = RwLock::new(GameWrapper::new(game.clone()));
    let game_wrapper_data = web::Data::new(game_wrapper);
    HttpServer::new(move || {
        App::new()
            .app_data(game_wrapper_data.clone())
            .route("/play_game", web::get().to(play_game))
    })
    .bind(ip_port)?
    .run()
    .await
}

pub async fn play_game(
    req: HttpRequest,
    stream: web::Payload,
    game_wrapper: web::Data<GameWrapper>,
) -> impl Responder {
    let my_ws = MyWs { game: game_wrapper.game.clone() };
    let my_ws = Arc::new(RwLock::new(my_ws));
    let mut game = game_wrapper.game.write().unwrap();
    game.add_websocket(my_ws);
    let resp = ws::start(my_ws, &req, stream);  // This is the problem.
    let resp = match resp {
        Ok(resp) => resp,
        Err(e) => return HttpResponse::from_error(e),
    };
    debug!("Successfully upgraded to websocket");
    resp
}

让我先解释一下我要做什么。当我的客户端连接时,我与他们建立了一个 websocket。我需要这些 websocket 的列表,所以当游戏发生变化时,我可以将更新推送到所有客户端。

我将该函数绑定play_gameplay_game路由的处理程序。在这个函数中,我将 HTTP get 请求升级为 websocket。在此之前,我复制了一个游戏的 Arc+RwLock 并将其传递给 MyWs,即 websocket 结构。你可以在handleStreamHandler的MyWs impl的函数中看到我修改了Game(带some_method函数)。到目前为止这很好。

当我尝试获取对 websocket 的多个引用时,事情就会爆炸。你可以看到play_game我调用add_websocket,给 Game 一个对它的引用,所以它可以在发生变化时将更新推送回所有客户端。例如,在调用 之后some_method,我们将调用push_updates。这样做的问题是ws::start不接受 Arc,它必须接受一个 Actor,该 Actor 使用 WebSocketContext 实现 StreamHandler。

所以我主要的两个问题是:

  1. 我需要一种方法来保留对 websocket 的多个引用,这样我就可以从多个位置与客户端交谈(阅读:线程)。
  2. 我需要一些方法来做到这一点。我不确定在 actix 中如何在 MyWs 演员的上下文之外实际将消息发送回客户端。框架将 WebSocketContext 传递给handle,但我不知道如何自己动手。

我解决这个问题的想法:

  1. 在 MyWs 的handle(或started)函数中,将对 Context 的引用传递到self.game. 这不起作用,因为我要移出一个可变的 ref。
  2. 自己ws::start做一个可以参考的。我还没有尝试过,因为看起来我最终会重写很多。
  3. 以某种方式在 Arc 上实现 Actor 和 StreamHandler,或者我自己的具有内部可变性的结构/允许我保留对它的多个引用的东西。

这并不能真正帮助我发回消息,因为我仍然不知道如何通过handle函数上下文之外的 websocket 发回消息。

对不起这个问题的长度。tl;dr 是,如何在 actix-web 中获取对 websocket 的多个引用并使用它们向客户端发送消息?

以下是我正在使用的每个组件的相关文档:

4

1 回答 1

0

好的,所以我在这里的困境的解决方案毫不奇怪地改变了我试图解决这个问题的方式。我真正需要的是对每个持有 websocket 的参与者的引用,而不是对 websocket 的多个引用。我认为这就是您应该这样做的方式,因为 Actix 是一个演员框架。

这意味着代码应如下所示:

impl Game {
    ...

    pub fn register_actor(&mut self, actor: Addr<MyWs>) {
        self.actors.push(actor);
    }
}

pub async fn play_game(
    req: HttpRequest,
    stream: web::Payload,
    game_wrapper: web::Data<GameWrapper>,
) -> impl Responder {
    let my_ws = MyWs { game: game_wrapper.game.clone() };
    let my_ws = Arc::new(RwLock::new(my_ws));
    let mut game = game_wrapper.game.write().unwrap();
    let res = ws::start_with_addr(my_ws, &req, stream);
    let (addr, resp) = match res {
        Ok(res) => res,
        Err(e) => return HttpResponse::from_error(e),
    };
    game_manager.register_actor(handle, addr);
    debug!("Successfully upgraded to websocket");
    resp
}

然后,您可以通过Addr<MyWs>.

我将暂时搁置这个问题,以防其他人对如何更好地完成这件事有想法。

于 2020-06-13T19:49:09.683 回答