Socket blocks sending messages between a Java server and Python client

Time:11-12

I need to pass some strings of data between a Java application and a Python script on a local Windows machine. I decided to do so with a Java socket server that communicates over TCP with a Python client. The Java application creates two threads to handle two socket connections on ports 9998 and 9999 of localhost. I use port 9998 for incoming messages and port 9999 for outgoing messages. My two applications run smoothly for the first few messages sent and received, and then at some point the Java side stops on the call that sends the string to Python. Here is part of my code:

This Java class handles the creation of the socket server and the communication

    public class ServerSocketConnection {

    private int port;
    private Socket socket;
    private ServerSocket serverSocket;
    private Logger logger;
    private BufferedWriter out;
    private BufferedReader in;

    public ServerSocketConnection(int port) {
        this.port = port;
        logger = App.getLogger();
    }

    // Create a server for a socket connection
    public void createServer() {
        try {
            // Create a server socket
            serverSocket = new ServerSocket(port);
            // Socket creation
            socket = serverSocket.accept();
            // Create a buffered writer
            out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            // Create a buffered reader
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        } catch (IOException e) {
            logger.severe("Error creating server socket");
        }
    }

    // Close the server socket
    public void closeServer() {
        try {
            serverSocket.close();
        } catch (IOException e) {
            logger.severe("Error closing server socket");
        }
    }

    public void sendMessage(String message) {
        try {
            // Sending the byte length of the message
            byte[] ptext = message.getBytes("UTF-16");
            send(String.valueOf(ptext.length));
            // Sending the message
            send(message);
        } catch (IOException e) {
            logger.severe("Error sending message:" + e.getMessage());
        }
    }

    private void send(String message) throws IOException {
        out.write(message);
        out.newLine();
        out.flush();
    }

    public String receiveMessage() {
        try {
            return in.readLine();
        } catch (IOException e) {
            logger.severe("Error receiving message");
            return null;
        }
    }
    }

This is the Java Thread handling the sending of the messages. It gets the message to send from a Queue that is shared between other threads.

public class SendToPlatform implements Runnable {

    private static final int PORT = 9999;
    private Thread worker;
    private AtomicBoolean running;
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private BlockingQueue<String> queueOut;
    private Logger logger;
    private ServerSocketConnection serverSocketConnection;

    public SendToPlatform(BlockingQueue<String> queueOut, AtomicBoolean running) {
        this.queueOut = queueOut;
        this.running = running;
        this.logger = App.getLogger();
        serverSocketConnection = new ServerSocketConnection(PORT);
    }

    public void run() {
        stopped.set(false);
        serverSocketConnection.createServer();
        while (running.get()) {
            socketSender();
        }
        stopped.set(true);
    }

    private void socketSender() {
        if (!queueOut.isEmpty()) {
            String element = null;
            try {
                element = queueOut.poll(1000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.severe("SendToPlatform: InterruptedException: " + e.getMessage());
            }
            serverSocketConnection.sendMessage(element);
        }
    }
}

This is the Python thread that receives the messages from the Java socket server:

    def __socket_reading_no_server(self, queue_input : queue.Queue, is_running : bool):
        HOST = "localhost"
        PORT = 9999
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((HOST, PORT))

        while is_running:
            data = s.recv(4)
            message_size = int(data.decode('UTF-8').strip())
            data = s.recv(min(message_size + 2, 1024))
            message = data.decode('UTF-8').strip()
            queue_input.put(message)
        s.close()

And this method is launched as a thread with these instructions:

input_thread = threading.Thread(target=self.__socket_reading_no_server , args =(self.__queue_input, self.__running, ), daemon=True)
input_thread.start()

By debugging, logging, and inspecting the traffic with Wireshark, I concluded that the out.write instruction blocks while sending a message after around 10 messages have been sent correctly. The pending message is released when I close the socket connection. I tried using PrintWriter and DataOutputStream instead of BufferedWriter, but the same problem occurred. I also tried not sending the length of the message before the string (the length is what the client uses to size its s.recv() call), but the same problem occurred. I'm new to socket programming and have probably done something badly wrong, but I cannot find where the problem is. Is there perhaps a better way to pass data between processes, one that I'm unaware of, that I could use instead of sockets?

CodePudding user response:

Here's a "cut down" implementation of a Java client and Python server that demonstrates an efficient mechanism for sending messages (in this case strings) of arbitrary length:

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class Client {

    public static void main(String[] args) throws IOException {
        String message = "Talking isn't doing. It is a kind of good deed to say well; and yet words are not deeds.";
        try (Socket socket = new Socket("localhost", 7070)) {
            try (DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
                // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset
                byte[] dts = message.getBytes(StandardCharsets.UTF_8);
                out.writeInt(dts.length);
                out.write(dts);
            }
        }
    }
}

Note how the client sends the length of the upcoming message as a 32-bit integer before sending the actual message.

from multiprocessing.connection import Listener

def main():
    listener = Listener(address=('0.0.0.0', 7070), family='AF_INET', authkey=None)
    with listener.accept() as conn:
        print(conn.recv_bytes().decode())
            

if __name__ == '__main__':
    main()

The connection class expects to receive messages using the protocol implemented on the client side (Java in this case) - i.e., a 32-bit integer that gives the amount of data to follow.
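To make the wire format concrete, here is a minimal sketch of the same framing done by hand in Python. The helper names encode_frame and decode_frame are hypothetical (they are not part of any library), and the sketch assumes the payload is UTF-8 text. The header layout matches what Java's DataOutputStream.writeInt produces: four bytes, most significant byte first.

```python
import struct


def encode_frame(text: str) -> bytes:
    """Build one frame: 4-byte big-endian signed length, then the UTF-8 payload."""
    payload = text.encode("utf-8")
    return struct.pack(">i", len(payload)) + payload


def decode_frame(buffer: bytes) -> tuple:
    """Split the first complete frame off buffer and return (text, remaining bytes)."""
    if len(buffer) < 4:
        raise ValueError("incomplete header")
    (length,) = struct.unpack(">i", buffer[:4])
    end = 4 + length
    if len(buffer) < end:
        raise ValueError("incomplete payload")
    return buffer[4:end].decode("utf-8"), buffer[end:]
```

Because the length counts bytes rather than characters, the receiver always knows where one message ends and the next begins, whatever the message contains.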

I hope this clarifies the matter

CodePudding user response:

TL;DR: see the code corrections below.


Before getting to your questions, there are a few points that need to be addressed:

  1. Change your server's data encoding from UTF-16 to UTF-8. Why? Data transmission between two endpoints relies on both sides using the same encoding. Your server (ServerSocketConnection) computes the message length from the UTF-16 bytes, while your client (__socket_reading_no_server) decodes what it receives as UTF-8. Even if your client receives every byte the server sends, it cannot interpret the data correctly. For example, UTF-16 encodes the string "5" as the bytes [0, 53], whereas UTF-8 gives [53] (assuming big-endian byte order; Java's "UTF-16" charset also prepends a two-byte byte-order mark). Refer to Wikipedia for more details.
  2. Do not use out.newLine(); use out.write("\r\n") instead. Why? The behavior of newLine() is platform-dependent: it writes the system's line.separator property, which is one character ("\n") on Unix-like systems and two characters ("\r\n") on Windows. See the Java documentation for more detail.
  3. Your data = s.recv(4) call assumes that the client always gets the complete 4-byte header in one read, which is dangerous. Why? According to the Python documentation, the 4 is not the number of bytes the client actually receives, but the maximum amount of data to be received in one call. Furthermore, a 4-byte header means the client can theoretically only be told about messages of up to 9999 bytes (bytes 1 to 4 all '9'), and fewer once the line terminator takes up part of those 4 bytes.
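Points 1 and 3 can be checked from the Python side. The following is a minimal sketch, not a drop-in replacement for your client: recv_exact is a hypothetical helper name, and the two constants exist only to show the byte difference between the encodings.

```python
import socket

# Point 1: the same character becomes different bytes under each encoding.
UTF8_FIVE = "5".encode("utf-8")        # one byte
UTF16_FIVE = "5".encode("utf-16-be")   # two bytes, big-endian, no byte-order mark


def recv_exact(sock: socket.socket, size: int) -> bytes:
    """Point 3: keep calling recv until exactly size bytes have arrived."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)  # may return fewer bytes than requested
        if not chunk:
            raise ConnectionError("peer closed the connection early")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)
```

A single s.recv(4) may hand back 1, 2, 3 or 4 bytes, so a fixed-size header is only safe to parse after a loop like this one has collected all of it.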

For your question "...instruction that blocks while sending the...": unfortunately, no error messages are provided here, so we cannot determine precisely which part went wrong. However, we can infer that it is more likely a consequence of how the Java side uses the plain socket, since the scenario you describe is rather unusual in C network programming with plain sockets; that is, blocking does not normally happen during the continuous transmission of bytes. The POSIX definition of the write system call says:

Upon successful completion, write() and pwrite() shall return the number of bytes actually written to the file associated with fildes. This number shall never be greater than nbyte. Otherwise, -1 shall be returned and errno set to indicate the error.

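That is, after write is invoked to send bytes to the stream buffer, it normally returns a result right away instead of blocking. On a blocking socket it waits only while the send buffer is full, which happens when the peer has stopped reading. Note that most high-level languages eventually invoke the write system call to send bytes.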
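One caveat is worth testing in your setup: a write can only return immediately while there is room in the socket's send buffer. The sketch below is an illustration under assumptions, not a reproduction of your application: a local socketpair stands in for the Java/Python connection, the function name fill_send_buffer is hypothetical, and the writer is switched to non-blocking mode so that a full buffer raises an exception instead of hanging the script.

```python
import socket


def fill_send_buffer(chunk_size: int = 4096, limit: int = 64 * 1024 * 1024) -> int:
    """Write to a peer that never reads; return the bytes accepted before the buffer filled."""
    writer, reader = socket.socketpair()
    writer.setblocking(False)  # raise BlockingIOError instead of waiting
    chunk = b"x" * chunk_size
    accepted = 0
    try:
        while accepted < limit:  # safety cap so the loop always terminates
            accepted += writer.send(chunk)
    except BlockingIOError:
        pass  # a blocking socket would wait at this point until the peer reads
    finally:
        writer.close()
        reader.close()
    return accepted
```

The number returned depends on the operating system's buffer sizes. If your Python reader thread stops consuming (for example because it raised an exception while parsing a header), the Java writer would reach the same state after a handful of messages.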

This doesn't mean it is Java's fault; rather, once the sockets are used correctly, the obscure error disappears. For example, in my tests, your application works fine after applying the following corrections:

Code Corrections:

  1. ServerSocketConnection / byte[] ptext = message.getBytes("UTF-16"); -> byte[] ptext = message.getBytes("UTF-8");
  2. ServerSocketConnection / send(String.valueOf(ptext.length)); -> send(String.format("%2d", ptext.length));
  3. ServerSocketConnection / out.newLine() -> out.write("\r\n")
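On the Python side, a reader that does not depend on the exact width of the length line could look like the sketch below. It is only one possible approach: read_messages is a hypothetical name, and it assumes the server sends UTF-8 text as a length line followed by a message line, with no line breaks inside a message.

```python
import socket


def read_messages(sock: socket.socket):
    """Yield each message sent as a length line followed by a message line."""
    # The default newline handling accepts "\n", "\r\n" and "\r" as line endings.
    with sock.makefile("r", encoding="utf-8") as reader:
        while True:
            header = reader.readline()
            if not header:
                return  # the peer closed the connection
            expected = int(header.strip())
            message = reader.readline().rstrip("\r\n")
            if len(message.encode("utf-8")) != expected:
                raise ValueError("announced length does not match the message")
            yield message
```

In the client thread this would replace the two s.recv calls, for example with a loop such as for message in read_messages(s): queue_input.put(message).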

Tests:
server:

BlockingQueue<String> q = new ArrayBlockingQueue<String>(20);
q.add("str 1");
q.add("str 2");
q.add("str 3");
serverSocketConnection.sendMessage(element);
logger.info("element:" + element);  // debug
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 1
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 2
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 3

client:

print("message: %s" % message)   # debug
# queue_input.put(message)
message: str 1
message: str 2
message: str 3