I am having trouble deserializing json data sent from my client.
server.rs
use std::collections::HashMap;
use std::sync::{Arc,Mutex};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use serde_json::{ Value};
/*
The type Arc<T> provides shared ownership of a value of type T, allocated in the heap. Invoking clone on Arc produces a new Arc instance, which points to the same allocation on the heap as the source Arc, while increasing a reference count. When the last Arc pointer to a given allocation is destroyed, the value stored in that allocation (often referred to as “inner value”) is also dropped.
*/
// creating a type alias for user to socket map
// Arc points top
type UserToSocket = Arc<Mutex<HashMap<String,TcpStream>>>;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:9090").await;
// creating a threadsafe hashmap mutex
let local_db: UserToSocket = Arc::new(Mutex::new(HashMap::new()));
let listener = match listener{
Result::Ok(value) => {value},
Result::Err(_)=> {panic!("ERROR OCCURED")},
};
println!("[ ] Listener has been started");
loop {
// now waiting for connection
println!("[ ] Listening for connection");
let (socket,addr) = listener.accept().await.unwrap();
println!("[ ] A connection accepted from {:?}, spawwning a new task for it",addr);
// cloning does not actually clone, but rather just increases counter to it
let ld = Arc::clone(&local_db);
// spawning a new task
tokio::spawn(
async move {
handler(socket,ld).await;
}
);
}
}
// a handler for new connection
async fn handler(mut socket: TcpStream, _db: UserToSocket) {
socket.write_all(b"[ ] Hello Friend, Welcome to my program\r\n").await.unwrap();
let mut buf = vec![0; 1024];
loop {
// n holds the number of bytes read i think
match socket.read(&mut buf).await {
Ok(0) => {
println!("Client Closed connection");
return;
}
// getting some data
Ok(_n) => {
// ownership is transferred so need to clone it
let bufc = buf.clone();
// unmarshalling json
//let parsed:Value = serde_json::from_slice(&bufc).unwrap();
// obtaining string
match String::from_utf8(bufc) {
Ok(val) => {
println!("[ ] So the parsed value is {}",val);
//let temp = val.as_str();
let parsed:Value = serde_json::from_str(&val).unwrap();
println!("{:?}",parsed);
socket.write_all(b"So yeah thanks for sending this\r\n").await.unwrap();
continue;
}
Err(err) => {
println!("ERROR Could not convert to string {:?}",err);
continue;
}
};
//socket.write_all(b"Vnekai bujena\r\n").await.unwrap();
}
Err(_) => {
println!("Unhandeled error occured");
return;
}
}
}
}
client.rs
use tokio::net::{TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::{thread,time};
#[tokio::main]
async fn main() {
let sleep_time = time::Duration::from_secs(2);
let socket = TcpStream::connect("127.0.0.1:9090").await;
let mut socket = match socket {
Ok(v) => {
println!("[ ] Successfully connected");
v
}
Err(_) => {
println!("ERROR could not connect to the server");
std::process::exit(-1);
}
};
let mut buf = vec![0;1024];
//let mut user_input = String::new();
loop {
thread::sleep(sleep_time);
match socket.read(&mut buf).await {
Ok(0) => {
println!("[ ] Connection with server has been closed");
std::process::exit(1);
}
Ok(_n) => {
let bc = buf.clone();
let res = String::from_utf8(bc).unwrap();
println!("[ ] Server responded with {}",res);
}
Err(_) => {
panic!("[-] Some fatal error occured");
}
}
println
!("You want to say: ");
/*let _v = match io::stdin().read_line(&mut user_input){
Ok(val) => {val}
Err(_) => panic!("ERROR"),
};*/
let val = "{\"name\": \"John Doe\",\"age\": 43,\"phones\": [\" 44 1234567\",\" 44 2345678\"]}\r\n";
socket.write(val.as_bytes()).await.unwrap();
}
}
When I send json data to server, I receive an error.
thread 'tokio-runtime-worker' panicked at 'called Result::unwrap()
on an Err
value: Error("trailing characters", line: 2, column: 1)', src\bin\simple_server.rs:79:71
This error does not occur when I try to desterilize the json string directly. It only occurs when I send the data through network.
CodePudding user response:
Since your JSON is newline-terminated, you should use something like read_line()
to read it. (And you should never send a formatted JSON, because it will contain newlines - but serde_json is creating non-formatted JSON by default.)
For example, this compiles and should work as intended
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufStream};
use tokio::net::{TcpListener, TcpStream};
type UserToSocket = Arc<Mutex<HashMap<String, TcpStream>>>;
// ... main unchanged from your implementation ...
async fn handler(socket: TcpStream, _db: UserToSocket) {
let mut socket = BufStream::new(socket);
socket
.write_all(b"[ ] Hello Friend, Welcome to my program\r\n")
.await
.unwrap();
socket.flush().await.unwrap();
let mut line = vec![];
loop {
line.clear();
if let Err(e) = socket.read_until(b'\n', &mut line).await {
println!("Unhandled error occured: {}", e);
return;
}
if line.is_empty() {
println!("Client Closed connection");
return;
}
println!(
"[ ] So the received value is {}",
String::from_utf8_lossy(&line)
);
let parsed: Value = serde_json::from_slice(&line).unwrap();
println!("{:?}", parsed);
socket
.write_all(b"So yeah thanks for sending this\r\n")
.await
.unwrap();
socket.flush().await.unwrap();
continue;
}
}