Home > Blockchain >  Java NIO asynchronous read and write operations
Java NIO asynchronous read and write operations

Time:12-13

I'm developing for a project a NIO server that takes as input a message from the client containing the times of running for the read and write operations. I have a problem because at the first execution of the client everything works fine, but if I run the client once again the server gets stuck in the writable part. Can you tell me what am I doing wrong? These are my files, thank you in advance.

MyAsyncProcessor.java

public class MyAsyncProcessor {

    enum States {
        Idle,
        Read,
        Write
    }

    ExecutorService pool;
    private Map<Integer, States> socketStates = new HashMap<>();

    public MyAsyncProcessor() {
    }

    public static void main(String[] args) throws IOException {
        new MyAsyncProcessor().process();
    }

    public void process() throws IOException {
        pool = Executors.newFixedThreadPool(2);
        InetAddress host = InetAddress.getByName("localhost");
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(host, 9876));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        SelectionKey key;
        while (true) {
            if (selector.select() > 0) {
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> i = selectedKeys.iterator();
                while (i.hasNext()) {
                    key = i.next();
                    i.remove();
                    MyTask task = new MyTask();
                    if (key.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        System.out.println("Channel hashCode: "   socketChannel.hashCode());
                        socketChannel.register(selector, SelectionKey.OP_READ   SelectionKey.OP_WRITE);
                        socketStates.put(socketChannel.hashCode(), States.Idle);
                        System.out.println("Connection accepted from: "   socketChannel.getLocalAddress());
                    }
                    if (key.isReadable()) {
                        System.out.println("Readable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        States socketState = socketStates.get(socketChannel.hashCode());
                        if (socketState == States.Idle) {
                            socketStates.put(socketChannel.hashCode(), States.Read);
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            try {
                                socketChannel.read(byteBuffer);
                                String result = new String(byteBuffer.array()).trim();
                                String[] words = result.split(" ");
                                int secondsToRead = Integer.parseInt(words[words.length - 2])*1000;
                                int secondsToWrite = Integer.parseInt(words[words.length - 1])*1000;
                                task.setTimeToRead(secondsToRead);
                                task.setTimeToWrite(secondsToWrite);
                                System.out.println(task.getTimeToRead()   " "   task.getTimeToWrite());
                                Runnable h = new MyAsyncReadThread(task);
                                pool.execute(h);
                                socketChannel.register(selector, SelectionKey.OP_WRITE);
                            } catch (Exception e) {
                                System.out.println("Closing Connection Read...");
                            }
                        }
                    }
                    if (key.isWritable()) {
                        System.out.println("Writable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        States socketState = socketStates.get(socketChannel.hashCode());
                        if (socketState == States.Read) {
                            socketStates.put(socketChannel.hashCode(), States.Write);
                            System.out.println(task.getTimeToRead()   " "   task.getTimeToWrite());
                            Runnable h = new MyAsyncWriteThread(task);
                            pool.execute(h);
                        }
                        key.cancel();
                    }
                }
            }
        }
    }
}

MyClient.java

public class MyClient  {

    public static void main(String [] args) {

        Random rand = new Random();
        int secondsToRead = rand.nextInt(10);
        int secondsToWrite = secondsToRead   1;
        String message = "Seconds for the task to be read and written: "   secondsToRead   " "   secondsToWrite;
        System.out.println(message);
        Socket socket;
        try {
            socket = new Socket("127.0.0.1", 9876);
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
            printWriter.println(message);
            System.out.println("Sending message");
        } catch (IOException e) {
            System.out.println("Error in Socket");
            System.exit(-1);
        }
    }
}




CodePudding user response:

sorry , I can't comment.

you can't use key.cancel();, I don't know your business, only I can advice just don't use Map like that.

JDK.NIO is very hard. here is your code(change a bit) , hoping work for you.

Don't write NIO code buy yourself. [https://netty.io/][Netty] is good.

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author [email protected]
 * @since 2022/12/13 09:46
 */
public class MyAsyncProcessor {

    enum States {
        Idle,
        Read,
        Write
    }

    ExecutorService pool;
    private Map<Integer, States> socketStates = new HashMap<>();

    public MyAsyncProcessor() {
    }

    public static class MyTask implements Runnable {

        @Override
        public void run() {
            System.out.println("execute task");
        }

        private int secondsToRead;
        private int secondsToWrite;

        public void setTimeToRead(int secondsToRead) {
            this.secondsToRead = secondsToRead;
        }

        public void setTimeToWrite(int secondsToWrite) {
            this.secondsToWrite = secondsToWrite;
        }
    }

    public static void main(String[] args) throws IOException {
        new MyAsyncProcessor().process();
    }

    public void process() throws IOException {
        pool = Executors.newFixedThreadPool(2);
        InetAddress host = InetAddress.getByName("localhost");
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(host, 9876));
        final SelectionKey register1 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        register1.attach(serverSocketChannel);
        SelectionKey key;
        while (true) {
            if (selector.select() > 0) {
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> i = selectedKeys.iterator();
                while (i.hasNext()) {
                    key = i.next();
                    i.remove();
                    MyTask task = new MyTask();
                    if (!key.isValid()) {
                        key.cancel();
                        continue;
                    }
                    if (key.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        System.out.println("Channel hashCode: "   socketChannel.hashCode());
                        final SelectionKey register = socketChannel.register(selector, SelectionKey.OP_READ);
                        register.attach(key.attachment());
                        System.out.println("Connection accepted from: "   socketChannel.getLocalAddress());
                    }
                    if (key.isReadable()) {
                        System.out.println("Readable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        socketStates.put(socketChannel.hashCode(), States.Read);


                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        try {
                            final int read = socketChannel.read(byteBuffer);
                            if (read > 0) {
                                System.out.println("receive message form client:"   new String(byteBuffer.array(), 0, read - 1));
                                task.setTimeToRead(10);
                                task.setTimeToWrite(10);
                                pool.execute(task);
                            }
                            socketChannel.register(selector, SelectionKey.OP_WRITE);
                        } catch (Exception e) {
                            socketChannel.close();
                        }
                    }
                    if (key.isValid() && key.isWritable()) {
                        System.out.println("Writable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        try {
                            socketChannel.write(ByteBuffer.wrap("hello world!".getBytes(StandardCharsets.UTF_8)));
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } catch (IOException e) {
                            socketChannel.close();
                        }
                    }
                }
            }
        }
    }
}
  • Related