client
Public class TcpClient {
Public static void main (String [] args) throws IOException {
//open network channel
SocketChannel SocketChannel=SocketChannel. Open (new InetSocketAddress (88) "127.0.0.1,");
SocketChannel. Write (ByteBuffer. Wrap (" hello world ". GetBytes ()));
While (true);
}
}
Server
Public class ReactorServer {
//to start listening to service
Public static void start (Integer port) {
Try {
//1. The service side open a listening passage
ServerSocketChannel ServerSocketChannel=ServerSocketChannel. The open ();
//2. Binding port
ServerSocketChannel. Bind (new InetSocketAddress (port));
//3. Set to a non-blocking
ServerSocketChannel. ConfigureBlocking (false);
//open a selector that
The Selector Selector=the Selector. The open ();
//registered channel and time to listen to
ServerSocketChannel. Register (selector, SelectionKey OP_ACCEPT, new Acceptor (selector, serverSocketChannel));
While (selector. The select () & gt; 0 {
SetSelectionKeys=selector. SelectedKeys ();
IteratorThe iterator=selectionKeys. The iterator ();
//handle events
While (iterator. HasNext ()) {
SelectionKey key=iterator. Next ();
A Runnable handler=(Runnable) key. Attachment ();
Handler. The run ();
//remove the event
The iterator. Remove ();
}
}
} the catch (IOException e) {
e.printStackTrace();
}
}
Public static void main (String [] args) {
Start (88);
}
}
Accepor
Public class Acceptor implements Runnable {
Private Selector Selector;
Private ServerSocketChannel ServerSocketChannel;
Public Acceptor (Selector Selector, ServerSocketChannel ServerSocketChannel) {
Enclosing the selector=the selector;
Enclosing serverSocketChannel=serverSocketChannel;
}
@ Override
Public void the run () {
Try {
SocketChannel SocketChannel=serverSocketChannel. The accept ();
//set non-blocking
SocketChannel. ConfigureBlocking (false);
//register the selector, and pay the dispatchhandler processing
System. The out. Println (" a client connection in ");
SocketChannel. Register (selector, SelectionKey OP_READ, new DispatchHandler (socketChannel));
} the catch (IOException e) {
e.printStackTrace();
}
}
}
DispatchHandler
Public class DispatchHandler implements Runnable {
Private static Executor Executor=Executors. NewFixedThreadPool (Runtime. The getRuntime (). AvailableProcessors () & lt; <1);
Private SocketChannel SocketChannel;
Public DispatchHandler (SocketChannel SocketChannel) {
Enclosing a socketChannel=socketChannel;
}
@ Override
Public void the run () {
System. The out. Println (" open event thread processing, speaking, reading and writing ");
//by thread pool
Executor. Execute (new ReadHandler (socketChannel));
}
}
ReadHandler
Public class ReadHandler implements Runnable {
Private SocketChannel SocketChannel;
Public ReadHandler (SocketChannel SocketChannel) {
Enclosing a socketChannel=socketChannel;
}
@ Override
Public void the run () {
ByteBuffer ByteBuffer=ByteBuffer. The allocate (1024);
Charset Charset=Charset. Class.forname (" utf-8 ");
Try {
While (socketChannel. Read (byteBuffer) & gt; 0 {
ByteBuffer. Flip ();
System. The out. Println (charset. Decode (byteBuffer). The toString ());
}
ByteBuffer. The clear ();
//data back to write
SocketChannel. Write (ByteBuffer. Wrap (" received ". GetBytes ()));
} the catch (IOException e) {
e.printStackTrace();
}
}
}
below is the server running the results
Have client connection in
Open event thread processing, speaking, reading and writing
Open event thread processing, speaking, reading and writing
Open event thread processing, speaking, reading and writing
.
Hello world
Read the event will be repeated, why I am above all deleted
CodePudding user response:
DispatchHandler initialization to establish minimum number of objects in the object poolCodePudding user response: