我正在尝试使用 Rust tokio tungstenite 包来创建基于 URL 的聊天室。例如,我有一个客户端连接到ws://localhost:8080/abcd
. 我的理解是我必须使用该tokio_tungstenite::accept_hdr_async
函数来访问标题以获取/abcd
路径,但是我在使用它时遇到了麻烦。我的第二个论点copy_headers_callback
应该是什么?
不确定我的方法是否合理。我是 Rust 的新手,它让我很头疼。我将不胜感激任何见解!
我的代码基于此示例https://github.com/snapview/tokio-tungstenite/blob/master/examples/server.rs:
use std::{
collections::HashMap,
env,
io::Error as IoError,
net::SocketAddr,
sync::{Arc, Mutex},
marker::Unpin,
};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tungstenite::{
protocol::Message,
handshake::server::{Request},
};
type Sender = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Sender>>>;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct BroadcastJsonStruct {
message: String,
sender_addr: SocketAddr,
}
async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, client_addr: SocketAddr) {
println!("Incoming TCP connection from: {}, raw stream: {}", client_addr, raw_stream.local_addr().unwrap());
let copy_headers_callback = |request: &Request| {
for (name, value) in request.headers().iter() {
println!("Name: {}, value: ", name.to_string())
// headers.insert(name.to_string(), value.clone());
}
Ok(None)
};
//accept a new asynchronous WebSocket connection
let ws_stream = tokio_tungstenite::accept_hdr_async(
raw_stream,
copy_headers_callback,
)
.await
.expect("Error during the websocket handshake occurred");
println!("WebSocket connection established: {}", client_addr);
// Insert the write part of this peer to the peer map.
let (sender, receiver) = unbounded();
peer_map.lock().unwrap().insert(client_addr, sender);
//set up the incoming and outgoing
let (outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each(|msg| {
println!("Received a message from {}: {}", client_addr, msg.to_text().unwrap());
let peers = peer_map.lock().unwrap();
//make a new struct to be serialized
let broadcast_data = BroadcastJsonStruct {
message: msg.to_text().unwrap().to_owned(),
sender_addr: client_addr.to_owned(),
};
let new_msg = Message::Text(
serde_json::to_string(&broadcast_data).expect("problem serializing broadcast_data")
);
println!("New message {}", new_msg.to_text().unwrap());
// We want to broadcast the message to everyone except ourselves.
//filter returns addresses that aren't our current address
let broadcast_recipients =
peers.iter().filter(|(peer_addr, _)| peer_addr != &&client_addr).map(|(_, ws_sink)| ws_sink);
//send the message to all the recipients
for recp in broadcast_recipients {
recp.unbounded_send(new_msg.clone()).unwrap();
}
future::ok(())
});
let receive_from_others = receiver.map(Ok).forward(outgoing);
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
println!("{} disconnected", &client_addr);
peer_map.lock().unwrap().remove(&client_addr);
}
#[tokio::main]
async fn main() -> Result<(), IoError> {
//see if there is a server address specified in the command line argument, else use default address
let server_addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
let state = PeerMap::new(Mutex::new(HashMap::new()));
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&server_addr).await; //create the server on the address
let listener = try_socket.expect("Failed to bind");
println!("Listening on: {}", server_addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, client_addr)) = listener.accept().await {
tokio::spawn(handle_connection(state.clone(), stream, client_addr));
}
Ok(())
}
我的错误:
error[E0593]: closure is expected to take 2 arguments, but it takes 1 argument
--> src/main.rs:45:9
|
34 | let copy_headers_callback = |request: &Request| {
| ------------------- takes 1 argument
...
45 | copy_headers_callback,
| ^^^^^^^^^^^^^^^^^^^^^ expected closure that takes 2 arguments
|
::: /Users/harryli/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-tungstenite-0.12.0/src/lib.rs:151:8
|
151 | C: Callback + Unpin,
| -------- required by this bound in `accept_hdr_async`
|
= note: required because of the requirements on the impl of `Callback` for `[closure@src/main.rs:34:33: 40:10]`
error: aborting due to previous error; 1 warning emitted