Why is async TcpStream blocking?

Time:10-14

I'm working on a project to implement a distributed key-value store in Rust. I've written the server-side code using Tokio's asynchronous runtime. I'm running into an issue where my asynchronous code seems to block: when I have multiple connections to the server, only one TcpStream is processed. I'm new to writing async code, both in general and in Rust, but I thought that other streams would be accepted and processed whenever there was no activity on a given TCP stream.

Is my understanding of async wrong or am I using tokio incorrectly?

This is my entry point:

use std::error::Error;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use env_logger;
use log::{debug, info};
use structopt::StructOpt;
use tokio::net::TcpListener;

extern crate blue;

use blue::ipc::message;
use blue::store::args;
use blue::store::cluster::{Cluster, NodeRole};
use blue::store::deserialize::deserialize_store;
use blue::store::handler::handle_stream;
use blue::store::wal::WriteAheadLog;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    let opt = args::Opt::from_args();
    let addr = SocketAddr::from_str(format!("{}:{}", opt.host, opt.port).as_str())?;
    let role = NodeRole::from_str(opt.role.as_str()).unwrap();
    let leader_addr = match role {
        NodeRole::Leader => addr,
        NodeRole::Follower => SocketAddr::from_str(opt.follow.unwrap().as_str())?,
    };

    let wal_name = addr.to_string().replace(".", "").replace(":", "");
    let wal_full_name = format!("wal{}.log", wal_name);
    let wal_path = PathBuf::from(wal_full_name);
    let mut wal = match wal_path.exists() {
        true => {
            info!("Existing WAL found");
            WriteAheadLog::open(&wal_path)?
        }
        false => {
            info!("Creating WAL");
            WriteAheadLog::new(&wal_path)?
        }
    };
    debug!("WAL: {:?}", wal);

    let store_name = addr.to_string().replace(".", "").replace(":", "");
    let store_pth = format!("{}.pb", store_name);
    let store_path = Path::new(&store_pth);
    let mut store = match store_path.exists() {
        true => deserialize_store(store_path)?,
        false => message::Store::default(),
    };

    let listener = TcpListener::bind(addr).await?;
    let cluster = Cluster::new(addr, &role, leader_addr, &mut wal, &mut store).await?;

    let store_path = Arc::new(store_path);
    let store = Arc::new(Mutex::new(store));

    let wal = Arc::new(Mutex::new(wal));
    let cluster = Arc::new(Mutex::new(cluster));
    info!("Blue launched. Waiting for incoming connection");

    loop {
        let (stream, addr) = listener.accept().await?;
        info!("Incoming request from {}", addr);
        let store = Arc::clone(&store);
        let store_path = Arc::clone(&store_path);
        let wal = Arc::clone(&wal);
        let cluster = Arc::clone(&cluster);
        handle_stream(stream, store, store_path, wal, cluster, &role).await?;
    }
}

Below is my handler (handle_stream from above). I left out the arms of match input because I don't think they're needed to show the problem; the full code for that section is here if it helps: https://github.com/matthewmturner/Bradfield-Distributed-Systems/blob/main/blue/src/store/handler.rs

Specifically, the line that blocks is let input = async_read_message::<message::Request>(&mut stream).await;

This is where the server waits for communication from either a client or another server in the cluster. What I currently see is that once a client has connected, the server doesn't receive any of the requests to add other nodes to the cluster; it only handles the client stream.

use std::io;
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use log::{debug, error, info};
use serde_json::json;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream as asyncTcpStream;

use super::super::ipc::message;
use super::super::ipc::message::request::Command;
use super::super::ipc::receiver::async_read_message;
use super::super::ipc::sender::{async_send_message, send_message};
use super::cluster::{Cluster, NodeRole};
use super::serialize::persist_store;
use super::wal::WriteAheadLog;

// TODO: Why isn't async working? I.e. connecting servers after a client is connected stays on the client stream.
pub async fn handle_stream<'a>(
    mut stream: asyncTcpStream,
    store: Arc<Mutex<message::Store>>,
    store_path: Arc<&Path>,
    wal: Arc<Mutex<WriteAheadLog<'a>>>,
    cluster: Arc<Mutex<Cluster>>,
    role: &NodeRole,
) -> io::Result<()> {
    loop {
        info!("Handling stream: {:?}", stream);
        let input = async_read_message::<message::Request>(&mut stream).await;
        debug!("Input: {:?}", input);
        match input {
        ...
        }
    }
}

This is the code for async_read_message:

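use std::io;

use log::debug;
use prost::Message; // assumption: Message is prost's trait (it provides decode)
use tokio::io::AsyncReadExt; // needed for read_exact on tokio's TcpStream
use tokio::net::TcpStream as asyncTcpStream;

pub async fn async_read_message<M: Message + Default>(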
    stream: &mut asyncTcpStream,
) -> io::Result<M> {
    let mut len_buf = [0u8; 4];
    debug!("Reading message length");
    stream.read_exact(&mut len_buf).await?;
    let len = i32::from_le_bytes(len_buf);
    let mut buf = vec![0u8; len as usize];
    debug!("Reading message");
    stream.read_exact(&mut buf).await?;
    let user_input = M::decode(&mut buf.as_slice())?;
    debug!("Received message: {:?}", user_input);
    Ok(user_input)
}
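As a reference for the framing async_read_message expects, here is a blocking, std-only sketch of the same wire format: a 4-byte little-endian length prefix followed by the encoded message bytes. write_frame, read_frame and the max_len guard are hypothetical names; the sketch assumes async_send_message (not shown) writes the same layout, and it leaves out the protobuf decode step. It reads the prefix as u32 because casting a negative i32 to usize, as the original does, would request an enormous allocation.

```rust
use std::io::{self, Cursor, Read, Write};

// Hypothetical helper: writes one frame as a 4-byte little-endian length
// prefix followed by the payload bytes.
fn write_frame<W: Write>(out: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    out.write_all(&(payload.len() as u32).to_le_bytes())?;
    out.write_all(payload)
}

// Hypothetical helper: the blocking counterpart of async_read_message, minus
// the decode step. max_len rejects absurd lengths before allocating.
fn read_frame<R: Read>(input: &mut R, max_len: usize) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    input.read_exact(&mut len_buf)?;
    let len = u32::from_le_bytes(len_buf) as usize;
    if len > max_len {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame exceeds max_len"));
    }
    let mut buf = vec![0u8; len];
    input.read_exact(&mut buf)?;
    Ok(buf)
}

fn main() -> io::Result<()> {
    // Two frames back to back on an in-memory "wire".
    let mut wire = Vec::new();
    write_frame(&mut wire, b"hello")?;
    write_frame(&mut wire, b"world")?;

    let mut reader = Cursor::new(wire);
    for _ in 0..2 {
        let frame = read_frame(&mut reader, 1024)?;
        println!("{}", String::from_utf8_lossy(&frame));
    }
    Ok(())
}
```

Note that read_exact waits until the whole frame has arrived; in the async version that wait suspends the current task rather than the thread, which is why the problem lies in how the tasks are structured, not in this function.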

Answer:

Your problem lies with how you're handling messages after clients have connected:

handle_stream(stream, store, store_path, wal, cluster, &role).await?;

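This .await means your accept loop waits for handle_stream to return, and (making some assumptions about the elided match arms) that function won't return until the client disconnects. An .await only lets other tasks make progress while the current one is waiting; here the accept loop and the handler run in the same task, so no further connection is accepted in the meantime. What you want is to tokio::spawn a new task that can run independently: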

tokio::spawn(handle_stream(stream, store, store_path, wal, cluster, &role));

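You will probably have to change some of your parameter types to avoid lifetimes: tokio::spawn requires the future to be 'static (and Send), since the task's lifetime is decoupled from the scope where it was spawned. In your code that means passing role by value (for example role.clone(), if NodeRole implements Clone) instead of &role, using an owned Arc<PathBuf> instead of Arc<&Path>, and letting WriteAheadLog own its data instead of carrying the 'a lifetime. Two further consequences: the handler's io::Result is no longer propagated by ?, so handle or log errors inside the task, and a std::sync::MutexGuard must not be held across an .await, or the future won't be Send.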
