Home > Software design >  ThreadPoolExecutor.shutdownNow() not throwing InterruptedException in Thread
ThreadPoolExecutor.shutdownNow() not throwing InterruptedException in Thread

Time:12-22

I am implementing a Transfer Server program which takes messages from clients (via console input) and then forwards it to some sort of mailbox.

To allow concurrent reception of several messages by different clients, I first created a class that implements the Runnable interface. Each of this class instances will handle the communication with exactly one client:

public class ClientConnection implements Runnable {

    //...

    //...

    @Override
    public void run() {
        try {
            // prepare the input reader and output writer
            BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);

            Message message = new Message();
            String request = "";

            // read client requests
            while ((request = reader.readLine()) != null) {

                System.out.println("Client sent the following request: "   request);
                String response;
                if (request.trim().equals("quit")) {
                    writer.println("ok bye");
                    return;
                }

                response = message.parseRequest(request);
                if (message.isCompleted()) {
                    messagesQueue.put(message);
                    message = new Message();
                }
                writer.println(response);
            }

        } catch (SocketException e) {
            System.out.println("ClientConnection: SocketException while handling socket: "   e.getMessage());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            System.out.println("Client Connection was interrupted!");
            e.printStackTrace();
        } finally {
            if (clientSocket != null && !clientSocket.isClosed()) {
                try {
                    clientSocket.close();
                } catch (IOException ignored) {}
            }

        }

    }
}

I do have a parent thread which is responsible for starting and managing all the ClientConnection runnables:

@Override
public void run() {

    clientConnectionExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    while (true) {

        Socket clientSocket;

        try {
            // wait for a Client to connect
            clientSocket = serverSocket.accept();

            ClientConnection clientConnection = new ClientConnection(clientSocket, messagesQueue);
            clientConnectionExecutor.execute(clientConnection);

        } catch (IOException e) {
            // when this exception occurs, it means that we want to shut down everything
            clientConnectionExecutor.shutdownNow();  // force terminate all ClientConnections
            return;
        }
    }
}

Now according to this Stackoverflow Question, I would have expected that as soon as shutdownNow(); is being called, an InterruptedException would be thrown within my ClientConnection.run() method, and there, it should print Client Connection was interrupted!. But this does not happen, so the catch clause seems never to be reached, the input reading loop just goes on.

I read in another Stackoverflow question that this might be related to some other codeline within the block seems to be consuming the InterruptedException, but there wasn't any particular information on what codeline could do that. So I am thankful for any hints.

Edit: It turns out that as soon as I manually exit the loop by typing "quit" on the client, the loop will quit and then, Client Connection was interrupted! will be printed. So somehow the exception seems to be ignored as long as the loop is running, and only handled afterwards.

CodePudding user response:

From Oracle docs for shutdownNow:

There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.

If you take a look into ThreadPoolExecutor sources, you will find out that shutdownNow interrupts threads with this code:

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

Your ClientConnection doesn't check the flag Thread.interrupted. Due to information in the post, I can't figure out which method throws InterruptedException. Probably, some other method, for example, readLine of reader or writer, blocks the thread, because they use socket's InputStream and OutputStream and because it's obvious that socket's streams block the thread if data is not immediatly available.

For example, I wrote this code to test it:

class Example {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try(ServerSocket serverSocket = new ServerSocket()) {
                serverSocket.bind(new InetSocketAddress(8080));
                Socket socket = serverSocket.accept();
                int dataByte = socket.getInputStream().read();
                System.out.println(dataByte);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        thread.interrupt();
    }
}

On OpenJdk-16.0.2 there is no actual interruption.

I see two possible solutions for your problem:

  1. Check Thread.interrupted inside the while loop if you are sure that Socket doesn't block your thread.

  2. If your are not sure, use SocketChannel in non-blocking mode instead of Socket for checking Thread.interrupted manually.

For the second way I tranformed my example into this:

class Example {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try(ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
                serverSocket.configureBlocking(false);
                serverSocket.bind(new InetSocketAddress(8080));

                SocketChannel socket = null;

                while (socket == null) {
                    socket = serverSocket.accept();

                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                }

                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                socket.read(byteBuffer);
                byte[] bytes = new byte[byteBuffer.limit()];
                byteBuffer.flip();
                byteBuffer.get(bytes);
                System.out.println(new String(bytes, StandardCharsets.UTF_8));
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                System.out.println("Interrupted successfully");
            }
        });
        thread.start();
        thread.interrupt();
    }
}

It works.

Good luck with Java :)

CodePudding user response:

I would have expected that as soon as shutdownNow(); is being called, an InterruptedException would be thrown within my ClientConnection.run()

Your messagesQueue should be a BlockingQueue. So messagesQueue.put(message) will make you need to catch an Interrupted exception. So only when the thread is blocked in the put method(queue is full), you call threadpool#shutdownNow, then the thread will receive an Interrupted exception. In other cases, thread will not receive this Interrupted exception.

You can change while ((request = reader.readLine()) != null) to while ((request = reader.readLine()) != null && !Thread.interrupted()).

Another solution is to maintain all client sockets, and close all client sockets when you need to close them, this way, the client thread will directly receive an IOException:

        List<Socket> clientSockets = new ArrayList<>();
        while (true) {
            try {
                Socket accept = serverSocket.accept();
                clientSockets.add(accept);
                executorService.submit(new ClientConnection(accept));
            }catch (Exception e) {
                for (Socket socket : clientSockets) {
                    try {
                        socket.close();
                    } catch (Exception exception) {
                        //
                    }
                }
                //executorService.shutdownNow();
            }
        }
  • Related