I am implementing a Transfer Server program that takes messages from clients (via console input) and forwards them to some sort of mailbox.
To allow several clients to send messages concurrently, I first created a class that implements the Runnable interface. Each instance of this class handles the communication with exactly one client:
public class ClientConnection implements Runnable {
//...
//...
@Override
public void run() {
try {
// prepare the input reader and output writer
BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
Message message = new Message();
String request = "";
// read client requests
while ((request = reader.readLine()) != null) {
System.out.println("Client sent the following request: " + request);
String response;
if (request.trim().equals("quit")) {
writer.println("ok bye");
return;
}
response = message.parseRequest(request);
if (message.isCompleted()) {
messagesQueue.put(message);
message = new Message();
}
writer.println(response);
}
} catch (SocketException e) {
System.out.println("ClientConnection: SocketException while handling socket: " + e.getMessage());
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
System.out.println("Client Connection was interrupted!");
e.printStackTrace();
} finally {
if (clientSocket != null && !clientSocket.isClosed()) {
try {
clientSocket.close();
} catch (IOException ignored) {}
}
}
}
}
I do have a parent thread which is responsible for starting and managing all the ClientConnection runnables:
@Override
public void run() {
clientConnectionExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
while (true) {
Socket clientSocket;
try {
// wait for a Client to connect
clientSocket = serverSocket.accept();
ClientConnection clientConnection = new ClientConnection(clientSocket, messagesQueue);
clientConnectionExecutor.execute(clientConnection);
} catch (IOException e) {
// when this exception occurs, it means that we want to shut down everything
clientConnectionExecutor.shutdownNow(); // force terminate all ClientConnections
return;
}
}
}
Now according to this Stackoverflow Question, I would have expected that as soon as shutdownNow() is called, an InterruptedException would be thrown within my ClientConnection.run() method, and there it should print "Client Connection was interrupted!". But this does not happen: the catch clause never seems to be reached, and the input reading loop just goes on.
I read in another Stackoverflow question that this might happen because some other line of code within the block consumes the InterruptedException, but there wasn't any particular information on which line could do that. So I am thankful for any hints.
Edit: It turns out that as soon as I manually exit the loop by typing "quit" on the client, the loop quits, and only then is "Client Connection was interrupted!" printed. So somehow the exception seems to be ignored as long as the loop is running, and only handled afterwards.
CodePudding user response:
From the Oracle docs for shutdownNow:
There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.
If you look into the ThreadPoolExecutor sources, you will find that shutdownNow interrupts threads with this code:
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
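To illustrate what that interrupt does and does not do, here is a minimal sketch (the class and method names are made up for this example). The task never blocks; it only polls its own interrupt flag, so it stops as soon as shutdownNow() interrupts the worker thread. A task that never looks at the flag and is stuck in a call that ignores interrupts would keep running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {

    // Returns true if the polling task terminated after shutdownNow().
    static boolean pollingTaskStops() throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch started = new CountDownLatch(1);

        pool.execute(() -> {
            started.countDown();
            // Cooperative cancellation: leave the loop once the flag is set.
            while (!Thread.currentThread().isInterrupted()) {
                Thread.onSpinWait();
            }
        });

        started.await();     // make sure the task is actually running
        pool.shutdownNow();  // sets the interrupt flag on the worker thread
        return pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Task stopped: " + pollingTaskStops());
    }
}
```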
Your ClientConnection doesn't check the interrupt flag (Thread.interrupted()). From the information in the post, I can't tell which method throws InterruptedException. Most likely some other method, for example readLine of the reader, blocks the thread: the reader and the writer use the socket's InputStream and OutputStream, and socket streams block the thread if data is not immediately available.
For example, I wrote this code to test it:
class Example {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try(ServerSocket serverSocket = new ServerSocket()) {
serverSocket.bind(new InetSocketAddress(8080));
Socket socket = serverSocket.accept();
int dataByte = socket.getInputStream().read();
System.out.println(dataByte);
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
thread.interrupt();
}
}
On OpenJDK 16.0.2, the interrupt has no visible effect: the thread stays blocked.
I see two possible solutions for your problem:
1. Check Thread.interrupted() inside the while loop, if you are sure that the Socket doesn't block your thread.
2. If you are not sure, use a SocketChannel in non-blocking mode instead of a Socket, so that you can check Thread.interrupted() manually.
For the second approach, I transformed my example into this:
class Example {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try(ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
serverSocket.configureBlocking(false);
serverSocket.bind(new InetSocketAddress(8080));
SocketChannel socket = null;
while (socket == null) {
socket = serverSocket.accept();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socket.read(byteBuffer);
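byteBuffer.flip(); // switch to reading; the limit is now the number of bytes received
byte[] bytes = new byte[byteBuffer.remaining()]; // size the array after flip() to avoid a BufferUnderflowException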
byteBuffer.get(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
System.out.println("Interrupted successfully");
}
});
thread.start();
thread.interrupt();
}
}
It works.
Good luck with Java :)
CodePudding user response:
I would have expected that as soon as shutdownNow(); is being called, an InterruptedException would be thrown within my ClientConnection.run()
Your messagesQueue should be a BlockingQueue, so messagesQueue.put(message) is the call that forces you to catch InterruptedException. The thread therefore only receives an InterruptedException in put: either when threadpool#shutdownNow is called while the thread is blocked there (the queue is full), or, with queues such as LinkedBlockingQueue and ArrayBlockingQueue, the next time it calls put after its interrupt flag has been set. While the thread is blocked in readLine, it will not receive this exception.
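This would also fit the behaviour described in the question's edit, where the message only shows up later. As a minimal sketch (class and method names are invented for this example, and LinkedBlockingQueue is an assumption, since the post does not say which queue is used): put checks the interrupt flag on entry, so a thread whose flag was set earlier gets the InterruptedException at its next put, even if the queue has free capacity.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PutInterruptDemo {

    // Returns true if put() throws although the queue is far from full.
    static boolean putThrowsWhenFlagAlreadySet() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);

        // Simulate shutdownNow() having interrupted this thread earlier,
        // e.g. while it was still blocked in readLine().
        Thread.currentThread().interrupt();
        try {
            queue.put("message");
            return false;
        } catch (InterruptedException e) {
            return true;
        } finally {
            Thread.interrupted(); // clear the flag so the caller is unaffected
        }
    }

    public static void main(String[] args) {
        System.out.println("put threw: " + putThrowsWhenFlagAlreadySet());
    }
}
```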
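You can change while ((request = reader.readLine()) != null) to while ((request = reader.readLine()) != null && !Thread.interrupted()). Note that this check only runs after readLine returns, so it does not help while the thread is still blocked waiting for input.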
Another solution is to keep track of all client sockets and close them when you need to shut down. This way, each client thread receives an IOException directly:
List<Socket> clientSockets = new ArrayList<>();
while (true) {
try {
Socket accept = serverSocket.accept();
clientSockets.add(accept);
executorService.submit(new ClientConnection(accept));
}catch (Exception e) {
for (Socket socket : clientSockets) {
try {
socket.close();
} catch (Exception exception) {
//
}
}
//executorService.shutdownNow();
return; // leave the accept loop once shutdown has started, otherwise it keeps spinning
}
}
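To see why closing the socket works where the interrupt does not, here is a small self-contained sketch (class and method names are made up; it uses a loopback connection on an ephemeral port). A reader thread blocks in read() because the client never sends anything, then another thread closes the socket. According to the Socket.close() documentation, a thread blocked in an I/O operation on that socket then gets a SocketException.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CloseUnblocksReadDemo {

    // Returns true if the blocked reader was released with a SocketException.
    static boolean readerSeesSocketException() throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {

            AtomicReference<IOException> seen = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);

            Thread reader = new Thread(() -> {
                try {
                    accepted.getInputStream().read(); // blocks: the client never writes
                } catch (IOException e) {
                    seen.set(e);
                } finally {
                    done.countDown();
                }
            });
            reader.start();

            Thread.sleep(200);  // give the reader time to block in read()
            accepted.close();   // closing from another thread releases the reader
            done.await(5, TimeUnit.SECONDS);
            return seen.get() instanceof SocketException;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Reader released: " + readerSeesSocketException());
    }
}
```

In the question's ClientConnection, this exception would land in the existing catch (SocketException e) branch, so the task ends without any extra interrupt handling.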