-2

我在反序列化从客户端发送的 json 数据时遇到问题。

服务器.rs

use std::collections::HashMap;
use std::sync::{Arc,Mutex};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use serde_json::{ Value};

/*
The type Arc<T> provides shared ownership of a value of type T, allocated in the heap. Invoking clone on Arc produces a new Arc instance, which points to the same allocation on the heap as the source Arc, while increasing a reference count. When the last Arc pointer to a given allocation is destroyed, the value stored in that allocation (often referred to as “inner value”) is also dropped.
*/
// creating a type alias for user to socket map 
// Arc points top 
type UserToSocket = Arc<Mutex<HashMap<String,TcpStream>>>;


#[tokio::main]
async fn main() {

    let listener = TcpListener::bind("127.0.0.1:9090").await;

    // creating a threadsafe hashmap mutex
    let local_db: UserToSocket = Arc::new(Mutex::new(HashMap::new()));

   let listener = match listener{
        Result::Ok(value) => {value},
        Result::Err(_)=> {panic!("ERROR OCCURED")},
    };

    println!("[+] Listener has been started");

    loop {
        // now waiting for connection
        println!("[+] Listening for connection");
        let (socket,addr) = listener.accept().await.unwrap();
        println!("[+] A connection accepted from {:?}, spawwning a new task for it",addr);

        // cloning does not actually clone, but rather just increases counter to it
        let ld = Arc::clone(&local_db);

        // spawning a new task
        tokio::spawn(
            async move {
                handler(socket,ld).await;
            }
        );
    }


}

//  a handler for new connection
async fn handler(mut socket: TcpStream, _db: UserToSocket) {

    socket.write_all(b"[+] Hello Friend, Welcome to my program\r\n").await.unwrap();

    let mut buf = vec![0; 1024];
    
    loop {
        // n holds the number of bytes read i think
        match  socket.read(&mut buf).await {
            Ok(0) => {
                println!("Client Closed connection");
                return;
            }

            // getting some data
            Ok(_n) => {

                // ownership is transferred so need to clone it
                let bufc = buf.clone();

                // unmarshalling json
                //let parsed:Value = serde_json::from_slice(&bufc).unwrap();

                // obtaining string
                match String::from_utf8(bufc) {
                    Ok(val) => {
                        println!("[+] So the parsed value is {}",val);
                        //let temp = val.as_str();
                        let parsed:Value = serde_json::from_str(&val).unwrap();
                       
                        println!("{:?}",parsed);
                        socket.write_all(b"So yeah thanks for sending this\r\n").await.unwrap();
                        continue;
                    }
                    Err(err) => {
                        println!("ERROR Could not convert to string {:?}",err);
                        continue;
                    }
                };
                //socket.write_all(b"Vnekai bujena\r\n").await.unwrap();
            }
            Err(_) => {
                println!("Unhandeled error occured");
                return;
            }
        }
    }
    
}

客户端.rs

use tokio::net::{TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::{thread,time};

#[tokio::main]
async fn main() {
let sleep_time = time::Duration::from_secs(2);

    let socket = TcpStream::connect("127.0.0.1:9090").await;

    let mut socket = match socket {
        Ok(v) => {
            println!("[+] Successfully connected");
            v
        }
        Err(_) => {
            println!("ERROR could not connect to the server");
            std::process::exit(-1);
        }
    };

    let mut buf = vec![0;1024];

    //let mut user_input = String::new();
    loop {
        thread::sleep(sleep_time);

        match socket.read(&mut buf).await {
            Ok(0) => {
                println!("[+] Connection with server has been closed");
                std::process::exit(1);
            }
            Ok(_n) => {
                let bc = buf.clone();
                let res = String::from_utf8(bc).unwrap();
                println!("[+] Server responded with {}",res);
            }
            Err(_) => {
                panic!("[-] Some fatal error occured");
            }
        }

        println
        !("You want to say: ");
        /*let _v =  match io::stdin().read_line(&mut user_input){
            Ok(val) => {val}
            Err(_) => panic!("ERROR"),
        };*/

        let val = "{\"name\": \"John Doe\",\"age\": 43,\"phones\": [\"+44 1234567\",\"+44 2345678\"]}\r\n";
        socket.write(val.as_bytes()).await.unwrap();

    }
    

    
}

当我将 json 数据发送到服务器时,我收到一个错误。 线程'tokio-runtime-worker'在'调用Result::unwrap()一个Err值时惊慌失措:错误(“尾随字符”,行:2,列:1)',src\bin\simple_server.rs:79:71

当我尝试直接对 json 字符串进行消毒时,不会出现此错误。它仅在我通过网络发送数据时发生。

4

1 回答 1

1

由于您的 JSON 是换行符终止的,因此您应该使用类似的东西read_line()来阅读它。(而且您永远不应该发送格式化的 JSON,因为它将包含换行符 - 但 serde_json 默认创建非格式化的 JSON。)

例如,这可以编译并且应该按预期工作

use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufStream};
use tokio::net::{TcpListener, TcpStream};

type UserToSocket = Arc<Mutex<HashMap<String, TcpStream>>>;

// ... main unchanged from your implementation ...

async fn handler(socket: TcpStream, _db: UserToSocket) {
    let mut socket = BufStream::new(socket);
    socket
        .write_all(b"[+] Hello Friend, Welcome to my program\r\n")
        .await
        .unwrap();
    socket.flush().await.unwrap();

    let mut line = vec![];
    loop {
        line.clear();
        if let Err(e) = socket.read_until(b'\n', &mut line).await {
            println!("Unhandled error occured: {}", e);
            return;
        }
        if line.is_empty() {
            println!("Client Closed connection");
            return;
        }
        println!(
            "[+] So the received value is {}",
            String::from_utf8_lossy(&line)
        );
        let parsed: Value = serde_json::from_slice(&line).unwrap();

        println!("{:?}", parsed);
        socket
            .write_all(b"So yeah thanks for sending this\r\n")
            .await
            .unwrap();
        socket.flush().await.unwrap();
        continue;
    }
}
于 2021-11-19T16:31:47.080 回答