I'm experimenting with a simple HTTP/1.1 server in Kotlin to try to reproduce a more complex problem where content seems to get cut off when closing the AsynchronousSocketChannel:
net::ERR_CONTENT_LENGTH_MISMATCH 200 (OK)
First a helper function to make debugging easier:
@Suppress("NOTHING_TO_INLINE")
inline fun ByteArray.debug(): String = this.map {
when (it) {
'\t'.code.toByte() -> "\\t"
'\r'.code.toByte() -> "\\r"
'\n'.code.toByte() -> "\\n\n"
else -> "${it.toInt().toChar()}"
}
}.toTypedArray().joinToString(separator = "") { it }
And here's the whole simplified web server, with all its imports, which you should be able to copy and paste into a .kt file and run:
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import java.util.concurrent.TimeUnit
fun main() {
val len = 1_000_000
val server =
AsynchronousServerSocketChannel.open().bind(InetSocketAddress(5555))
while (true) {
val channel = server.accept().get()
val body = "#".repeat(len)
val headers = mutableListOf<String>()
headers.add("HTTP/1.1 200 OK")
headers.add("Server: Test Server")
headers.add("Content-Type: text/plain")
headers.add("Connection: close")
headers.add("Content-Length: ${body.length}")
val header = headers.joinToString(separator = "\r\n") + "\r\n\r\n"
println("==== header (size=${header.toByteArray().size})")
println(header.toByteArray().debug())
println("==== body (size=${body.toByteArray().size})")
val data = "$header$body".toByteArray()
println("==== data (size=${data.size})")
println(data.debug())
channel.write(
ByteBuffer.wrap(data),
30,
TimeUnit.SECONDS,
channel,
object : CompletionHandler<Int?, AsynchronousSocketChannel> {
override fun completed(result: Int?, channel: AsynchronousSocketChannel) {
println(result)
channel.close()
}
override fun failed(exc: Throwable?, channel: AsynchronousSocketChannel) {
channel.close()
}
})
}
}
Running it and opening the browser at localhost:5555, I'm greeted by a Connection Reset.
Looking at the browser Network console, I can see that the response headers look correct:
Looking at the output, I can see that it too is correct and matches what we're seeing in the browser Network console. The 1000110 at the end, which is printed inside the CompletionHandler, matches the total size of the data, yet nothing renders and the browser complains about a connection reset.
==== header (size=110)
HTTP/1.1 200 OK\r\n
Server: Test Server\r\n
Content-Type: text/plain\r\n
Connection: close\r\n
Content-Length: 1000000\r\n
\r\n
==== body (size=1000000)
==== data (size=1000110)
HTTP/1.1 200 OK\r\n
Server: Test Server\r\n
Content-Type: text/plain\r\n
Connection: close\r\n
Content-Length: 1000000\r\n
\r\n
#####################################################################################.......
1000110
If I add a Thread.sleep before the channel.close(), it works correctly, but then obviously the browser will wait for a full second before the connection is available again, so it's definitely not a solution.
channel.write(
ByteBuffer.wrap(data),
30,
TimeUnit.SECONDS,
channel,
object : CompletionHandler<Int?, AsynchronousSocketChannel> {
override fun completed(result: Int?, channel: AsynchronousSocketChannel) {
println(result)
Thread.sleep(1000)
channel.close()
}
override fun failed(exc: Throwable?, channel: AsynchronousSocketChannel) {
channel.close()
}
})
One of the responses suggested calling channel.shutdownOutput() followed by a channel.read() before calling close():
override fun completed(result: Int?, channel: AsynchronousSocketChannel) {
println(result)
channel.shutdownOutput()
channel.read(ByteBuffer.allocate(1)).get()
channel.close()
}
If I'm using allocate(1), it doesn't fix the issue, but allocate(very-big-number) does work, which is actually no different from just calling Thread.sleep here.
If I deploy it to AWS behind a load balancer with a short Thread.sleep, I run into a
net::ERR_CONTENT_LENGTH_MISMATCH 200 (OK)
which means some data is getting written, but the stream gets cut off before the load balancer can read all of it. That is effectively the same as the net::ERR_CONNECTION_RESET 200 (OK) error.
What is the correct way to close the AsynchronousSocketChannel without hitting connection reset errors or content length mismatch errors in the browser?
EDIT: here's a more full-fledged demo where I can still reproduce the error. In this example I first read the request before writing a response. To make it more readable, I've wrapped the completion handlers in suspendCoroutine so that I can just call readSuspending and writeSuspending.
import kotlinx.coroutines.*
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
object Test {
val len = 1_000_000_000
suspend fun AsynchronousServerSocketChannel.acceptSuspending() =
suspendCoroutine<AsynchronousSocketChannel> { continuation ->
this.accept(
null, object : CompletionHandler<AsynchronousSocketChannel, Nothing?> {
override fun completed(result: AsynchronousSocketChannel, attachment: Nothing?) {
continuation.resume(result)
}
override fun failed(exc: Throwable, attachment: Nothing?) {
continuation.resumeWithException(exc)
}
})
}
suspend fun AsynchronousSocketChannel.writeSuspending(
buffer: ByteBuffer,
timeout: Duration = 60.seconds,
closeWhenDone: Boolean = false,
) = suspendCoroutine<Int> { continuation ->
this.write(buffer, timeout.inWholeSeconds, TimeUnit.SECONDS, null, object : CompletionHandler<Int, Nothing?> {
override fun completed(size: Int, attachment: Nothing?) {
continuation.resume(size)
}
override fun failed(exc: Throwable, attachment: Nothing?) {
continuation.resumeWithException(exc)
}
})
}
suspend fun AsynchronousSocketChannel.readSuspending(
buffer: ByteBuffer,
timeout: Duration = 5.seconds,
) = suspendCoroutine<Int> { continuation ->
this.read(buffer, timeout.inWholeSeconds, TimeUnit.SECONDS, null, object : CompletionHandler<Int, Nothing?> {
override fun completed(size: Int, attachment: Nothing?) {
continuation.resume(size)
}
override fun failed(exc: Throwable, attachment: Nothing?) {
continuation.resumeWithException(exc)
}
}
)
}
@JvmStatic
fun main(args: Array<String>) = runBlocking(Dispatchers.Default) {
val server = withContext(Dispatchers.IO) {
AsynchronousServerSocketChannel.open().bind(InetSocketAddress(5555))
}
while (true) {
val channel = server.acceptSuspending()
supervisorScope {
launch {
val buffer = ByteBuffer.allocate(1000)
// reading
do {
val size = channel.readSuspending(buffer.rewind(), 30.seconds)
// print only the bytes that were actually read (size is -1 at end of stream)
println(String(buffer.array().sliceArray(0 until size)))
} while (!buffer.hasRemaining())
// build response
val body = "#".repeat(len)
val headers = mutableListOf<String>()
headers.add("HTTP/1.1 200 OK")
headers.add("Server: Test Server")
headers.add("Content-Type: text/plain")
headers.add("Content-Length: ${body.length}")
val header = headers.joinToString(separator = "\r\n") + "\r\n\r\n"
val data = "$header$body".toByteArray()
// writing
channel.writeSuspending(ByteBuffer.wrap(data), 30.seconds)
withContext(Dispatchers.IO) {
channel.close()
}
}
}
}
}
}
Answer:
Not a socket programmer, just some thoughts...
The code below is actually synchronous (in a real server all the logic would need to be implemented inside a CompletionHandler), however I believe it demonstrates the issue.
public static void main(String[] args) throws Exception {
int len = 50_000_000;
AsynchronousServerSocketChannel server =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5555));
while (true) {
try (AsynchronousSocketChannel channel = server.accept().get()) {
// read the client's request headers first; without this, the mitigation
// of the race condition at the bottom of the loop fails
// on the other hand, I failed to reproduce the
// problem when the bottom read is commented out
System.out.println("Header: " + channel.read(ByteBuffer.allocate(4096)).get());
StringBuilder body = new StringBuilder("HTTP/1.1 200 OK\r\n"
+ "server: Cloud Auctioneers\r\n"
+ "content-type: text/plain\r\n"
// tell client to close socket
+ "connection: close\r\n"
+ "content-length: " + len + "\r\n\r\n");
for (int i = 0; i < len; i++) {
body.append('#');
}
ByteBuffer buff = ByteBuffer.wrap(body.toString().getBytes());
// according to javadoc write method does not guarantee
// that it will send the all data to client, at least
// without this loop client receives just 256K on my laptop
while (buff.hasRemaining()) {
Integer written = channel.write(buff).get();
System.out.println("Written: " + written);
// not sure here about null
if (written == null) {
break;
}
}
// here we are trying to mitigate race condition between try-with-resources
// and delivering pending data. according to man 2 send it sends data
// asynchronously and there is no way to understand whether that was
// successful or not - trying to read something from socket
System.out.println("Footer: " + channel.read(ByteBuffer.allocate(4096)).get());
}
}
}
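To make the closing sequence easier to see in isolation, here is a minimal sketch of the same idea: write until the buffer is empty, call shutdownOutput(), then keep reading until the peer reports end of stream, and only then close. The names GracefulClose, sendAndClose and demo are made up for this illustration, the calls still block on Future.get() like the code above, and the demo client is a plain loopback socket rather than a browser. A real server would probably also want a timeout on the draining read, since a client is not obliged to close promptly.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class GracefulClose {

    // Hypothetical helper: write everything, half-close, drain the peer, then close.
    static int sendAndClose(AsynchronousSocketChannel channel, ByteBuffer data) throws Exception {
        int total = 0;
        while (data.hasRemaining()) {
            total += channel.write(data).get();   // a single write may be partial
        }
        channel.shutdownOutput();                 // end of response, input side stays open
        ByteBuffer sink = ByteBuffer.allocate(4096);
        while (channel.read(sink).get() != -1) {  // discard input until the peer closes
            sink.clear();
        }
        channel.close();
        return total;
    }

    // Loopback demo: returns how many bytes the client received before end of stream.
    static int demo(int size) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0))) {
            Future<AsynchronousSocketChannel> accepted = server.accept();
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
            client.connect(server.getLocalAddress()).get();
            AsynchronousSocketChannel peer = accepted.get();

            // Client side: send a small request, then count bytes until end of stream.
            CompletableFuture<Integer> received = CompletableFuture.supplyAsync(() -> {
                try {
                    byte[] request = "GET / HTTP/1.1\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
                    client.write(ByteBuffer.wrap(request)).get();
                    ByteBuffer in = ByteBuffer.allocate(64 * 1024);
                    int count = 0;
                    int n;
                    while ((n = client.read(in).get()) != -1) {
                        count += n;
                        in.clear();
                    }
                    client.close();
                    return count;
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });

            sendAndClose(peer, ByteBuffer.wrap(new byte[size]));
            return received.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling GracefulClose.demo(1_000_000) should report that the client saw the full payload before the stream ended, because the server only closes after the client has closed its side.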
UPD. I have done some research on the topic; below are some thoughts:
- when writing to a socket, a code snippet like the one below:
public <A> void completed(Integer result, A attachment) {
if (result < 0) {
// socket closed
channel.close();
return;
}
if (buffer.hasRemaining()) {
channel.write(buffer, null, this);
}
}
seems to be mandatory for the following reasons:
  - it seems that java.nio does not route all errors to CompletionHandler#failed; I have no idea why, that is just what I observe
  - it does not send the full buffer to the client. That could be reasonable, because it is always a question what to do: compact the buffer and fill it with pending data, or send the remainder to the client. However, I would prefer to have a flag defining the behaviour in AsynchronousSocketChannel#write
- As for the question of who is responsible for closing the connection: the answer depends on the version of the HTTP protocol (i.e. on the client):
  - in the case of HTTP/1.1 the server may ask the client to close the connection by sending the HTTP header Connection: close
  - in the case of HTTP/1.0 the server must close the connection on its own
  From an infrastructure perspective the difference between those two cases is the following: the side which initiates the shutdown ends up with the socket in the TIME_WAIT state, which is undesirable on the server, so we also need to set SO_REUSEADDR to true (SO_LINGER is not supported in AsynchronousSocketChannel).
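For the first point above (re-issuing the write from inside the handler until the buffer is empty), a self-contained sketch could look like the following. WriteAll, writeFully and demo are names invented for this example, and wrapping the result in a CompletableFuture is just one convenient way to observe completion, not something the original snippet does. The negative-result check is kept only as a defensive measure, mirroring the snippet above.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class WriteAll {

    // Hypothetical helper: completes with the byte count once the whole buffer is written.
    static CompletableFuture<Integer> writeFully(AsynchronousSocketChannel channel, ByteBuffer buffer) {
        CompletableFuture<Integer> done = new CompletableFuture<>();
        int size = buffer.remaining();
        channel.write(buffer, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer written, Void attachment) {
                if (written < 0) {                       // defensive: treat as a closed channel
                    done.completeExceptionally(new ClosedChannelException());
                } else if (buffer.hasRemaining()) {
                    channel.write(buffer, null, this);   // partial write: send the rest
                } else {
                    done.complete(size);
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                done.completeExceptionally(exc);
            }
        });
        return done;
    }

    // Loopback demo: returns the number of bytes the client read, or -1 on a size mismatch.
    static int demo(int size) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                     .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            Future<AsynchronousSocketChannel> accepted = server.accept();
            client.connect(server.getLocalAddress()).get();
            try (AsynchronousSocketChannel peer = accepted.get()) {
                CompletableFuture<Integer> sent = writeFully(peer, ByteBuffer.wrap(new byte[size]));
                ByteBuffer in = ByteBuffer.allocate(64 * 1024);
                int received = 0;
                while (received < size) {                // the peer stays open until we are done
                    received += client.read(in).get();
                    in.clear();
                }
                return sent.get() == size ? received : -1;
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The point of the design is that completed() never assumes a single write call sent everything; it simply re-registers itself while buffer.hasRemaining() is true.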
So the implementation ultimately depends on the version of the HTTP protocol, and here I would prefer to use an existing library like netty rather than solving those http and java.nio puzzles. However, the general idea is:
- read HTTP headers till '\r\n\r\n' or '\n\n', determine client version
- keep track of the amount of data written to client and reiterate
- close channel if IO returned -1 or exception thrown
- if client is HTTP/1.1 send Connection: close and wait for -1
- if client is HTTP/1.0 close channel