Java Socket Server not able to respond correctly to Client side

Time:09-24

I'm creating a terminal simulator of a client/server system. My program consists of two main components, AggregationServer and ContentServer.

The AggregationServer acts as the main server that handles requests from ContentServers. Multiple ContentServers may connect to the AggregationServer and send requests.

A ContentServer sends a PUT request: it opens a local file and sends that file's contents to the AggregationServer, which parses the information and saves it to the database. The ContentServer then asks the user whether to terminate the connection or send another PUT request.

Both AggregationServer and ContentServer maintain a persistent connection for 12 seconds. If the ContentServer does not communicate within that time, the AggregationServer terminates the connection.

Currently, my AggregationServer receives the PUT request from the ContentServer and inserts it into the database correctly. However, I'm stuck on the AggregationServer sending its response back to the ContentServer. The first time the ContentServer establishes a connection and sends a PUT request, the AggregationServer responds, but the ContentServer's BufferedReader receives nothing. Only when I send another request does the ContentServer print a response, and it is actually the response to the previous request. I know this because the AggregationServer sends status code 201 for the first request on a connection and status code 200 for every re-sent request.

I suspect my BufferedReader and PrintWriter are not behaving correctly, but I can't figure out what went wrong.

Below is my code

Database.java

import java.sql.*;
import java.util.Date;
import java.text.SimpleDateFormat;

public class Database {
    // Establish connection to database
    private Connection getConnection() throws SQLException {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            System.err.println("Unable to find jdbc class!");
            e.printStackTrace();
            System.exit(1);
        }
        return DriverManager.getConnection("jdbc:mysql://localhost:3306/atom", "root", "root");
    }

    // Insert feed into the database
    // Update September 15th, 2022: The content server may send more than one
    // PUT request with the same content, so inserting every time would be
    // inefficient; instead, I update the existing row rather than insert
    public int insertFeed(String port, String body) throws SQLException {
        // Process to check if feed exist in database
        Connection con = getConnection();
        PreparedStatement ps = con.prepareStatement("SELECT * FROM feeds WHERE port_number=?");

        ps.setString(1, port);
        ResultSet rs = ps.executeQuery();

        SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
        Date date = new Date();

        if (rs.next()) {
            ps = con.prepareStatement("UPDATE feeds SET body=?, feed_date=? WHERE port_number=?");
            ps.setString(1, body);
            ps.setString(2, formatter.format(date));
            ps.setString(3, port);
            ps.executeUpdate();
            ps.close();
            return 0;
        } else { // else insert
            ps = con.prepareStatement("INSERT INTO feeds (body, feed_date, port_number) VALUES (?,?,?)");
            ps.setString(1, body);
            ps.setString(2, formatter.format(date));
            ps.setString(3, port);
            ps.executeUpdate();
            ps.close();
            return 1;
        }
    }

    // Return feed for GET request
    public synchronized String getFeed(String body) throws SQLException {
        Connection con = getConnection();
        PreparedStatement ps = con.prepareStatement("SELECT body FROM feeds WHERE port_number=?");
        ps.setInt(1, Integer.parseInt(body));
        ResultSet rs = ps.executeQuery();

        // Get the body feed
        StringBuilder sb = new StringBuilder();
        while (rs.next()) {
            sb.append(rs.getString("body"));
        }
        return sb.toString();
    }

    // Delete every feeds in database
    public synchronized void deleteAllFeeds() throws SQLException {
        Connection con = getConnection();
        PreparedStatement ps = con.prepareStatement("DELETE FROM feeds");
        ps.executeUpdate();
    }
}

AggregationServer.java

import java.net.*;
import java.sql.SQLException;

import java.io.*;

public class AggregationServer {
    public static void main(String[] args) throws IOException {

        // Initialize portnumber as 4567
        int portNumber = 4567;

        // If port number is given in the stdin, use that port number
        if (args.length == 1) {
            portNumber = Integer.parseInt(args[0]);
        } else if (args.length > 1) { // Port number error handling
            System.err.println("Error: Too many parameters.");
            System.err.println("Error: Enter a port number for AggregationServer! 4567 is the default port number.");
            System.exit(1);
        }

        System.out.println("Aggregation Server connecting to port: " + portNumber);

        // Establish database connection
        Database db = new Database();

        // Clearing database
        try {
            db.deleteAllFeeds();
        } catch (SQLException e) {
            System.err.println("DATABASE ERROR! UNABLE TO CLEAR DATABASE: " + e.getMessage());
        }

        try {
            ServerSocket serverSocket = new ServerSocket(portNumber);
            // Start a thread for responding for each client
            while (true) {
                new ServerThread(serverSocket.accept(), db).init();
            }
        } catch (IOException e) {
            System.err.println("SERVER ERROR! UNABLE TO PROCESS");
            e.printStackTrace();
        }
    }
}

ServerThread.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

public class ServerThread {
    private Socket socket;
    private String[] requestLine;
    private Database database;

    // Constructor
    public ServerThread(Socket socket, Database database) {
        try {
            socket.getKeepAlive();
            socket.setSoTimeout(12000);
            System.out.println("Aggregation Server receive connection at: " + socket.getRemoteSocketAddress());
        } catch (SocketException e) {
            System.err.println("SERVER ERROR! SOCKET TIMES OUT - CAN'T KEEP CONNECTION");
            e.printStackTrace();
        }

        this.socket = socket;
        this.database = database;
    }

    // Start the thread
    public void init() {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream());) {
            boolean terminate = false;
            if (out.checkError())
                System.err.println("An error for PrintWriter occurred at init()");
            while (!terminate) {
                try {
                    boolean isAlive = parseRequest(in); // Read in the request line and analyze it
                    if (!isAlive) {
                        System.out.println("SERVER LOST CONNECTION WITH CLIENT/CONTENT SERVER.");
                        break;
                    }
                    analyzeRespond(in, out);

                } catch (SocketTimeoutException se) {
                    System.err.println("SERVER ERROR! TIMEOUT - NO REQUEST FOR THE PAST 12 SECONDS.");
                    System.err.println("CLOSING CONNECTION.");
                    terminate = true;
                } catch (IOException e) {
                    System.err.println("SERVER ERROR! MESSAGES: " + e.getMessage());
                    terminate = true;
                }
            }
            socket.close();
        } catch (IOException e) {
            System.err.println("SERVER ERROR! UNABLE TO READ AND RESPONSE TO REQUEST - MESSAGE: " + e.getMessage());
        }

    }

    // analyze the request type
    private boolean parseRequest(BufferedReader in) throws IOException {
        String request = in.readLine();
        if (request == null) {
            return false;
        }
        this.requestLine = request.split(" ");
        return true;
    }

    // parsing the headers
    private HashMap<String, String> parseHeader(BufferedReader in) throws IOException {
        HashMap<String, String> mp = new HashMap<String, String>();
        String inputLine;
        while ((inputLine = in.readLine()) != null && !inputLine.isEmpty()) {
            int delim = inputLine.indexOf(": ");
            if (delim != -1) {
                mp.put(inputLine.substring(0, delim), inputLine.substring(delim + 2));
            }
        }
        return mp;
    }

    // Parses the body fields of the response
    // Assumes text based
    private String parseBody(BufferedReader in, HashMap<String, String> headers) throws IOException {
        int size = 0;
        size = Integer.parseInt(headers.get("Content-Length"));
        // Skip the two feed headers line
        in.readLine();
        in.readLine();

        // Reading the body characters by characters
        char[] bodyChars = new char[size];
        in.read(bodyChars, 0, size);

        String body = new String(bodyChars);
        return body;
    }

    // Analyzing respond
    private void analyzeRespond(BufferedReader in, PrintWriter out) throws IOException {
        if (this.requestLine[0].equals("PUT")) {
            handlePUT(in, out);
        } 
    }

    // Method to extract body for PUT request
    private String[] extractFeeds(String body) {
        String[] feeds = body.split("\nentry\n");
        return feeds;
    }

    // Method to convert feed to hashmap
    private HashMap<String, String> toHashMap(String feed) {
        HashMap<String, String> map = new HashMap<String, String>();
        String[] feedLine = feed.split("\n");

        for (int i = 0; i < feedLine.length; i++) {
            if (isAtomElement(feedLine[i])) {
                int delim = feedLine[i].indexOf(":");
                if (delim == -1)
                    continue;
                String atomElement = feedLine[i].substring(0, delim);

                if (atomElement.contains("summary")) { // Summary is at the end of feed
                    String summary = feedLine[i].substring(delim + 1);
                    for (int j = i + 1; j < feedLine.length; j++) { // Add all the line
                        summary += feedLine[j];
                    }
                    map.put(atomElement, summary);
                    break;
                } else { // Other atoms element
                    map.put(atomElement, feedLine[i].substring(delim + 1));
                }
            } else { // Error: Unrecognized atom element
                System.err.println("FEED ERROR! UNRECOGNIZED ATOM ELEMENT " + feedLine[i]);
                break;
            }
        }
        return map;
    }

    // Check if valid atom element
    private boolean isAtomElement(String element) {
        if (element.contains("title") || element.contains("subtitle") || element.contains("link")
                || element.contains("updated")
                || element.contains("author") || element.contains("name") || element.contains("id")
                || element.contains("summary") || element.contains("entry"))
            return true;
        return false;
    }

    private void printResponse(int statusCode, String body, PrintWriter out) throws IOException {
        SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
        Date date = new Date();
        out.print("\nHTTP/1.1 " + String.valueOf(statusCode) + " " + getStatusMessage(statusCode) + "\r\n");
        out.print("Date: " + formatter.format(date) + "\r\n");
        out.print("Content-Length: " + String.valueOf(body.length()) + "\r\n");
        out.print("Content-Type: application/atom+xml\r\n");
        out.print("Connection: keep-alive\r\n");
        if (body != null)
            out.print(body + "\r\n");
        out.print("\r\n");
        out.flush();
    }

    // Method for get message for status code
    private String getStatusMessage(int statusCode) {
        switch (statusCode) {
            case 201:
                return "HTTP_CREATED";
            case 200:
                return "OK";
            case 400:
                return "BAD REQUEST";
            case 204:
                return "NO CONTENT";
            case 500:
                return "INTERNAL SERVER ERROR";
            default:
                return "UNRECOGNIZED STATUS CODE";
        }
    }

     // I'M SUSPECTING ERROR OCCUR HERE
    // Method to handle PUT
    private void handlePUT(BufferedReader in, PrintWriter out) throws IOException {
        // Get the headers and the body
        HashMap<String, String> headers = parseHeader(in);
        String body = parseBody(in, headers);

        // Extract the body into feeds by "entry"
        String[] feeds = extractFeeds(body);

        for (int i = 0; i < feeds.length; i++) {
            HashMap<String, String> feed = toHashMap(feeds[i]);
            // Invalid feed cases
            if (feed.get("title") == null || feed.get("link") == null || feed.get("id") == null) {
                System.err.println("FEED ERROR! REJECTED FEED WITHOUT TITLE/LINK/ID");
                return;
            }
        }

        // Insert or update feeds into database
        try {
            if (database.insertFeed(this.socket.getRemoteSocketAddress().toString(), body) == 1) {
                System.out.println("Insert feed");
                printResponse(201, body, out);
            } else {
                System.out.println("Updated feed");
                printResponse(200, body, out);
            }
        } catch (SQLException e) {
            System.err.println("DATABASE ERROR! UNABLE TO COMPLETE PUT REQUEST - MESSAGE: " + e.getMessage());
        }
    }
}

ContentServer.java

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.stream.Collectors;

// Content server that will PUT the file to the aggregation server
public class ContentServer {
    public static void main(String[] args) {

        // Checking for stdin input
        if (args.length != 2) {
            System.err.println("Error: ContentServer input via commandline - hostname:portnumber filename");
            System.exit(1);
        }

        // Getting the hostname, port number, and file name
        String fileName = args[1];
        String[] tmp = args[0].split(":");
        String hostName = tmp[0];
        int portNumber = Integer.parseInt(tmp[1]);

        if (portNumber == -1)
            portNumber = 4567; // Default port number
        System.out.println("Content Server connect to port: " + portNumber);

        // Establish socket connection
        try (
                Socket socket = new Socket(hostName, portNumber);
                PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(socket.getInputStream()));) {
            persistentConnection(in, out, hostName, fileName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void sendPutRequest(PrintWriter out, String filename) {
        BufferedReader file = null;
        try {
            // Read the file
            InputStream fstream = ClassLoader.getSystemClassLoader().getResourceAsStream(filename);

            if (fstream == null)
                throw new FileNotFoundException();

            file = new BufferedReader(new InputStreamReader(fstream));
            String body = file.lines().collect(Collectors.joining("\n"));

            // Send the request with the pre-defined format
            System.out.println("Sending PUT request");
            out.println("PUT /atom.xml HTTP/1.1\r");
            out.println("User-Agent: ATOMClient/1/0\r");
            out.println("Content-Type: application/atom+xml\r");
            out.println("Content-Length: " + Integer.toString(body.length()) + "\r\n");
            out.println("<?xml version='1.0' encoding='iso-8859-1' ?>\r");
            out.println("<feed xml:lang=\"en-US\" xmlns=\"http://www.w3.org/2005/Atom\">\r");
            out.println(body);
            out.println("</feed>\r");
            out.flush();
        } catch (FileNotFoundException e) {
            System.err.println("Error while sending PUT request! Can't find file: " + filename);
            e.printStackTrace();
            System.exit(1);
        } finally {
            try {
                if (file != null)
                    file.close();
            } catch (IOException ex) {
                System.err.println("Error: " + ex.getMessage());
            }
        }
    }

    public static void persistentConnection(BufferedReader in, PrintWriter out, String hostName, String fileName) {
        Scanner sc = new Scanner(System.in);
        boolean running = true;
        while (running) {
            System.out.print("Enter 1 for terminate connection to server, 2 for sending PUT request: ");
            int user_input = sc.nextInt();
            sc.nextLine();

            switch (user_input) {
                case 1:
                    System.out.println("Terminate connection");

                    running = false;
                    break;
                case 2:
                    if (out.checkError())
                        return;

                    sendPutRequest(out, fileName);
                    try {
                        receiveResponse(in);
                    } catch (IOException e) {
                        System.err.println("Error in receiving respond: " + e.getMessage());
                    }
                    break;
                default:
                    System.out.println("Unable to recognize user input!");
                    break;
            }
        }
    }

    // Receive the response
    public static void receiveResponse(BufferedReader in) throws IOException {
        String line;
        while(in.ready() && (line = in.readLine()) != null) {
            System.out.println(line);
        }
    }
}

Specifically, I suspect the incorrect behavior comes from the handlePUT() method in ServerThread.java and the receiveResponse() method in ContentServer.java.

Thanks.

CodePudding user response:

So I figured out what I did wrong. In ContentServer.java, I sent a PUT request and tried to read the response immediately, before the server's response had arrived on the socket. This is what I changed:

case 2:
                    if (out.checkError())
                        return;

                    sendPutRequest(out, fileName);
                    try {
                        Thread.sleep(500);
                        receiveResponse(in);
                    } catch (IOException e) {
                        System.err.println("Error in receiving respond: " + e.getMessage());
                    } catch (InterruptedException et) {
                        System.err.println("Error in thread stopping " + et.getMessage());
                    }
                    break;

I made the thread wait 500 ms for the response to arrive and then print it out.
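
A fixed sleep only helps when the response happens to arrive within 500 ms. The underlying cause is the in.ready() check in receiveResponse(): ready() returns false whenever no data has been buffered yet, so the loop can exit before the server has answered. One possible alternative, sketched here under assumptions, is to drop ready() and let readLine() block, using Content-Length to know where the response ends. The class name ResponseReader and the method readResponse are hypothetical, and the sketch assumes the server writes an empty line between the headers and the body (the printResponse() shown in the question does not currently do that) and that Content-Length counts characters, as body.length() does in the question.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Hypothetical helper, not part of the original code.
class ResponseReader {

    // Reads one response: status line and headers up to the first blank line,
    // then exactly Content-Length characters of body.
    // Assumes the server writes an empty line between headers and body.
    static String readResponse(BufferedReader in) throws IOException {
        StringBuilder response = new StringBuilder();
        int contentLength = 0;
        String line;
        // readLine() blocks until a full line has arrived, so no sleep is needed.
        while ((line = in.readLine()) != null) {
            if (line.isEmpty()) {
                if (response.length() == 0) {
                    continue; // tolerate blank lines before the status line
                }
                break; // blank line marks the end of the headers
            }
            response.append(line).append('\n');
            if (line.toLowerCase().startsWith("content-length:")) {
                contentLength = Integer.parseInt(line.substring(line.indexOf(':') + 1).trim());
            }
        }
        // read() may return fewer characters than requested, so loop until done.
        char[] body = new char[contentLength];
        int read = 0;
        while (read < contentLength) {
            int n = in.read(body, read, contentLength - read);
            if (n == -1) {
                break; // stream ended before the full body arrived
            }
            read += n;
        }
        return response.append('\n').append(body, 0, read).toString();
    }

    public static void main(String[] args) throws IOException {
        // Two back-to-back responses on one stream, standing in for the socket.
        String wire = "HTTP/1.1 201 HTTP_CREATED\r\nContent-Length: 5\r\n\r\nhello"
                + "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok";
        BufferedReader in = new BufferedReader(new StringReader(wire));
        System.out.println(readResponse(in));
        System.out.println(readResponse(in));
    }
}
```

In ContentServer, the body of receiveResponse(in) could then become System.out.println(ResponseReader.readResponse(in)). Because the read blocks, setting a read timeout on the client socket with setSoTimeout would keep the client from waiting forever if the server never replies.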
