Home > Software engineering >  Rust Deserializing JSON
Rust Deserializing JSON

Time:11-20

I am having trouble deserializing JSON data sent from my client to my server.

server.rs

use std::collections::HashMap;
use std::sync::{Arc,Mutex};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use serde_json::{ Value};

/*
The type Arc<T> provides shared ownership of a value of type T, allocated in the heap. Invoking clone on Arc produces a new Arc instance, which points to the same allocation on the heap as the source Arc, while increasing a reference count. When the last Arc pointer to a given allocation is destroyed, the value stored in that allocation (often referred to as “inner value”) is also dropped.
*/
// Type alias for the user-to-socket map (presumably keyed by user name — confirm with callers).
// Arc gives every task a shared handle to the same map; Mutex guards access to the HashMap inside.
type UserToSocket = Arc<Mutex<HashMap<String,TcpStream>>>;


/// Server entry point: binds a TCP listener on 127.0.0.1:9090 and passes every
/// accepted connection to `handler` on a freshly spawned task.
///
/// Panics if the address cannot be bound or if accepting a connection fails.
#[tokio::main]
async fn main() {
    // Without a bound listener there is nothing to serve, so give up immediately.
    let listener = TcpListener::bind("127.0.0.1:9090")
        .await
        .unwrap_or_else(|_| panic!("ERROR OCCURED"));

    // Mutex-guarded map shared by all connection tasks through Arc handles.
    let local_db: UserToSocket = Arc::new(Mutex::new(HashMap::new()));

    println!("[ ] Listener has been started");

    loop {
        println!("[ ] Listening for connection");
        let (conn, peer) = listener.accept().await.unwrap();
        println!("[ ] A connection accepted from {:?}, spawwning a new task for it", peer);

        // Arc::clone only bumps the reference count; the map itself is not copied.
        let db_handle = Arc::clone(&local_db);

        // One task per connection, so the accept loop keeps running meanwhile.
        tokio::spawn(handler(conn, db_handle));
    }
}

/// Handles one accepted connection: sends a greeting, then loops reading from the
/// socket, converting what was read to a `String` and parsing it as JSON.
///
/// `_db` is the shared user-to-socket map; this function does not use it.
///
/// Panics (via `unwrap`) if a write to the socket fails or if the JSON parse fails.
///
/// NOTE(review): every iteration clones and parses the *entire* 1024-byte `buf`,
/// not only the `_n` bytes that `read` reported. The bytes after the message are
/// still the initial zeros (or leftovers from an earlier, longer read). NUL bytes
/// are valid UTF-8, so `String::from_utf8` succeeds, but `serde_json` then finds
/// non-whitespace data after the JSON value. That is consistent with the reported
/// `trailing characters` error at line 2, column 1, i.e. right after the `\r\n`.
async fn handler(mut socket: TcpStream, _db: UserToSocket) {

    // Greet the client; a failed write panics this task.
    socket.write_all(b"[ ] Hello Friend, Welcome to my program\r\n").await.unwrap();

    // Fixed-size receive buffer, zero-initialised once and reused (never cleared) across reads.
    let mut buf = vec![0; 1024];
    
    loop {
        // `read` yields the number of bytes it placed at the start of `buf`.
        match  socket.read(&mut buf).await {
            // Zero bytes read is treated as the peer having closed the connection.
            Ok(0) => {
                println!("Client Closed connection");
                return;
            }

            // Some data arrived; the byte count `_n` is bound but deliberately unused.
            Ok(_n) => {

                // `String::from_utf8` takes ownership of its Vec, so a copy of the
                // whole buffer (all 1024 bytes) is made to keep `buf` reusable.
                let bufc = buf.clone();

                // unmarshalling json
                //let parsed:Value = serde_json::from_slice(&bufc).unwrap();

                // Convert the copied bytes to a String; fails only on invalid UTF-8.
                match String::from_utf8(bufc) {
                    Ok(val) => {
                        println!("[ ] So the parsed value is {}",val);
                        //let temp = val.as_str();
                        // Parse into a generic JSON value; any parse error panics here.
                        let parsed:Value = serde_json::from_str(&val).unwrap();
                       
                        println!("{:?}",parsed);
                        // Acknowledge the message, then wait for the next one.
                        socket.write_all(b"So yeah thanks for sending this\r\n").await.unwrap();
                        continue;
                    }
                    Err(err) => {
                        // Invalid UTF-8: report it and keep the connection open.
                        println!("ERROR Could not convert to string {:?}",err);
                        continue;
                    }
                };
                //socket.write_all(b"Vnekai bujena\r\n").await.unwrap();
            }
            // Any read error ends the handler and thereby drops the socket.
            Err(_) => {
                println!("Unhandeled error occured");
                return;
            }
        }
    }
    
}

