I'm creating a terminal simulator of a client/server system, where my program consist of 2 main components, AggregationServer, and ContentServer.
The AggregationServer will act as the main server that handling the request come from ContentServer, and there will be multiple ContentServer trying to connect to AggregationServer to send a request.
The ContentServer will send a PUT request, where it will open a local file and send that file's information to the AggregationServer. AggregationServer will parse and save the information to database. Also, ContentServer will ask for user input to either terminate the connection or send another PUT request.
Both AggregationServer and ContentServer will maintain a persistence connection for 12 seconds. If ContentServer is not communicating, Aggregation will terminate the connection.
Currently, my AggregationServer received the PUT request from ContentServer and insert it into database correctly. However, I faced an obstacle with AggregationServer sending respond to ContentServer. At the first time ContentServer establish connection and send PUT request, the AggregationServer respond but ContentServer's BufferedReader received nothing. Until I send another request to AggregationServer, ContentServer will acknowledge the respond, and it is actually the respond from the previous request. I can know this because for the first connection with each other, AggregationServer will send response code as 201, and for re-sending request, it will send status code 200.
I'm suspecting it is because of my BufferedReader and PrintWriter not perform correctly but I can't figure it out what went wrong.
Below is my code
Database.java
import java.sql.*;
import java.util.Date;
import java.text.SimpleDateFormat;
public class Database {
// Establish connection to database
private Connection getConnection() throws SQLException {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
System.err.println("Unable to find jdbc class!");
e.printStackTrace();
System.exit(1);
}
return DriverManager.getConnection("jdbc:mysql://localhost:3306/atom", "root", "root");
}
// Insert feed into the database
// Update September 15th, 2022: The content server may try to send more than 1
// PUT request of the same content
// so if we keep inserting, it's will not be efficient, instead, I update the
// database rather than insert
public int insertFeed(String port, String body) throws SQLException {
// Process to check if feed exist in database
Connection con = getConnection();
PreparedStatement ps = con.prepareStatement("SELECT * FROM feeds WHERE port_number=?");
ps.setString(1, port);
ResultSet rs = ps.executeQuery();
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
Date date = new Date();
if (rs.next()) {
ps = con.prepareStatement("UPDATE feeds SET body=?, feed_date=? WHERE port_number=?");
ps.setString(1, body);
ps.setString(2, formatter.format(date));
ps.setString(3, port);
ps.executeUpdate();
ps.close();
return 0;
} else { // else insert
ps = con.prepareStatement("INSERT INTO feeds (body, feed_date, port_number) VALUES (?,?,?)");
ps.setString(1, body);
ps.setString(2, formatter.format(date));
ps.setString(3, port);
ps.executeUpdate();
ps.close();
return 1;
}
}
// Return feed for GET request
public synchronized String getFeed(String body) throws SQLException {
Connection con = getConnection();
PreparedStatement ps = con.prepareStatement("SELECT body FROM feeds WHERE port_number=?");
ps.setInt(1, Integer.parseInt(body));
ResultSet rs = ps.executeQuery();
// Get the body feed
StringBuilder sb = new StringBuilder();
while (rs.next()) {
sb.append(rs.getString("body"));
}
return sb.toString();
}
// Delete every feeds in database
public synchronized void deleteAllFeeds() throws SQLException {
Connection con = getConnection();
PreparedStatement ps = con.prepareStatement("DELETE FROM feeds");
ps.executeUpdate();
}
}
AggregationServer.java
import java.net.*;
import java.sql.SQLException;
import java.io.*;
public class AggregationServer {
public static void main(String[] args) throws IOException {
// Initialize portnumber as 4567
int portNumber = 4567;
// If port number is given in the stdin, use that port number
if (args.length == 1) {
portNumber = Integer.parseInt(args[0]);
} else if (args.length > 1) { // Port number error handling
System.err.println("Error: Too many parameters.");
System.err.println("Error: Enter a port number for AggregationServer! 4567 is the default port number.");
System.exit(1);
}
System.out.println("Aggregation Server connecting to port: " portNumber);
// Establish database connection
Database db = new Database();
// Clearing database
try {
db.deleteAllFeeds();
} catch (SQLException e) {
System.err.println("DATABASE ERROR! UNABLE TO CLEAR DATABASE: " e.getMessage());
}
try {
ServerSocket serverSocket = new ServerSocket(portNumber);
// Start a thread for responding for each client
while (true) {
new ServerThread(serverSocket.accept(), db).init();
}
} catch (IOException e) {
System.err.println("SERVER ERROR! UNABLE TO PROCESS");
e.printStackTrace();
}
}
}
ServerThread.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
public class ServerThread {
private Socket socket;
private String[] requestLine;
private Database database;
// Constructor
public ServerThread(Socket socket, Database database) {
try {
socket.getKeepAlive();
socket.setSoTimeout(12000);
System.out.println("Aggregation Server receive connection at: " socket.getRemoteSocketAddress());
} catch (SocketException e) {
System.err.println("SERVER ERROR! SOCKET TIMES OUT - CAN'T KEEP CONNECTION");
e.printStackTrace();
}
this.socket = socket;
this.database = database;
}
// Start the thread
public void init() {
try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream());) {
boolean terminate = false;
if (out.checkError())
System.err.println("An error for Printwritter occur at init()");
while (!terminate) {
try {
boolean isAlive = parseRequest(in); // Read in the request line and analyze it
if (!isAlive) {
System.out.println("SERVER LOST CONNECTION WITH CLIENT/CONTENT SERVER.");
break;
}
analyzeRespond(in, out);
} catch (SocketTimeoutException se) {
System.err.println("SERVER ERROR! TIMEOUT - NO REQUEST FOR THE PASS 12 SECONDS.");
System.err.println("CLOSING CONNECTION.");
terminate = true;
} catch (IOException e) {
System.err.println("SERVER ERROR! MESSAGES: " e.getMessage());
terminate = true;
}
}
socket.close();
} catch (IOException e) {
System.err.println("SERVER ERROR! UNABLE TO READ AND RESPONSE TO REQUEST - MESSAGE: " e.getMessage());
}
}
// analyze the request type
private boolean parseRequest(BufferedReader in) throws IOException {
String request = in.readLine();
if (request == null) {
return false;
}
this.requestLine = request.split(" ");
return true;
}
// parsing the headers
private HashMap<String, String> parseHeader(BufferedReader in) throws IOException {
HashMap<String, String> mp = new HashMap<String, String>();
String inputLine;
while ((inputLine = in.readLine()) != null && !inputLine.isEmpty()) {
int delim = inputLine.indexOf(": ");
if (delim != -1) {
mp.put(inputLine.substring(0, delim), inputLine.substring(delim 2));
}
}
return mp;
}
// Parses the body fields of the response
// Assumes text based
private String parseBody(BufferedReader in, HashMap<String, String> headers) throws IOException {
int size = 0;
size = Integer.parseInt(headers.get("Content-Length"));
// Skip the two feed headers line
in.readLine();
in.readLine();
// Reading the body characters by characters
char[] bodyChars = new char[size];
in.read(bodyChars, 0, size);
String body = new String(bodyChars);
return body;
}
// Analyzing respond
private void analyzeRespond(BufferedReader in, PrintWriter out) throws IOException {
if (this.requestLine[0].equals("PUT")) {
handlePUT(in, out);
}
}
// Method to extract body for PUT request
private String[] extractFeeds(String body) {
String[] feeds = body.split("\nentry\n");
return feeds;
}
// Method to convert feed to hashmap
private HashMap<String, String> toHashMap(String feed) {
HashMap<String, String> map = new HashMap<String, String>();
String[] feedLine = feed.split("\n");
for (int i = 0; i < feedLine.length; i ) {
if (isAtomElement(feedLine[i])) {
int delim = feedLine[i].indexOf(":");
if (delim == -1)
continue;
String atomElement = feedLine[i].substring(0, delim);
if (atomElement.contains("summary")) { // Summary is at the end of feed
String summary = feedLine[i].substring(delim 1);
for (int j = i 1; j < feedLine.length; j ) { // Add all the line
summary = feedLine[j];
}
map.put(atomElement, summary);
break;
} else { // Other atoms element
map.put(atomElement, feedLine[i].substring(delim 1));
}
} else { // Error: Unrecognized atom element
System.err.println("FEED ERROR! UNRECOGNIZED ATOM ELEMENT " feedLine[i]);
break;
}
}
return map;
}
// Check if valid atom element
private boolean isAtomElement(String element) {
if (element.contains("title") || element.contains("subtitle") || element.contains("link")
|| element.contains("updated")
|| element.contains("author") || element.contains("name") || element.contains("id")
|| element.contains("summary") || element.contains("entry"))
return true;
return false;
}
private void printResponse(int statusCode, String body, PrintWriter out) throws IOException {
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
Date date = new Date();
out.print("\nHTTP/1.1 " String.valueOf(statusCode) " " getStatusMessage(statusCode) "\r\n");
out.print("Date: " formatter.format(date) "\r\n");
out.print("Content-Length: " String.valueOf(body.length()) "\r\n");
out.print("Content-Type: application/atom xml\r\n");
out.print("Connection: keep-alive\r\n");
if (body != null)
out.print(body "\r\n");
out.print("\r\n");
out.flush();
}
// Method for get message for status code
private String getStatusMessage(int statusCode) {
switch (statusCode) {
case 201:
return "HTTP_CREATED";
case 200:
return "OK";
case 400:
return "BAD REQUEST";
case 204:
return "NO CONTENT";
case 500:
return "INTERNAL SERVER ERROR";
default:
return "UNRECOGNIZED STATUS CODE";
}
}
// I'M SUSPECTING ERROR OCCUR HERE
// Method to handle PUT
private void handlePUT(BufferedReader in, PrintWriter out) throws IOException {
// Get the headers and the body
HashMap<String, String> headers = parseHeader(in);
String body = parseBody(in, headers);
// Extract the body into feeds by "entry"
String[] feeds = extractFeeds(body);
for (int i = 0; i < feeds.length; i ) {
HashMap<String, String> feed = toHashMap(feeds[i]);
// Invalid feed cases
if (feed.get("title") == null || feed.get("link") == null || feed.get("id") == null) {
System.err.println("FEED ERROR! REJECTED FEED WITHOUT TITLE/LINK/ID");
return;
}
}
// Insert or update feeds into database
try {
if (database.insertFeed(this.socket.getRemoteSocketAddress().toString(), body) == 1) {
System.out.println("Insert feed");
printResponse(201, body, out);
} else {
System.out.println("Updated feed");
printResponse(200, body, out);
}
} catch (SQLException e) {
System.err.println("DATABASE ERROR! UNABLE TO COMPLETE PUT REQUEST - MESSAGE: " e.getMessage());
}
}
}
ContentServer.java
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.stream.Collectors;
// Content server whoes will put the file to the aggregation server
public class ContentServer {
public static void main(String[] args) {
// Checking for stdin input
if (args.length != 2) {
System.err.println("Error: ContentServer input via commandline - hostname:portnumber filename");
System.exit(1);
}
// Getting the hostname, port number, and file name
String fileName = args[1];
String[] tmp = args[0].split(":");
String hostName = tmp[0];
int portNumber = Integer.parseInt(tmp[1]);
if (portNumber == -1)
portNumber = 4567; // Default port number
System.out.println("Content Server connect to port: " portNumber);
// Establish socket connection
try (
Socket socket = new Socket(hostName, portNumber);
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));) {
persistentConnection(in, out, hostName, fileName);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void sendPutRequest(PrintWriter out, String filename) {
BufferedReader file = null;
try {
// Read the file
InputStream fstream = ClassLoader.getSystemClassLoader().getResourceAsStream(filename);
if (fstream == null)
throw new FileNotFoundException();
file = new BufferedReader(new InputStreamReader(fstream));
String body = file.lines().collect(Collectors.joining("\n"));
// Send the request with the pre-defined format
System.out.println("Sending PUT request");
out.println("PUT /atom.xml HTTP/1.1\r");
out.println("User-Agent: ATOMClient/1/0\r");
out.println("Content-Type: application/atom xml\r");
out.println("Content-Length: " Integer.toString(body.length()) "\r\n");
out.println("<?xml version='1.0' encoding='iso-8859-1' ?>\r");
out.println("<feed xml:lang=\"en-US\" xmlns=\"http://www.w3.org/2005/Atom\">\r");
out.println(body);
out.println("</feed>\r");
out.flush();
} catch (FileNotFoundException e) {
System.err.println("Error while sending PUT request! Can't find file: " filename);
e.printStackTrace();
System.exit(1);
} finally {
try {
if (file != null)
file.close();
} catch (IOException ex) {
System.err.println("Error: " ex.getMessage());
}
}
}
public static void persistentConnection(BufferedReader in, PrintWriter out, String hostName, String fileName) {
Scanner sc = new Scanner(System.in);
boolean running = true;
while (running) {
System.out.print("Enter 1 for terminate connection to server, 2 for sending PUT request: ");
int user_input = sc.nextInt();
sc.nextLine();
switch (user_input) {
case 1:
System.out.println("Terminate connection");
running = false;
break;
case 2:
if (out.checkError())
return;
sendPutRequest(out, fileName);
try {
receiveResponse(in);
} catch (IOException e) {
System.err.println("Error in receiving respond: " e.getMessage());
}
break;
default:
System.out.println("Unable to recognize user input!");
break;
}
}
}
// Receive the response
public static void receiveResponse(BufferedReader in) throws IOException {
String line;
while(in.ready() && (line = in.readLine()) != null) {
System.out.println(line);
}
}
}
Specifically, I'm suspecting I got this uncorrect behavior at ServerThread.java - handlePUT() method and ContentServer.java - receiveRespond() method.
Thanks.
CodePudding user response:
So I figured out what I did wrong. In ContentServer.java, I send a PUT request and ask for a response immediately while the server's response did not reach the end of the pipe. This is what I did:
case 2:
if (out.checkError())
return;
sendPutRequest(out, fileName);
try {
Thread.sleep(500);
receiveResponse(in);
} catch (IOException e) {
System.err.println("Error in receiving respond: " e.getMessage());
} catch (InterruptedException et) {
System.err.println("Error in thread stopping " et.getMessage());
}
break;
I made the thread wait for the response and then print it out.