io编程

本文将介绍从传统的BIO到NIO再到AIO,并附上完整的代码说明。

下面的代码中会用到一个例子:客户端发送一串公式给服务器,服务器计算后将结果返回给客户端。

代码的所有描述都直接作为注释嵌入到代码中,阅读代码时会更容易理解。代码中将使用一个用于计算结果的工具类。参见文章的代码部分。

1、BIO编程

1.1、传统的生物编程

网络编程的基本模式是C/S模式,即两个进程之间的通信。

服务器提供IP和监听端口,客户端通过连接操作向服务器监听的地址发起连接请求,通过三次握手进行连接。如果连接成功建立,双方可以通过套接字进行通信。

在传统同步阻塞模型的开发中,ServerSocket负责绑定IP地址,启动监听端口;套接字负责启动连接操作。连接成功后,双方通过输入输出流进行同步阻塞通信。

简单描述一下BIO的服务端通信模型:对于采用BIO通信模型的服务端,通常由一个独立的接受线程负责监控客户端的连接。收到客户端的连接请求后,为每个客户端创建一个新的线程来处理链接,并通过输出流向客户端返回回复,线程被销毁。也就是典型的一求一答通宵模式。

传统生物通信模型图:

io编程

这种模式最大的问题是缺乏灵活性和可扩展性。当客户端并发访问次数增加时,服务器上的线程数与客户端并发访问次数呈现1:1的比例关系。Ja中的线程也是宝贵的系统资源。线程数量急剧膨胀后,系统的性能会急剧下降。随着访问量的不断增加,这个系统最终会消亡。

由同步阻塞I/O创建的服务器源代码:

