Context
I am writing a Netty application (Netty 4) in which the processing of each message might take some time. As an example of what I mean I have created an EchoHandler
that responds with the original message, though sometimes after a short delay:
public class EchoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
if (message.equals("delay")) {
ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
}
}
}
Problem
Using this code, the order of the responses is not guaranteed to be the same as the order of the incoming requests. In fact, if the first message consists of the string "delay" and the second of some other string, the order of the responses will be reversed! I have written a test to illustrate this:
public class Tests {
@Test
public void test() throws ExecutionException, InterruptedException {
var channel = new EmbeddedChannel(new EchoHandler());
channel.writeInbound("delay", "second message");
// Let some time pass and process any scheduled tasks
Thread.sleep(500);
channel.runPendingTasks();
// The response to the last message comes first!
var firstMessage = (String) channel.readOutbound();
Assertions.assertEquals("second message", firstMessage);
// The response to the first message comes second!
var secondMessage = (String) channel.readOutbound();
Assertions.assertEquals("delay", secondMessage);
}
}
Question
I am looking for a built-in way in Netty to guarantee the order of the outgoing responses matches the order of the incoming messages. Using an imaginary extension of the library, I would rewrite the handler as follows:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
if (message.equals("delay")) {
ctx.executor().schedule(() -> {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}, 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}
}
Does Netty provide something like this out of the box? It sounds like a common use case and I'd much rather rely on battle-tested code than writing my own queuing implementation.
CodePudding user response:
Though not built-in, a seemingly idiomatic solution for this problem is a custom MessageToMessageCodec
. Based on the example which uses String
as message type, I wrote the following codec which handles queuing for you:
public class QueuingCodec extends MessageToMessageCodec<String, String> {
private final Queue<String> messageQueue = new ArrayDeque<>();
private boolean processingMessage = false;
@Override
protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception {
// Pass the message on to output
list.add(s);
// Allow processing more messages
processingMessage = false;
// Send the next message in the queue if available
if (!messageQueue.isEmpty()) {
processingMessage = true;
var message = messageQueue.poll();
// Pass the message on to input
ctx.executor().execute(() -> {
ctx.fireChannelRead(message);
});
}
}
@Override
protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception {
if (processingMessage) {
// Store message for later processing
messageQueue.add(s);
} else {
// Pass data on to input
list.add(s);
// Prevent processing of new messages
processingMessage = true;
}
}
}