Selector.select() not being unblocked even after I register a channel ready to read from another thr

Time:09-29

I am creating a server to handle many connections at the same time. I create two Selectors: one for the ServerSocketChannel, to accept connections, and the other for the accepted connections, to read data.

The first selector successfully gets past its blocking select() call and accepts the new connection. The goal is then to register the accepted SocketChannel with the other selector, which is currently blocked in its own select() call, so that I can read the data when it arrives.

Everything seems to be set up correctly, but even after sending data, I am not getting past the other selector's select() function.

Here is my server file:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

public class Server implements Runnable
{
    
    public Thread threadAccept;
    public Thread threadRead;
    
    protected ServerSocketChannel serverSocket;
    
    protected Selector selectorAccept;
    protected Selector selectorIO;

    protected Set<Connection> connections;
    
    private ByteBuffer buffer;
    
    public Server()
    {
        try
        {
            selectorAccept = Selector.open();
            selectorIO = Selector.open();
            
            serverSocket = ServerSocketChannel.open();
            serverSocket.configureBlocking(false);
            
            InetSocketAddress hostAddress = new InetSocketAddress("localhost",4444);
            serverSocket.bind(hostAddress);
            int ops = serverSocket.validOps();
            SelectionKey selectKey = serverSocket.register(selectorAccept, ops);
            
            
            threadAccept = new Thread(this);
            threadAccept.start();
            
            threadRead = new Thread(this);
            threadRead.start();
            
        }
        catch(Exception e)
        {
            System.out.println("Error " + e);
        }
    }
    
    public void run()
    {
        if(Thread.currentThread() == threadAccept)
        {
            acceptNewConnections();
        }
        else if(Thread.currentThread() == threadRead)
        {
            readData();
        }
    }
    
    private void acceptNewConnections()
    {
        int numberOfKeys = 0;
        
        while(true)
        {
            try
            {
                numberOfKeys = selectorAccept.select();
                Set<SelectionKey> keys = selectorAccept.selectedKeys();
                Iterator<SelectionKey> itr = keys.iterator();
                
                while(itr.hasNext())
                {
                    SelectionKey key = itr.next();
                    itr.remove(); // the selector never clears the selected-key set itself
                    if(key.isAcceptable())
                    {
                        SocketChannel client = serverSocket.accept();
                        client.configureBlocking(false);
                        client.register(selectorIO, SelectionKey.OP_READ);
                        System.out.println("New connection");
                    }
                }
            }
            catch(Exception e)
            {
                System.out.println(e);
            }
        }
    }
    
    public void readData()
    {
        int numberOfKeys = 0;
        ByteBuffer buffer = ByteBuffer.allocate(256);
        
        while(true)
        {
            try
            {
                System.out.println("About to block on IO selector");
                numberOfKeys = selectorIO.select();
                System.out.println("I NEVER GET HERE");
                
                Set<SelectionKey> keys = selectorIO.selectedKeys();
                Iterator<SelectionKey> itr = keys.iterator();
                
                while(itr.hasNext())
                {
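                    SelectionKey key = itr.next();
                    itr.remove(); // the selector never clears the selected-key set itself
                    if(key.isReadable())
                    {
                        SocketChannel channel = (SocketChannel)key.channel();
                        int n = channel.read(buffer);
                        if(n < 0)
                        {
                            channel.close(); // peer closed the connection
                            continue;
                        }
                        
                        // buffer.toString() only prints position/limit/capacity, so decode the bytes read
                        String s = new String(buffer.array(), 0, buffer.position());
                        buffer.clear();
                        System.out.println(s);
                    }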
                }
            }
            catch(Exception e)
            {
                System.out.println(e);
            }
        }
    }
    
    
}

And here is my main class to kick the server off and also create a client.

import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;

public class Main
{
    public static void main(String[] args)
    {
        Server s = new Server();
        
        
        
        Thread t = new Thread(new Runnable() {
            public void run()
            {
                try
                {
                    Socket s = new Socket("localhost", 4444);
                    byte[] data = "hello".getBytes();
                    s.getOutputStream().write(data);
                    s.getOutputStream().flush();
                } catch (UnknownHostException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        
        t.start();
    }
}

CodePudding user response:

My guess is that a wakeup would be sufficient:

client.register(selectorIO, SelectionKey.OP_READ);
selectorIO.wakeup();
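
A small sketch of what wakeup() does, in case it helps (the class WakeupDemo and its method name are made up for illustration). A wakeup() makes a blocked select() return, and if no select() is in progress yet, the next one returns immediately. One caveat that I believe applies to JDKs before 11: register() can itself block while another thread sits in select() on the same selector, in which case the wakeup() line after it is never reached. There, calling wakeup() first, or registering from the selecting thread, may be needed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original server.
final class WakeupDemo {

    /** Returns true if a select() on another thread came back after wakeup(). */
    static boolean selectReturnsAfterWakeup() {
        try (Selector selector = Selector.open()) {
            CountDownLatch returned = new CountDownLatch(1);

            Thread waiter = new Thread(() -> {
                try {
                    // No channels are registered, so only wakeup() can end this call.
                    selector.select();
                    returned.countDown();
                } catch (IOException e) {
                    // Latch stays closed; the caller then reports false.
                }
            });
            waiter.setDaemon(true);
            waiter.start();

            // Works whether the waiter is already blocked or has not reached select() yet.
            selector.wakeup();

            return returned.await(5, TimeUnit.SECONDS);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("select() returned: " + selectReturnsAfterWakeup());
    }
}
```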

Or run the registration as a task on the thread that handles client I/O (threadRead), so that the client is registered by the same thread that waits on selectorIO. That thread should then check a task queue (e.g. a ConcurrentLinkedQueue) as part of its loop, and the accepting thread should call selectorIO.wakeup() after placing the task in the queue.
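
The queue-based variant could look roughly like this. It is only a sketch under assumptions: the class PendingRegistrationDemo, its methods, and the ephemeral loopback port are invented for the example, and it returns after the first message instead of looping forever as the original readData() does.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the original server.
final class PendingRegistrationDemo {
    // Channels accepted elsewhere, waiting to be registered by the selecting thread.
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
    private final Selector selectorIO;

    PendingRegistrationDemo(Selector selectorIO) {
        this.selectorIO = selectorIO;
    }

    /** Called from the accept thread: queue the channel first, then wake the reader. */
    void enqueue(SocketChannel client) {
        pending.add(client);
        selectorIO.wakeup();
    }

    /** Runs on the read thread; returns the first chunk of data received. */
    String readOneMessage() throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(256);
        while (true) {
            selectorIO.select();

            // Register on this thread, so register() never competes with select().
            SocketChannel queued;
            while ((queued = pending.poll()) != null) {
                queued.configureBlocking(false);
                queued.register(selectorIO, SelectionKey.OP_READ);
            }

            Iterator<SelectionKey> itr = selectorIO.selectedKeys().iterator();
            while (itr.hasNext()) {
                SelectionKey key = itr.next();
                itr.remove();
                if (!key.isReadable()) {
                    continue;
                }
                int n = ((SocketChannel) key.channel()).read(buffer);
                if (n > 0) {
                    buffer.flip();
                    return StandardCharsets.UTF_8.decode(buffer).toString();
                }
                if (n < 0) {
                    key.channel().close(); // peer closed the connection
                }
            }
        }
    }

    /** End-to-end run: accept on one thread, read on the calling thread. */
    static String demo() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(loopback, 0)); // port 0 = any free port
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            PendingRegistrationDemo reader = new PendingRegistrationDemo(selector);

            Thread acceptor = new Thread(() -> {
                try {
                    reader.enqueue(server.accept()); // blocking accept for brevity
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            try (Socket client = new Socket(loopback, port)) {
                OutputStream out = client.getOutputStream();
                out.write("hello".getBytes(StandardCharsets.UTF_8));
                out.flush();
                return reader.readOneMessage();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the element is added to the queue before wakeup() is called, the read thread always finds it when select() returns, even if the wakeup arrives before that thread has started selecting.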
