Connection Reset error when closing AsynchronousSocketChannel

Time:07-18

I'm experimenting with a simple HTTP/1.1 server in Kotlin to reproduce a more complex problem in which content seems to get cut off when the AsynchronousSocketChannel is closed:

net::ERR_CONTENT_LENGTH_MISMATCH 200 (OK)

First a helper function to make debugging easier:

@Suppress("NOTHING_TO_INLINE")
inline fun ByteArray.debug(): String = this.map {
    when (it) {
        '\t'.code.toByte() -> "\\t"
        '\r'.code.toByte() -> "\\r"
        '\n'.code.toByte() -> "\\n\n"
        else -> "${it.toInt().toChar()}"
    }
}.toTypedArray().joinToString(separator = "") { it }

and here's the whole simplified web server with all its imports which you should be able to copy and paste into a .kt file to run:

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import java.util.concurrent.TimeUnit

fun main() {

    val len = 1_000_000

    val server =
        AsynchronousServerSocketChannel.open().bind(InetSocketAddress(5555))

    while (true) {
        val channel = server.accept().get()

        val body = "#".repeat(len)
        val headers = mutableListOf<String>()
        headers.add("HTTP/1.1 200 OK")
        headers.add("Server: Test Server")
        headers.add("Content-Type: text/plain")
        headers.add("Connection: close")
        headers.add("Content-Length: ${body.length}")
        val header = headers.joinToString(separator = "\r\n") + "\r\n\r\n"

        println("==== header (size=${header.toByteArray().size})")
        println(header.toByteArray().debug())
        println("==== body (size=${body.toByteArray().size})")
        val data = "$header$body".toByteArray()
        println("==== data (size=${data.size})")

        println(data.debug())
        channel.write(
            ByteBuffer.wrap(data),
            30,
            TimeUnit.SECONDS,
            channel,
            object : CompletionHandler<Int?, AsynchronousSocketChannel> {
                override fun completed(result: Int?, channel: AsynchronousSocketChannel) {
                    println(result)
                    channel.close()
                }

                override fun failed(exc: Throwable?, channel: AsynchronousSocketChannel) {
                    channel.close()
                }
            })

    }

}

Running it and opening the browser at localhost:5555, I'm greeted by a Connection Reset error.

Looking at the browser's Network console, I can see that the response headers look correct.

The program output is correct too and matches what the browser's Network console shows. The 1000110 at the end, which is printed inside the CompletionHandler, matches the total size of the data. Yet nothing renders, and the browser complains about a connection reset.

==== header (size=110)
HTTP/1.1 200 OK\r\n
Server: Test Server\r\n
Content-Type: text/plain\r\n
Connection: close\r\n
Content-Length: 1000000\r\n
\r\n

==== body (size=1000000)
==== data (size=1000110)
HTTP/1.1 200 OK\r\n
Server: Test Server\r\n
Content-Type: text/plain\r\n
Connection: close\r\n
Content-Length: 1000000\r\n
\r\n
#####################################################################################.......
1000110

If I add a Thread.sleep before the channel.close(), it works correctly, but then obviously the browser will wait for a full second before the connection is available again, so it's definitely not a solution.

channel.write(
    ByteBuffer.wrap(data),
    30,
    TimeUnit.SECONDS,
    channel,
    object : CompletionHandler<Int?, AsynchronousSocketChannel> {
        override fun completed(result: Int?, channel: AsynchronousSocketChannel) {
            println(result)
            Thread.sleep(1000)
            channel.close()
        }

        override fun failed(exc: Throwable?, channel: AsynchronousSocketChannel) {
            channel.close()
        }
    })

One of the responses suggested calling channel.shutdownOutput() followed by a channel.read() before calling close():

override fun completed(result: Int?, channel: AsynchronousSocketChannel) {
    println(result)
    channel.shutdownOutput()
    channel.read(ByteBuffer.allocate(1)).get()
    channel.close()
}

With allocate(1) this doesn't fix the issue, but allocate(very-big-number) does work, which is actually no different from just calling Thread.sleep here.

If I deploy it to AWS behind a load balancer with a short Thread.sleep, I run into net::ERR_CONTENT_LENGTH_MISMATCH 200 (OK). That means some data is being written, but the stream gets cut off before the load balancer can read all of it, which is effectively the same as the net::ERR_CONNECTION_RESET 200 (OK) error.

What is the correct way to close the AsynchronousSocketChannel without hitting connection reset errors or content length mismatch errors in the browser?

EDIT: here's a more full-fledged demo in which I can still reproduce the error. In this example I first read the request before writing a response. To make it more readable, I've wrapped the completion handlers in suspendCoroutine so that I can just call readSuspending and writeSuspending.

import kotlinx.coroutines.*
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

object Test {

    val len = 1_000_000_000

    suspend fun AsynchronousServerSocketChannel.acceptSuspending() =
        suspendCoroutine<AsynchronousSocketChannel> { continuation ->
            this.accept(
                null, object : CompletionHandler<AsynchronousSocketChannel, Nothing?> {
                    override fun completed(result: AsynchronousSocketChannel, attachment: Nothing?) {
                        continuation.resume(result)
                    }

                    override fun failed(exc: Throwable, attachment: Nothing?) {
                        continuation.resumeWithException(exc)
                    }
                })
        }

    suspend fun AsynchronousSocketChannel.writeSuspending(
        buffer: ByteBuffer,
        timeout: Duration = 60.seconds,
        closeWhenDone: Boolean = false,
    ) = suspendCoroutine<Int> { continuation ->
        this.write(buffer, timeout.inWholeSeconds, TimeUnit.SECONDS, null, object : CompletionHandler<Int, Nothing?> {
            override fun completed(size: Int, attachment: Nothing?) {
                continuation.resume(size)
            }

            override fun failed(exc: Throwable, attachment: Nothing?) {
                continuation.resumeWithException(exc)
            }
        })
    }

    suspend fun AsynchronousSocketChannel.readSuspending(
        buffer: ByteBuffer,
        timeout: Duration = 5.seconds,
    ) = suspendCoroutine<Int> { continuation ->
        this.read(buffer, timeout.inWholeSeconds, TimeUnit.SECONDS, null, object : CompletionHandler<Int, Nothing?> {
            override fun completed(size: Int, attachment: Nothing?) {
                continuation.resume(size)
            }

            override fun failed(exc: Throwable, attachment: Nothing?) {
                continuation.resumeWithException(exc)
            }
        }
        )
    }

    @JvmStatic
    fun main(args: Array<String>) = runBlocking(Dispatchers.Default) {

        val server = withContext(Dispatchers.IO) {
            AsynchronousServerSocketChannel.open().bind(InetSocketAddress(5555))
        }

        while (true) {
            val channel = server.acceptSuspending()
            supervisorScope {
                launch {
                    val buffer = ByteBuffer.allocate(1000)

                    // reading
                    do {
                        val size = channel.readSuspending(buffer.rewind(), 30.seconds)
                        // print only the bytes actually read (an empty range if size is -1)
                        println(String(buffer.array().sliceArray(0 until size)))
                    } while (!buffer.hasRemaining())

                    // build response
                    val body = "#".repeat(len)
                    val headers = mutableListOf<String>()
                    headers.add("HTTP/1.1 200 OK")
                    headers.add("Server: Test Server")
                    headers.add("Content-Type: text/plain")
                    headers.add("Content-Length: ${body.length}")
                    val header = headers.joinToString(separator = "\r\n") + "\r\n\r\n"
                    val data = "$header$body".toByteArray()

                    // writing
                    channel.writeSuspending(ByteBuffer.wrap(data), 30.seconds)
                    withContext(Dispatchers.IO) {
                        channel.close()
                    }
                }
            }
        }
    }
}
 

User response:

Not a socket programmer, just some thoughts...

The code below is actually synchronous, and in a real server all of this logic would need to be implemented inside a CompletionHandler. However, I believe it demonstrates the issue.

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;

public class Main {

    public static void main(String[] args) throws Exception {
        int len = 50_000_000;

        AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5555));

        while (true) {
            try (AsynchronousSocketChannel channel = server.accept().get()) {
                // read the client's headers; without this read, the mitigation
                // of the race condition at the bottom of the loop fails.
                // On the other hand, I failed to reproduce the
                // problem when the bottom read is commented out
                System.out.println("Header: " + channel.read(ByteBuffer.allocate(4096)).get());
                StringBuilder body = new StringBuilder("HTTP/1.1 200 OK\r\n" +
                        "server: Cloud Auctioneers\r\n" +
                        "content-type: text/plain\r\n" +
                        // tell the client to close the socket
                        "connection: close\r\n" +
                        "content-length: " + len + "\r\n\r\n");
                for (int i = 0; i < len; i++) {
                    body.append('#');
                }
                ByteBuffer buff = ByteBuffer.wrap(body.toString().getBytes());
                // according to the javadoc, write does not guarantee
                // that it will send all of the data to the client; at least
                // without this loop the client receives just 256K on my laptop
                while (buff.hasRemaining()) {
                    Integer written = channel.write(buff).get();
                    System.out.println("Written: " + written);
                    // not sure here about null
                    if (written == null) {
                        break;
                    }
                }
                // here we are trying to mitigate the race between try-with-resources
                // closing the channel and the delivery of pending data. According to
                // man 2 send, data is sent asynchronously and there is no way to tell
                // whether that was successful or not, so we try to read something
                // from the socket
                System.out.println("Footer: " + channel.read(ByteBuffer.allocate(4096)).get());
            }
        }
    }
}
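
The "Footer" read above can be taken one step further. Below is a minimal sketch, not a definitive implementation, of a half-close followed by a drain: write everything, call shutdownOutput(), then keep reading until the peer closes, and only then call close(). The class name GracefulCloseSketch and the helpers sendAndClose and demo are made up for illustration. The underlying assumption, which this thread does not confirm, is that the reset comes from closing a socket that still holds unread request bytes, which many TCP stacks answer with RST.

```java
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

final class GracefulCloseSketch {

    // Hypothetical helper: send all bytes, half-close, then read until the peer closes.
    static void sendAndClose(AsynchronousSocketChannel channel, byte[] data) throws Exception {
        ByteBuffer out = ByteBuffer.wrap(data);
        while (out.hasRemaining()) {
            channel.write(out).get();           // a single write may be partial
        }
        channel.shutdownOutput();               // the peer now sees end of stream
        ByteBuffer sink = ByteBuffer.allocate(4096);
        // Assumption: the peer closes eventually. A real server would use the
        // read overload that takes a timeout instead of blocking forever.
        while (channel.read(sink).get() >= 0) {
            sink.clear();                       // discard whatever the peer sent
        }
        channel.close();                        // no unread bytes are left behind
    }

    // Loopback demo: returns how many bytes a plain blocking client received.
    static int demo(int len) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress("127.0.0.1", 0))) {
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            CompletableFuture<Void> serverSide = CompletableFuture.runAsync(() -> {
                try (AsynchronousSocketChannel ch = server.accept().get()) {
                    sendAndClose(ch, new byte[len]);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            int received = 0;
            try (Socket client = new Socket("127.0.0.1", port)) {
                // The request is sent but never read by the server before it writes.
                client.getOutputStream()
                        .write("GET / HTTP/1.1\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
                InputStream in = client.getInputStream();
                byte[] chunk = new byte[8192];
                for (int n; (n = in.read(chunk)) != -1; ) {
                    received += n;
                }
            }
            serverSide.get();
            return received;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Calling GracefulCloseSketch.demo(1_000_000) runs one request against a loopback server and returns the number of bytes the client saw. Unlike a fixed Thread.sleep, the server waits exactly as long as the client needs.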

UPD. I have done some research on the topic; below are some thoughts:

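  1. When writing to a socket, a code snippet like the one below:
// inside a CompletionHandler<Integer, Void> that has access to channel and buffer
public void completed(Integer result, Void attachment) {
    if (result < 0) {
        // socket closed
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing more to do, the channel is already unusable
        }
        return;
    }
    if (buffer.hasRemaining()) {
        // partial write: issue another write for the remaining bytes
        channel.write(buffer, null, this);
    }
}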

seems to be mandatory, for the following reasons:

  • it seems that java.nio does not route all errors to CompletionHandler#failed; I have no idea why, but that is what I observe
  • it does not send the full buffer to the client. That could be reasonable, because it is always a question what to do: compact the buffer and fill it with pending data, or send the remainder to the client. However, I would prefer to have a flag defining this behaviour in AsynchronousSocketChannel#write
  2. As for who is responsible for closing the connection, the answer depends on the version of the HTTP protocol (i.e. on the client):
  • in the case of HTTP/1.1, the server may ask the client to close the connection by sending the HTTP header Connection: close

  • in the case of HTTP/1.0, the server must close the connection itself

    from an infrastructure perspective the difference between those two cases is the following: the side which initiates the shutdown ends up with its socket in the TIME_WAIT state, which is undesirable on a server, so we also need to set SO_REUSEADDR to true (SO_LINGER is not supported in AsynchronousSocketChannel)
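
The two points above can be sketched together. This is only an illustration under stated assumptions: the class WriteFullySketch and its methods writeFully and demo are hypothetical names, and the demo runs against a loopback client instead of a browser. writeFully re-issues the write from inside the CompletionHandler until the buffer is empty, and the server channel sets SO_REUSEADDR before binding.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;

final class WriteFullySketch {

    // Hypothetical helper: the future completes once every byte of buffer was written.
    static CompletableFuture<Void> writeFully(AsynchronousSocketChannel channel, ByteBuffer buffer) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        channel.write(buffer, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer written, Void attachment) {
                if (written < 0) {
                    done.completeExceptionally(new IOException("channel closed"));
                } else if (buffer.hasRemaining()) {
                    channel.write(buffer, null, this);   // partial write: send the rest
                } else {
                    done.complete(null);
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                done.completeExceptionally(exc);
            }
        });
        return done;
    }

    // Loopback demo: returns how many bytes a plain blocking client received.
    static int demo(int len) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            // Allow rebinding while old connections linger in TIME_WAIT.
            server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            CompletableFuture<Void> serverSide = CompletableFuture.runAsync(() -> {
                try (AsynchronousSocketChannel ch = server.accept().get()) {
                    writeFully(ch, ByteBuffer.wrap(new byte[len])).get();
                    ch.shutdownOutput();
                    // This client sends nothing, so the read returns -1 when it closes.
                    ch.read(ByteBuffer.allocate(1)).get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            int received = 0;
            try (Socket client = new Socket("127.0.0.1", port)) {
                InputStream in = client.getInputStream();
                byte[] chunk = new byte[8192];
                for (int n; (n = in.read(chunk)) != -1; ) {
                    received += n;
                }
            }
            serverSide.get();
            return received;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Returning a CompletableFuture is just one way to surface completion; in the Kotlin code from the question the same handler could resume a continuation instead.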

So the implementation ultimately depends on the version of the HTTP protocol, and here I would prefer to use an existing library like netty rather than solving these HTTP and java.nio puzzles myself. However, the general idea is:

  • read the HTTP headers until '\r\n\r\n' or '\n\n', then determine the client's HTTP version
  • keep track of the amount of data written to the client and re-issue the write until everything is sent
  • close the channel if an I/O call returns -1 or throws an exception
  • if the client is HTTP/1.1, send Connection: close and wait for -1
  • if the client is HTTP/1.0, close the channel
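
The first step of that list can be sketched with two small pure functions. This is a rough illustration, not a full HTTP parser: RequestHeadSketch, headerEnd, httpVersion and serverClosesFirst are hypothetical names, and the code assumes the request line has the usual "METHOD target HTTP/x.y" shape.

```java
import java.nio.charset.StandardCharsets;

final class RequestHeadSketch {

    // Index just past the blank line that ends the header block,
    // or -1 if the terminator has not arrived yet (keep reading in that case).
    static int headerEnd(byte[] data, int length) {
        for (int i = 0; i + 1 < length; i++) {
            if (data[i] == '\n' && data[i + 1] == '\n') {
                return i + 2;
            }
            if (i + 3 < length
                    && data[i] == '\r' && data[i + 1] == '\n'
                    && data[i + 2] == '\r' && data[i + 3] == '\n') {
                return i + 4;
            }
        }
        return -1;
    }

    // Protocol token of the request line, e.g. "HTTP/1.1", or null if malformed.
    static String httpVersion(byte[] data, int length) {
        String head = new String(data, 0, length, StandardCharsets.ISO_8859_1);
        int eol = head.indexOf('\n');
        String requestLine = (eol < 0 ? head : head.substring(0, eol)).trim();
        int lastSpace = requestLine.lastIndexOf(' ');
        if (lastSpace < 0) {
            return null;
        }
        String version = requestLine.substring(lastSpace + 1);
        return version.startsWith("HTTP/") ? version : null;
    }

    // Following the list above: for an HTTP/1.0 client the server closes the
    // channel itself; otherwise it sends Connection: close and waits for -1.
    static boolean serverClosesFirst(String version) {
        return "HTTP/1.0".equals(version);
    }
}
```

A read loop would call headerEnd after every read and stop once it returns a non-negative index; httpVersion then decides which of the last two bullets applies.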