client.rs

use tokio::net::{TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::{thread,time};

#[tokio::main]
async fn main() {
let sleep_time = time::Duration::from_secs(2);

    let socket = TcpStream::connect("127.0.0.1:9090").await;

    let mut socket = match socket {
        Ok(v) => {
            println!("[ ] Successfully connected");
            v
        }
        Err(_) => {
            println!("ERROR could not connect to the server");
            std::process::exit(-1);
        }
    };

    let mut buf = vec![0;1024];

    //let mut user_input = String::new();
    loop {
        thread::sleep(sleep_time);

        match socket.read(&mut buf).await {
            Ok(0) => {
                println!("[ ] Connection with server has been closed");
                std::process::exit(1);
            }
            Ok(_n) => {
                let bc = buf.clone();
                let res = String::from_utf8(bc).unwrap();
                println!("[ ] Server responded with {}",res);
            }
            Err(_) => {
                panic!("[-] Some fatal error occured");
            }
        }

        println
        !("You want to say: ");
        /*let _v =  match io::stdin().read_line(&mut user_input){
            Ok(val) => {val}
            Err(_) => panic!("ERROR"),
        };*/

        let val = "{\"name\": \"John Doe\",\"age\": 43,\"phones\": [\" 44 1234567\",\" 44 2345678\"]}\r\n";
        socket.write(val.as_bytes()).await.unwrap();

    }
    

    
}

When I send JSON data to the server, I receive this error: thread 'tokio-runtime-worker' panicked at 'called Result::unwrap() on an Err value: Error("trailing characters", line: 2, column: 1)', src\bin\simple_server.rs:79:71

This error does not occur when I deserialize the same JSON string directly. It only occurs when I send the data over the network.

CodePudding user response:

Since your JSON is newline-terminated, you should read it with something like read_line(). (This also means you should never send pretty-printed JSON, because it contains newlines; serde_json produces compact, single-line JSON by default.)

For example, this compiles and should work as intended:

use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufStream};
use tokio::net::{TcpListener, TcpStream};

// Shared map from a String key to that entry's TcpStream: Arc shares it between tasks, Mutex guards access.
type UserToSocket = Arc<Mutex<HashMap<String, TcpStream>>>;

// ... main unchanged from your implementation ...

async fn handler(socket: TcpStream, _db: UserToSocket) {
    let mut socket = BufStream::new(socket);
    socket
        .write_all(b"[ ] Hello Friend, Welcome to my program\r\n")
        .await
        .unwrap();
    socket.flush().await.unwrap();

    let mut line = vec![];
    loop {
        line.clear();
        if let Err(e) = socket.read_until(b'\n', &mut line).await {
            println!("Unhandled error occured: {}", e);
            return;
        }
        if line.is_empty() {
            println!("Client Closed connection");
            return;
        }
        println!(
            "[ ] So the received value is {}",
            String::from_utf8_lossy(&line)
        );
        let parsed: Value = serde_json::from_slice(&line).unwrap();

        println!("{:?}", parsed);
        socket
            .write_all(b"So yeah thanks for sending this\r\n")
            .await
            .unwrap();
        socket.flush().await.unwrap();
        continue;
    }
}
  • Related