4 Java I/O:AIO和NIO中的Selector( 二 )

之前说过,同步指的按顺序一次完成一个任务,直到前一个任务完成并有了结果以后 , 才能再执行后面的任务 。而异步指的是前一个任务结束后,并不等待任务结果,而是继续执行后一个任务 , 在所有任务都「执行」完后,通过任务的回调函数去获得结果 。所以异步使得应用性能有了极大的提高 。为了更加生动地说明什么是异步,可以来做个实验:

4 Java I/O:AIO和NIO中的Selector

文章插图
通过调用CompletableFuture.supplyAsync()方法可以很明显地观察到,处于位置2的「这一步先执行」会最先显示,然后才执行位置1的代码 。而这就是异步的具体实现 。
NIO为了支持异步 , 升级到了NIO2,也就是AIO 。而AIO引入了新的异步Channel的概念,并提供了异步FileChannel和异步SocketChannel的实现 。AIO的异步SocketChannel是真正的异步非阻塞I/O 。通过代码可以更好地说明:
/** * AIO客户端 * * @author xiangwang */public class AioClient {public void start() throws IOException, InterruptedException {AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();if (channel.isOpen()) {// socket接收缓冲区recbuf大小channel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024);// socket发送缓冲区recbuf大小channel.setOption(StandardSocketOptions.SO_SNDBUF, 128 * 1024);// 保持长连接状态channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);// 连接到服务端channel.connect(new InetSocketAddress(8080), null,new AioClientHandler(channel));// 阻塞主进程for(;;) {TimeUnit.SECONDS.sleep(1);}} else {throw new RuntimeException("Channel not opened!");}}public static void main(String[] args) throws IOException, InterruptedException {new AioClient().start();}}/** * AIO客户端CompletionHandler * * @author xiangwang */public class AioClientHandler implements CompletionHandler<Void, AioClient> {private final AsynchronousSocketChannel channel;private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder();private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in));public AioClientHandler(AsynchronousSocketChannel channel) {this.channel = channel;}@Overridepublic void failed(Throwable exc, AioClient attachment) {throw new RuntimeException("channel not opened!");}@Overridepublic void completed(Void result, AioClient attachment) {System.out.println("send message to server: ");try {// 将输入内容写到bufferString line = input.readLine();channel.write(ByteBuffer.wrap(line.getBytes()));// 在操作系统中的Java本地方法native已经把数据写到了buffer中// 这里只需要一个缓冲区能接收就行了ByteBuffer buffer = ByteBuffer.allocate(1024);while (channel.read(buffer).get() != -1) {buffer.flip();System.out.println("from server: " + decoder.decode(buffer).toString());if (buffer.hasRemaining()) {buffer.compact();} else {buffer.clear();}// 将输入内容写到bufferline = input.readLine();channel.write(ByteBuffer.wrap(line.getBytes()));}} catch (IOException | InterruptedException | ExecutionException e) {e.printStackTrace();}}}/** * AIO服务端 * * @author xiangwang */public class AioServer {public void start() throws InterruptedException, IOException {AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();if (channel.isOpen()) {// socket接受缓冲区recbuf大小channel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);// 端口重用,防止进程意外终止,未释放端口 , 重启时失败// 因为直接杀进程,没有显式关闭套接字来释放端口,会等待一段时间后才可以重新use这个关口// 解决办法就是用SO_REUSEADDRchannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);channel.bind(new InetSocketAddress(8080));} else {throw new RuntimeException("channel not opened!");}// 处理client连接channel.accept(null, new AioServerHandler(channel));System.out.println("server started");// 阻塞主进程for(;;) {TimeUnit.SECONDS.sleep(1);}}public static void main(String[] args) throws IOException, InterruptedException {AioServer server = new AioServer();server.start();}}/** * AIO服务端CompletionHandler * * @author xiangwang */public class AioServerHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {private final AsynchronousServerSocketChannel serverChannel;private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder();private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in));public AioServerHandler(AsynchronousServerSocketChannel serverChannel) {this.serverChannel = serverChannel;}@Overridepublic void failed(Throwable exc, Void attachment) {// 处理下一次的client连接serverChannel.accept(null, this);}@Overridepublic void completed(AsynchronousSocketChannel result, Void attachment) {// 处理下一次的client连接,类似链式调用serverChannel.accept(null, this);try {// 将输入内容写到bufferString line = input.readLine();result.write(ByteBuffer.wrap(line.getBytes()));// 在操作系统中的Java本地方法native已经把数据写到了buffer中// 这里只需要一个缓冲区能接收就行了ByteBuffer buffer = ByteBuffer.allocate(1024);while (result.read(buffer).get() != -1) {buffer.flip();System.out.println("from client: " + decoder.decode(buffer).toString());if (buffer.hasRemaining()) {buffer.compact();} else {buffer.clear();}// 将输入内容写到bufferline = input.readLine();result.write(ByteBuffer.wrap(line.getBytes()));}} catch (InterruptedException | ExecutionException | IOException e) {e.printStackTrace();}}}

推荐阅读