包com . an xpp . io . calculator . bio;导入Ja . io . io exception;导入Ja . net . server socket;导入Ja . net . socket;/** * BIO服务器源代码* @ author yangtao _ _ anxpp.com * @ version 1.0 */public final class server normal {//默认端口号private static int default _ port = 12345;//单服务器套接字私有静态服务器套接字服务器;//根据传入参数设置监听端口。如果没有参数,调用下面的方法,使用public static void start()throwsioexception的默认值{//使用start(DEFAULT_PORT)的默认值;}//这个方法不会被大量并发用户访问,所以不需要考虑效率。直接同步方法就可以了。公共同步静态void start (int port)引发io异常{if (server!= null)返回;Try{//通过构造函数//创建一个serversocket,如果端口合法空空闲,服务器将成功监听server = new server socket(port);System.out.println(“服务器启动,端口号:”+port);//通过无线循环监听客户端连接//如果没有客户端访问,会阻塞接受操作。while(true){ Socket Socket = server . accept();//当一个新的客户端访问时,会执行下面的代码//然后创建一个新的线程来处理这个Socket链接,new thread(new server handler(Socket))。start();} }最后{//一些必要的清理工作如果(服务器!= null){System.out.println(“服务器已关闭。”);server . close();server = null}}}}客户端消息处理线程ServerHandler源代码:

包com . an xpp . io . calculator . bio;导入Ja . io . buffered reader;导入Ja . io . io exception;导入Ja . io . inputstreamreader;导入Ja . io . printwriter;导入Ja . net . socket;导入com . an xpp . io . utils . calculator;/* * * Client thread * @ author yang Tao _ _ an xpp . com *用于处理客户端的套接字链接*/public class server handler实现runnable { private socketpublic server handler(Socket Socket){ this . Socket = Socket;} @ override public void run(){ buffered reader in = null;PrintWriter out = nulltry { in = new buffered reader(new InputStreamReader(socket . getinputstream()));out = new PrintWriter(socket . get output stream(),true);字符串表达式;字符串结果;While(true){//通过BufferedReader读取一行//如果已经读取了输入流的末尾,则返回null并退出循环//如果得到非[/k0/]值,则尝试计算结果并返回if ((expression = in。readline())= = null)break;System.out.println(“服务器接收消息:”+表达式);试试{ result = calculator . cal(expression)。toString();}catch(Exception e){result = “计算错误:”+e . getmessage();}out.println(结果);}}catch(异常e){ e . printstacktrace();}最后{//一些必要的清理工作如果(在!= null){ try { in . close();} catch(io exception e){ e . printstacktrace();} in = null}如果(出!= null){ out . close();out = null}如果(插座!= null){ try { socket . close();} catch(io exception e){ e . printstacktrace();} socket = null}}}}同步阻塞I/O创建的客户端源代码:

包com . an xpp . io . calculator . bio;导入Ja . io . buffered reader;导入Ja . io . io exception;导入Ja . io . inputstreamreader;导入Ja . io . printwriter;导入Ja . net . socket;/* * *阻塞I/O创建的客户端* @ author yangtao _ _ anxpp.com * @ version 1.0 */public class Client {//默认端口号private static int default _ server _ port = 12345;私有静态字符串DEFAULT _ SERVER _ IP = ” 127 . 0 . 0 . 1 “;公共静态void send(字符串表达式){send(DEFAULT_SERVER_PORT,expression);} public static void send (int port,string表达式){system。out.println(“算术表达式为:”+表达式);Socket socket = nullBufferedReader in = nullPrintWriter out = nulltry { Socket = new Socket(DEFAULT _ SERVER _ IP,port);in = new buffered reader(new InputStreamReader(socket . getinputstream()));out = new PrintWriter(socket . get output stream(),true);out.println(表达式);System.out.println(“___结果为:”+in . readline());} catch(Exception e){ e . printstacktrace();}最后{//检查必要的清洁工作如果(在!= null){ try { in . close();} catch(io exception e){ e . printstacktrace();} in = null}如果(出!= null){ out . close();out = null}如果(插座!= null){ try { socket . close();} catch(io exception e){ e . printstacktrace();} socket = null}}}}测试代码,放在同一个程序(jvm)中,方便在控制台上查看输出结果:

包com . an xpp . io . calculator . bio;导入Ja . io . io exception;导入Ja . util . random;/* * *测试方法* @ author yangtao _ _ anxpp.com * @ version 1.0 */public class test {//test main方法public static void main(string[]args)。Throws InterruptedException {//运行服务器new thread(newrunnable(){ @ override public void Run(){ try { server better . start();} catch(io exception e){ e . printstacktrace();}}}).start();//避免客户端在服务器启动前执行代码thread . sleep(100);//Run client charoperators [] = {‘+’,’-‘,’ * ‘,/’ };Random Random = new Random(system . current time millis());new thread(newrunnable(){ @ suppress warnings(” static-access “)@ override public void run(){ while(true){//随机算术表达式String expression = random。Nextint (10)。+” “+运算符[random . nextint(4)]+(random . nextint(10)+1);Client.send(表达式);请尝试{Thread.currentThread()。sleep(random . nextint(1000));} catch(interrupted exception e){ e . printstacktrace();}}}}).start();}}运行结果之一:

服务器已经启动。端口号:12345算术表达式为:4-2服务器接收消息:4-2 _ _结果为:2算术表达式为:5-10服务器接收消息:5-10 _ _结果为:-5算术表达式为:0-9服务器接收消息:0 -9 _ _结果为:-9算术表达式为:6__结果为:0.16666666666666…从上面的代码很容易看出,BIO的主要问题是每次有新的客户端请求访问,服务器都必须创建一个新的线程来处理这个链接,这在需要满足高性能和高并发的场景下是无法应用的(创建大量的新线程会严重影响服务器性能甚至罢工)。

1.2、伪异步I/O编程

为了改进这种连接一个线程的模型,我们可以使用线程池来管理这些线程(更多信息请参考前面提供的文章),实现一个或多个线程处理N个客户端的模型(但底层仍然使用同步阻塞I/O),这就是通常所说的“伪异步I/O模型”。

伪异步I/O模型图:

实现非常简单。我们只需要把创建新线程的地方交给线程池来管理,只需要修改刚才的服务器代码:

包com . an xpp . io . calculator . bio;导入Ja . io . io exception;导入Ja . net . server socket;导入Ja . net . socket;导入Ja . util . concurrent . executorservice;导入Ja . util . concurrent . executors;/** * BIO server源代码_ _伪异步I/O * @ author yangtao _ _ anxpp.com * @ version 1.0 */public final class server better {//默认端口号private static int default _ port = 12345;//单服务器套接字私有静态服务器套接字服务器;//线程池惰性单例私有静态executorserviceexecutorservice = executors。NewFixedThreadPool(60);//根据传入参数设置监听端口。如果没有参数,调用下面的方法,使用public static void start()throwsioexception的默认值{//使用start(DEFAULT_PORT)的默认值;}//这个方法不会被大量并发用户访问,所以不需要考虑效率。直接同步方法就可以了。公共同步静态void start (int port)引发io异常{if (server!= null)返回;Try{//通过构造函数//创建一个serversocket,如果端口合法空空闲,服务器将成功监听server = new server socket(port);System.out.println(“服务器启动,端口号:”+port);//通过无线循环监听客户端连接//如果没有客户端访问,会阻塞接受操作。while(true){ Socket Socket = server . accept();//新的客户端访问时,会执行下面的代码//然后创建一个新的线程来处理这个Socket link ExecutorService。Execute(新服务器处理程序(套接字));} }最后{//一些必要的清理工作如果(服务器!= null){System.out.println(“服务器已关闭。”);server . close();server = null}}}测试结果是一样的。

我们知道,如果使用CachedThreadPool(线程数量不限,不清楚请参考文章开头提供的文章),其实它可以自动帮助我们管理线程(复用),看起来就像是1:1的客户端:线程数量模型。通过使用FixedThreadPool,可以有效控制线程的最大数量,保证对系统有限资源的控制,实现N: M的伪异步I/O。

但由于线程数量的限制,如果出现大量并发请求,超过最大数量的线程只能等待,直到线程池中的空闲线程可以被重用。当套接字的输入流被读取时,它将被阻塞,直到:

有数据要读。

可用数据和已读数据

发生了空指针或I/O异常。

所以在读取数据比较慢的时候(比如数据量大,网络传输慢等。)和大量并发消息,其他被访问的消息只能一直等待,这是最大的弊端。

后面要介绍的NIO可以解决这个问题。

2、NIO 编程

JDK 1.4的ja.nio.*包中引入了一个新的Ja I/O库,旨在提高速度。事实上,“旧的”I/O包已经用NIO重新实现了,即使我们没有显式地使用NIO编程,我们也可以从中受益。文件I/O和网络I/O都可能会提高速度,但本文只讨论后者。

2.1.介绍

NIO一般被认为是新I/O(也是官方名称),因为它是旧I/O类库的新补充(实际上在JDK 1.4中已经引入了,但这个术语还会继续使用很长时间,即使它们现在已经“旧”了,所以它提醒我们在命名时需要慎重考虑),而且它做了很大的改动。但人们称之为非阻塞I/O,即非阻塞I/O,因为它更能体现其特点。下面的NIO不是指整个新的I/O库,而是指非阻塞I/O..

NIO提供了两种不同的套接字通道实现,Socket通道和ServerSocket通道,分别对应于传统BIO模型中的Socket和Server Socket。

两个新通道都支持阻塞和非阻塞模式。

阻塞模式的使用,和传统支持一样,简单,但是性能和可靠性不好;非阻塞模式正好相反。

对于低负载、低并发的应用,可以使用同步阻塞I/O来提高开发速度和更好的可维护性。对于高负载高并发(网络)的应用,应该使用NIO的非阻塞模式进行开发。

下面先介绍一下基础知识。

2.2、缓冲缓冲

Buffer是一个包含一些要写入或读取的数据的对象。

在NIO库中,所有数据都用缓冲区处理。读取数据时,直接读入缓冲区;当写入数据时,它也被写入缓冲区。无论何时访问NIO中的数据,都是通过缓冲区进行操作的。

Buffer实际上是一个数组,它提供对数据和信息的结构化访问,比如维护读写位置。

有这些特定的缓存区域:ByteBuffe、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer和DoubleBuffer。他们实现了相同的接口:Buffer。

2.3.频道

我们通过通道读写数据,通道就是一个像水管一样的通道。通道与流的不同之处在于,它是双向的,可以用于读、写,也可以同时用于读和写。

底层操作系统的通道一般是全双工的,所以全双工通道比流更能映射底层操作系统的API。

渠道主要分为两类:

SelectableChannel:用户网络读写

FileChannel:用于文件操作。

以下代码中涉及的ServerSocketChannel和SocketChannel是SelectableChannel的子类。

2.4、多路选择器

选择器是Ja NIO编程的基础。

选择器提供了选择就绪任务的能力:选择器将持续轮询其上注册的通道。如果通道上发生读或写事件,通道将处于就绪状态,并将被选择器轮询,然后可以通过SelectionKey获得就绪通道集,用于后续的I/O操作。

一个选择器可以同时轮询多个通道,因为JDK使用epoll()而不是传统的select实现,所以对1024/2048的最大连接句柄没有限制。因此,只需要一个线程来轮询选择器,就可以访问成千上万个客户端。

2.5,NIO服务器

代码看起来比传统的套接字编程复杂得多。

直接粘贴代码,以注释的形式给出代码描述。

NIO创建的服务器源代码:

包com . an xpp . io . calculator . nio;public class Server { private static int DEFAULT _ PORT = 12345;私有静态ServerHandle serverHandlepublic static void start(){ start(DEFAULT _ PORT);}公共静态同步void start(int port){ if(server handle!= null)server handle . stop();server handle = new server handle(port);新线程(serverHandle,“服务器”)。start();}公共静态void main(String[]args){ start();} }服务器句柄:

包com . an xpp . io . calculator . nio;导入Ja . io . io exception;导入Ja . net . inetsocketaddress;导入Ja . nio . byte buffer;导入Ja . nio . channels . selection key;导入Ja . nio . channels . selector;导入Ja . nio . channels . serversocketchannel;导入Ja . nio . channels . socket channel;导入Ja . util . iterator;导入Ja . util . set;导入com . an xpp . io . utils . calculator;/* * * NIO server * @ author yangtao _ _ anxpp.com * @ version 1.0 */公共类服务器句柄实现runnable {private selector选择器;私有服务器套接字通道服务器通道;私有可变布尔值已启动;/* * * constructor * @param port指定监听的端口号*/public server handle(int port){ try {//Create selector selector = selector . open();//打开监听通道server channel = serversocketchannel . Open();//如果为真,该通道将被置于阻塞模式;如果为false,此通道将被置于非阻塞模式server channel . configure blocking(false);//开启非阻塞模式//将绑定端口backlog设置为1024 serverchannel.socket()。bind(新inetsocket地址(端口),1024);//监听客户端连接请求serverchannel.register(选择器,selection key . op _ accept);//标记服务器已启动= trueSystem.out.println(“服务器启动,端口号:”+port);} catch(io exception e){ e . printstacktrace();system . exit(1);} } public void stop(){ started = false;}@Overridepublic void run() {//循环通过selectorwhile(started){try{//无论是否有读写事件,选择器每1s唤醒一次。选择(1000);//阻塞,只有当至少一个注册的事件发生时,阻塞才会继续。//selector . select();设置& lt选择键& gtkeys = selector . selected keys();迭代器& lt选择键& gtit = keys . iterator();SelectionKey key = nullwhile(it . has next()){ key = it . next();it . remove();试试{ handle input(key);}catch(Exception e){if(key!= null){ key . cancel();if(key.channel()!= null){key.channel()。close();} } } } } catch(Throwable t){ t . printstacktrace();} }//资源if(选择器!= null)请尝试{ selector . close();} catch(Exception e){ e . printstacktrace();}}私有void句柄输入(选择键)抛出io异常{if (key。is valid()){//处理新的访问请求消息if (key。is acceptable()){服务器套接字通道SSC =(服务器套接字通道)密钥。channel();//通过ServerSocketChannel的accept创建SocketChannel的实例//完成这个操作意味着完成TCP三次握手,TCP物理链路正式建立socket channel sc = SSC . accept();//设置为非阻塞sc . configure blocking(false);//寄存器读取sc.register(选择器,selection key . op _ read);}//读取消息if(key . is readable()){ socket channel SC =(socket channel)key . channel();//创建一个ByteBuffer,打开一个1M的缓冲区ByteBuffer = ByteBuffer。分配(1024);//读取请求码流,返回读取的字节数int Read bytes = sc . Read(buffer);//读取字节并对字节进行编解码if(Read bytes > >:0){//将缓冲区的当前限制设置为position=0,用于buffer.flip()的后续读取操作;//根据缓冲区中可读字节数创建一个字节数组byte[]bytes = new byte[buffer . remaining()];//将缓冲区可读字节数组复制到新创建的数组buffer . get(bytes);字符串表达式=新字符串(字节,“UTF-8”);System.out.println(“服务器接收消息:”+表达式);//处理数据字符串结果= null试试{ result = calculator . cal(expression)。toString();}catch(Exception e){result = “计算错误:”+e . getmessage();}//发送回复消息doWrite(sc,result);}//未读取的字节忽略//else if(read Bytes = = 0);//链接已经关闭,如果(readBytes&lt

打开ServerSocketChannel并监听客户端连接。

绑定监听端口,并将连接设置为非阻塞模式。

创建反应器线程,创建多路复用器和启动线程。

将ServerSocketChannel注册到反应器线程中的选择器,并侦听ACCEPT事件。

选择器轮询就绪键。

选择器监控新的客户端访问,处理新的访问请求,完成TCP三次握手,并恢复物理链路。

将客户端链接设置为非阻塞模式

将新访问的客户端连接注册到反应器线程的选择器,监控读取操作,读取客户端发送的网络消息。

将客户端消息异步读取到缓冲区

对缓冲区进行编码和解码,处理半包消息,并将解码成功的消息封装到任务中。

将回复消息编码为Buffer,调用SocketChannel的write将消息异步发送到客户端。

因为回复消息的发送是异步无阻塞的,不能保证要发送的数据一次就能发送完,这个时候就会出现写半个包的问题。我们需要注册写操作,不断轮询选择器发送未完成的消息,然后通过缓冲区的hasRemain()方法判断消息是否发送。

2.6,NIO客户端

我们直接看代码,过程不需要太多解释,有点类似于服务器代码。

客户:

包com . an xpp . io . calculator . nio;公共类Client { private static String DEFAULT _ HOST = ” 127 . 0 . 0 . 1 “;private static int DEFAULT _ PORT = 12345;私有静态ClientHandle clientHandlepublic static void start(){ start(DEFAULT _ HOST,DEFAULT _ PORT);} public static synchronized void start(String IP,int port){if(clientHandle!= null)client handle . stop();client handle = new client handle(IP,port);新线程(clientHandle,“服务器”)。start();}//向服务器发送消息public static Boolean sendmsg(string msg)throws exception { if(msg . equals(” q “))返回falseclient handle . send msg(msg);返回true}公共静态void main(String[]args){ start();}}客户端句柄:

包com . an xpp . io . calculator . nio;导入Ja . io . io exception;导入Ja . net . inetsocketaddress;导入Ja . nio . byte buffer;导入Ja . nio . channels . selection key;导入Ja . nio . channels . selector;导入Ja . nio . channels . socket channel;导入Ja . util . iterator;导入Ja . util . set;/* * * NIO client * @ author yangtao _ _ anxpp.com * @ version 1.0 */公共类客户端句柄实现者runnable { private string host专用int端口;私有选择器选择器;私有SocketChannel socketChannel私有可变布尔值已启动;public ClientHandle(String ip,int port){ this . host = IP;this.port = portTry{// Create selector选择器= selector . open();//打开监控通道socket channel = socket channel . Open();//如果为真,该通道将被置于阻塞模式;如果为false,通道将被置于非阻塞模式:socketchannel。configure blocking(false);//开启非阻塞模式。开始=真;} catch(io exception e){ e . printstacktrace();system . exit(1);} } public void stop(){ started = false;} @ override public void run(){ try { doc connect();} catch(io exception e){ e . printstacktrace();system . exit(1);}//循环遍历selectorwhile(started){try{//无论是否有读写事件,选择器每1s唤醒一次。选择(1000);//阻塞,只有当至少一个注册的事件发生时,阻塞才会继续。//selector . select();设置& lt选择键& gtkeys = selector . selected keys();迭代器& lt选择键& gtit = keys . iterator();SelectionKey key = nullwhile(it . has next()){ key = it . next();it . remove();试试{ handle input(key);}catch(Exception e){if(key!= null){ key . cancel();if(key.channel()!= null){key.channel()。close();}}}}}catch(异常e){ e . printstacktrace();system . exit(1);} }//资源if(选择器!= null)请尝试{ selector . close();} catch(Exception e){ e . printstacktrace();} }私有void handle input(selection key key)抛出io exception { if(key . is valid()){ socket channel sc =(socket channel)key . channel();if(key . is connectable()){ if(sc . finish connect());else system . exit(1);}//读取消息if(key.isReadable()){//创建一个ByteBuffer,打开一个1M的缓冲区,ByteBuffer = ByteBuffer。分配(1024);//读取请求码流并返回读取的字节数,int Read bytes = sc . Read(buffer);//读取字节并对字节进行编解码if(Read bytes > >:0){//将缓冲区的当前限制设置为position=0,用于buffer.flip()的后续读取操作;//根据缓冲区中可读字节数创建一个字节数组byte[]bytes = new byte[buffer . remaining()];//将缓冲区可读字节数组复制到新创建的数组buffer . get(bytes);字符串结果=新字符串(字节,“UTF-8”);System.out.println(“客户端接收消息:”+result);}//未读取的字节忽略//else if(read Bytes = = 0);//链接已经关闭,如果(readBytes&lt

首先运行服务器,顺便运行客户端:

包com . an xpp . io . calculator . nio;导入Ja . util . scanner;/* * *测试方法* @ author yangtao _ _ anxpp.com * @ version 1.0 */Test main方法@ suppressed warnings(” resource “)public static void main(string[]args)throws exception {//Run server . start();//避免客户端在服务器启动前执行代码thread . sleep(100);//运行client . start();while(Client.sendMsg(新扫描器(System.in))。nextLine()));}}我们也可以单独运行客户端,效果是一样的。

测试的结果:

服务器启动,端口号:123451+2+3+4+5+6服务器接收消息:1+2+3+4+5+6客户端接收消息:211*2/3-4+5*6/7-8服务器接收消息:1*2/3-4+5*。

3、AIO编程

NIO 2.0引入了异步通道的新概念,并提供了异步文件通道和异步套接字通道的实现。

异步套接字通道是真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。他可以异步读写,而不需要太多选择器轮询注册的通道,从而简化了NIO的编程模型。

编码就行了。

3.1,服务器端代码

服务器:

包com . an xpp . io . calculator . AIO . server;/* * * anxpp.com服务器* @ author yangtao _ _ AIO * @ version 1.0 */public class server { private static int default _ port = 12345;私有静态AsyncServerHandler server handle;public volatile静态long client count = 0;public static void start(){ start(DEFAULT _ PORT);}公共静态同步void start(int port){ if(server handle!=null)返回;server handle = new AsyncServerHandler(port);新线程(serverHandle,“服务器”)。start();} public static void main(String[]args){ server . start();}} AsyncServerHandler:包com . an xpp . io . calculator . AIO . server;导入Ja . io . io exception;导入Ja . net . inetsocketaddress;import Ja . nio . channels . asynchronousserversocketchannel;导入Ja . util . concurrent . countdownlatch;公共类AsyncServerHandler实现Runnable { public CountDownLatch latch;公共异步服务器套接字通道;public async server handler(int port){ try {//Create server channel = asynchronous server socket . open();//bind port channel . bind(new inetsocketaddress(port));System.out.println(“服务器启动,端口号:”+port);} catch(io exception e){ e . printstacktrace();} } @ override public void run(){//count down Latch初始化//它的作用:在完成一组正在执行的操作之前,允许当前站点一直阻塞//在这里,让站点阻塞在这里,执行后阻止服务器退出//你也可以用while(true)+sleep //生成一个环境,这样就不需要担心这个问题,以为服务器不会Lat退出。//用于接收连接通道。客户端的accept (this,new accept handler());请尝试{ latch . await();} catch(interrupted exception e){ e . printstacktrace();}}}接受处理程序:

包com . an xpp . io . calculator . AIO . server;导入Ja . nio . byte buffer;导入Ja . nio . channels . asynchronoussocketchannel;导入Ja . nio . channels . completion handler;//作为处理程序,接收客户端连接到公共类接受处理程序实现完成处理程序

包com . an xpp . io . calculator . AIO . server;导入Ja . io . io exception;import Ja . io . unsupportedencodingexception;导入Ja . nio . byte buffer;导入Ja . nio . channels . asynchronoussocketchannel;导入Ja . nio . channels . completion handler;导入com . an xpp . io . utils . calculator;公共类ReadHandler实现CompletionHandler & lt整数,字节缓冲区& gt{//私有异步SocketChannel,用于读取半包消息和发送回复;public read handler(AsynchronousSocketChannel channel){ this . channel = channel;}//读取消息后的处理@ override public void completed(整数结果,字节缓冲附件){//flip操作attachment . flip();//根据byte []消息=新字节[附件。剩余()];attachment.get(消息);试试{ String expression = new String(message,“UTF-8”);System.out.println(“服务器接收消息:”+表达式);String calrResult = null试试{ calrResult = calculator . cal(expression)。toString();}catch(Exception e){calrResult = “计算错误:”+e . getmessage();}//向客户端发送消息doWrite(calrResult);} catch(UnsupportedEncodingException e){ e . printstacktrace();} }//发送消息私有void do write(string result){ byte[]bytes = result。getbytes();byte buffer write buffer = byte buffer . allocate(bytes . length);writeBuffer.put(字节);write buffer . flip();//异步写数据参数与前一次读相同。Channel.write (writebuffer,writebuffer,新的完成处理程序

现在看看客户端代码。

3.2、客户端代码

客户:

包com . an xpp . io . calculator . AIO . client;导入Ja . util . scanner;公共类Client { private static String DEFAULT _ HOST = ” 127 . 0 . 0 . 1 “;private static int DEFAULT _ PORT = 12345;私有静态AsyncClientHandler client handle;public static void start(){ start(DEFAULT _ HOST,DEFAULT _ PORT);} public static synchronized void start(String IP,int port){if(clientHandle!=null)返回;client handle = new AsyncClientHandler(IP,port);新线程(clientHandle,“客户端”)。start();}//向服务器发送消息public static Boolean sendmsg(string msg)throws exception { if(msg . equals(” q “))返回falseclient handle . send msg(msg);返回true}@SuppressWarnings(“resource “)公共静态void main(String[] args)抛出异常{ client . start();System.out.println(“请输入请求消息:”);Scanner scanner =新扫描仪(system . in);while(client . sendmsg(scanner . nextline());}}AsyncClientHandler:

包com . an xpp . io . calculator . AIO . client;导入Ja . io . io exception;导入Ja . net . inetsocketaddress;导入Ja . nio . byte buffer;导入Ja . nio . channels . asynchronoussocketchannel;导入Ja . nio . channels . completion handler;导入Ja . util . concurrent . countdownlatch;公共类AsyncClientHandler实现CompletionHandler & ltVoid,AsyncClientHandler & gt,Runnable { private AsynchronousSocketChannel client channel;私有字符串主机;专用int端口;私有CountDownLatch闩锁;public async client handler(String host,int port){ this . host = host;this.port = portTry {//创建异步客户端通道,client channel = asynchronoussocketchannel . open();} catch(io exception e){ e . printstacktrace();} } @ override public void run(){//Create CountDownLatch Waiting latch = new CountDownLatch(1);//发起异步连接操作,回调参数是类本身。如果连接成功,它将回调完成的方法客户端通道。Connect(新inetSocket地址(主机,端口),this,this);请尝试{ latch . await();} catch(interrupted exception E1){ E1 . printstacktrace();}请尝试{ client channel . close();} catch(io exception e){ e . printstacktrace();} }//成功连接服务器//表示TCP三次握手已经完成@ override public void completed(void result,asyncclienthandler attachment){ system . out . println(“客户端成功连接服务器…”);}//连接服务器失败@ override public void failed(throwable exc,asyncclienthandler attachment){ system . err . println(“连接服务器失败…”);exc . printstacktrace();请尝试{ client channel . close();latch . count down();} catch(io exception e){ e . printstacktrace();} }//向服务器发送消息public void sendmsg(string msg){ byte[]req = msg . getbytes();byte buffer write buffer = byte buffer . allocate(req . length);write buffer . put(req);write buffer . flip();//异步写入client channel . write(writebuffer,write buffer,new writehandler (clientchannel,latch));}}写处理程序:

包com . an xpp . io . calculator . AIO . client;导入Ja . io . io exception;导入Ja . nio . byte buffer;导入Ja . nio . channels . asynchronoussocketchannel;导入Ja . nio . channels . completion handler;导入Ja . util . concurrent . countdownlatch;公共类WriteHandler实现CompletionHandler & lt整数,字节缓冲区& gt{ private AsynchronousSocketChannel client channel;私有CountDownLatch闩锁;public write handler(AsynchronousSocketChannel client channel,CountDownLatch latch){ this . client channel = client channel;this.latch = latch} @ override public void completed(integer result,byte buffer){//完成所有数据的写入if (buffer。has remaining()){客户端通道。写(缓冲,缓冲,这个);}else {//读取数据byte buffer read buffer = byte buffer . allocate(1024);clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel,latch));}} @ override public void失败(throwable exc,byte buffer attachment){ system . err . println(“数据传输失败…”);请尝试{ client channel . close();latch . count down();} catch(io exception e){ } } } ReadHandler:

包com . an xpp . io . calculator . AIO . client;导入Ja . io . io exception;import Ja . io . unsupportedencodingexception;导入Ja . nio . byte buffer;导入Ja . nio . channels . asynchronoussocketchannel;导入Ja . nio . channels . completion handler;导入Ja . util . concurrent . countdownlatch;公共类ReadHandler实现CompletionHandler & lt整数,字节缓冲区& gt{ private AsynchronousSocketChannel client channel;私有CountDownLatch闩锁;public read handler(AsynchronousSocketChannel client channel,CountDownLatch latch){ this . client channel = client channel;this.latch = latch}@Overridepublic void completed(整数结果,byte buffer buffer){ buffer . flip();byte[] bytes =新字节[buffer . remaining()];buffer.get(字节);弦体;试试{body = new String(bytes,“UTF-8”);System.out.println(“客户端接收结果:”+body);} catch(UnsupportedEncodingException e){ e . printstacktrace();}} @ override public void失败(throwable exc,byte buffer attachment){ system . err . println(“数据读取失败…”);请尝试{ client channel . close();latch . count down();} catch (IOException e) {}}}这个API使用起来确实很方便。

3.3.测试

测试:

包com . an xpp . io . calculator . AIO;导入Ja . util . scanner;导入com . an xpp . io . calculator . AIO . client . client;导入com . an xpp . io . calculator . AIO . server . server;/* * * Test method * @ author yangtao _ _ anxpp.com * @ version 1.0 */Test main method @ suppressed warnings(” resource “)public static void main(string[]args)throws exception {//Run server . start();//避免客户端在服务器启动前执行代码thread . sleep(100);//运行client . start();System.out.println(“请输入请求消息:”);Scanner scanner =新扫描仪(system . in);while(client . sendmsg(scanner . nextline());}}我们可以在控制台输入需要计算的算术字符串,服务器会返回结果。当然,我们也可以运行大量的客户端,这些都是没有问题的,所以我们这里把它设计成单例客户端,所以不做大量客户端并发的演示。

读者可以自己修改客户端类,然后利用构造方法打开大量线程,创建大量客户端测试。

以下是其中一个参数的输出:

服务器已启动,端口号:12345。请输入请求消息:客户端成功连接到服务器…连接的客户端数量:1123456+789+456。服务器收到消息:123456+789+456。客户端收到结果:1247019526*56。服务器收到消息:9526*56。客户端收到的结果:5356。

我们来比较一下几种I/O编程的优缺点。

4、各种I/O的对比

我们用一张表来做个直观的对比:

选择什么样的模型或者NIO框架,完全是基于业务的实际应用场景和性能需求。如果客户端少,服务器负载不重,就没必要选择开发相对简单的NIO作为服务器。相反,您应该考虑使用NIO或相关框架。

5、附录

以上文章中服务器用于计算的工具类:

package com.anxpp.utils;import jax.script.ScriptEngine;import jax.script.ScriptEngineManager;import jax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName(“JaScript”); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}私信我:“资料”,可免费领取更多学习资料哦

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。

发表回复

登录后才能评论