【面试】一篇文章帮你彻底搞清楚“I/O多路复用”和“异步I/O”的前世今生,
【面试】一篇文章帮你彻底搞清楚“I/O多路复用”和“异步I/O”的前世今生,
曾经的VIP服务 https://github.com/coding-new-talking/java-code-demo.git (END) 作者是工作超过10年的码农,现在任架构师。喜欢研究技术,崇尚简单快乐。追求以通俗易懂的语言解说技术,希望所有的读者都能看懂并记住。下面是公众号和知识星球的二维码,欢迎关注!
客户端:创建20个Socket并连接到服务器上,再创建20个线程,每个线程负责一个Socket。/**
* @author lixinjie
* @since 2019-05-07
*/
public class BioServer {
static AtomicInteger counter = new AtomicInteger(0); static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket();
ss.bind(new InetSocketAddress("localhost", 8080));
while (true) {
Socket s = ss.accept();
processWithNewThread(s);
}
} catch (Exception e) {
e.printStackTrace();
}
}
static void processWithNewThread(Socket s) {
Runnable run = () -> {
InetSocketAddress rsa = (InetSocketAddress)s.getRemoteSocketAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + counter.incrementAndGet());
try {
String result = readBytes(s.getInputStream());
System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.getAndDecrement());
s.close();
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(run).start();
}
static String readBytes(InputStream is) throws Exception {
long start = 0;
int total = 0;
int count = 0;
byte[] bytes = new byte[1024];
//开始读数据的时间
long begin = System.currentTimeMillis();
while ((count = is.read(bytes)) > -1) {
if (start < 1) {
//第一次读到数据的时间
start = System.currentTimeMillis();
}
total += count;
}
//读完数据的时间
long end = System.currentTimeMillis();
return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs";
}
static String time() { return sdf.format(new Date());
}
}
/**
* @author lixinjie
* @since 2019-05-07
*/
public class Client {
public static void main(String[] args) { try {
for (int i = 0; i < 20; i++) {
Socket s = new Socket();
s.connect(new InetSocketAddress("localhost", 8080));
processWithNewThread(s, i);
}
} catch (IOException e) {
e.printStackTrace();
}
}
static void processWithNewThread(Socket s, int i) { Runnable run = () -> {
try {
//睡眠随机的5-10秒,模拟数据尚未就绪
Thread.sleep((new Random().nextInt(6) + 5) * 1000);
//写1M数据,为了拉长服务器端读数据的过程
s.getOutputStream().write(prepareBytes());
//睡眠1秒,让服务器端把数据读完
Thread.sleep(1000);
s.close();
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(run).start();
}
static byte[] prepareBytes() {
byte[] bytes = new byte[1024*1024*1];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 1;
}
return bytes;
}
}
执行结果如下:时间->IP:Port->线程Id:当前线程数
15:11:52->127.0.0.1:55201->10:1
15:11:52->127.0.0.1:55203->12:2
15:11:52->127.0.0.1:55204->13:3
15:11:52->127.0.0.1:55207->16:4
15:11:52->127.0.0.1:55208->17:5
15:11:52->127.0.0.1:55202->11:6
15:11:52->127.0.0.1:55205->14:7
15:11:52->127.0.0.1:55206->15:8
15:11:52->127.0.0.1:55209->18:9
15:11:52->127.0.0.1:55210->19:10
15:11:52->127.0.0.1:55213->22:11
15:11:52->127.0.0.1:55214->23:12
15:11:52->127.0.0.1:55217->26:13
15:11:52->127.0.0.1:55211->20:14
15:11:52->127.0.0.1:55218->27:15
15:11:52->127.0.0.1:55212->21:16
15:11:52->127.0.0.1:55215->24:17
15:11:52->127.0.0.1:55216->25:18
15:11:52->127.0.0.1:55219->28:19
15:11:52->127.0.0.1:55220->29:20
时间->等待数据的时间,读取数据的时间,总共读取的字节数->线程Id:当前线程数15:11:58->wait=5012ms,read=1022ms,total=1048576bs->17:20
15:11:58->wait=5021ms,read=1022ms,total=1048576bs->13:19
15:11:58->wait=5034ms,read=1008ms,total=1048576bs->11:18
15:11:58->wait=5046ms,read=1003ms,total=1048576bs->12:17
15:11:58->wait=5038ms,read=1005ms,total=1048576bs->23:16
15:11:58->wait=5037ms,read=1010ms,total=1048576bs->22:15
15:11:59->wait=6001ms,read=1017ms,total=1048576bs->15:14
15:11:59->wait=6016ms,read=1013ms,total=1048576bs->27:13
15:11:59->wait=6011ms,read=1018ms,total=1048576bs->24:12
15:12:00->wait=7005ms,read=1008ms,total=1048576bs->20:11
15:12:00->wait=6999ms,read=1020ms,total=1048576bs->14:10
15:12:00->wait=7019ms,read=1007ms,total=1048576bs->26:9
15:12:00->wait=7012ms,read=1015ms,total=1048576bs->21:8
15:12:00->wait=7023ms,read=1008ms,total=1048576bs->25:7
15:12:01->wait=7999ms,read=1011ms,total=1048576bs->18:6
15:12:02->wait=9026ms,read=1014ms,total=1048576bs->10:5
15:12:02->wait=9005ms,read=1031ms,total=1048576bs->19:4
15:12:03->wait=10007ms,read=1011ms,total=1048576bs->16:3
15:12:03->wait=10006ms,read=1017ms,total=1048576bs->29:2
15:12:03->wait=10010ms,read=1022ms,total=1048576bs->28:1
可以看到服务器端确实为每个连接创建一个线程,共创建了20个线程。
等待数据”和“读取数据”这两个过程上。
一是有很多客户端同时发起请求的话,服务器端要创建很多的线程,可能会因为超过了上限而造成崩溃。
其实,VIP映射的是一对一的模型,主要体现在“专用”上或“私有”上。
真正的多路复用技术
可见,多路复用技术是一种一对多的模型,“多”的这一方复用了“一”的这一方。
其实,一对多的模型主要体现在“公用”上或“共享”上。
您先看着,我一会再过来
所以实际当中更多的情况是,客人坐下后,会给他一个菜单,让他先看着,反正也不可能立马点餐,服务员就去忙别的了。
连接建立后,找个地方把它放到那里,可以暂时先不管它,反正此时也没有数据可读。
先铺垫一下吧
1、专门设立一个“跑腿”服务员,工作职责单一,就是问问客人是否需要服务。
于是跑腿服务员就有了一个任务,替大堂经理盯梢。终于来客人了,跑腿服务员赶紧告诉了大堂经理。
于是跑腿服务员就又多了一个任务,就是盯着这桌客人,不时来问问,如果需要服务的话,就叫点餐服务员过来服务。
大堂经理和点餐服务员是需求的提供者或实现者,跑腿服务员是需求的发现者,并识别出需求的种类,需要接待的交给大堂经理,需要点餐的交给点餐服务员。
哈哈,Java NIO来啦
代码的写法非常的固定,可以配合着后面的解说来看,这样就好理解了,如下:/**
* @author lixinjie
* @since 2019-05-07
*/
public class NioServer {
static int clientCount = 0; static AtomicInteger counter = new AtomicInteger(0);
static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
try {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress("localhost", 8080));
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel ssc1 = (ServerSocketChannel)key.channel();
SocketChannel sc = null;
while ((sc = ssc1.accept()) != null) {
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
InetSocketAddress rsa = (InetSocketAddress)sc.socket().getRemoteSocketAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount));
}
} else if (key.isReadable()) {
//先将“读”从感兴趣操作移出,待把数据从通道中读完后,再把“读”添加到感兴趣操作中
//否则,该通道会一直被选出来
key.interestOps(key.interestOps() & (~ SelectionKey.OP_READ));
processWithNewThread((SocketChannel)key.channel(), key);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
static void processWithNewThread(SocketChannel sc, SelectionKey key) { Runnable run = () -> {
counter.incrementAndGet();
try {
String result = readBytes(sc);
//把“读”加进去
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.get());
sc.close();
} catch (Exception e) {
e.printStackTrace();
}
counter.decrementAndGet();
};
new Thread(run).start();
}
static String readBytes(SocketChannel sc) throws Exception {
long start = 0;
int total = 0;
int count = 0;
ByteBuffer bb = ByteBuffer.allocate(1024);
//开始读数据的时间
long begin = System.currentTimeMillis();
while ((count = sc.read(bb)) > -1) {
if (start < 1) {
//第一次读到数据的时间
start = System.currentTimeMillis();
}
total += count;
bb.clear();
}
//读完数据的时间
long end = System.currentTimeMillis();
return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs";
}
static String time() {
return sdf.format(new Date());
}
}
1、定义一个选择器,Selector。
相当于设立一个跑腿服务员。
相等于聘请了一位大堂经理。
相当于大堂经理给跑腿服务员说,帮我盯着门外,有客人来了告诉我。
相当于跑腿服务员一遍又一遍的去询问、去转悠。
相当于跑腿服务员终于发现门外来客人了,客人是需要接待的。
相当于跑腿服务员把大堂经理叫来了,大堂经理开始着手接待。
相当于大堂经理把客人带到座位上,给了客人菜单,并又把客人委托给跑腿服务员,说客人接下来肯定是要点餐的,你不时的来问问。
相当于跑腿服务员继续不时的询问着、转悠着。
相当于跑腿服务员终于发现了一桌客人有了需求,是需要点餐的。
相当于跑腿服务员叫来了点餐服务员,点餐服务员开始为客人写菜单。
相当于点餐服务员写好菜单后,就走了,可以再去为其他客人写菜单。
相当于跑腿服务员继续着重复的询问、转悠,不知道未来在何方。
相信你已经看出来了,大堂经理相当于服务器端套接字,跑腿服务员相当于选择器,点餐服务员相当于Worker线程。时间->IP:Port->主线程Id:当前连接数
16:34:39->127.0.0.1:56105->1:1
16:34:39->127.0.0.1:56106->1:2
16:34:39->127.0.0.1:56107->1:3
16:34:39->127.0.0.1:56108->1:4
16:34:39->127.0.0.1:56109->1:5
16:34:39->127.0.0.1:56110->1:6
16:34:39->127.0.0.1:56111->1:7
16:34:39->127.0.0.1:56112->1:8
16:34:39->127.0.0.1:56113->1:9
16:34:39->127.0.0.1:56114->1:10
16:34:39->127.0.0.1:56115->1:11
16:34:39->127.0.0.1:56116->1:12
16:34:39->127.0.0.1:56117->1:13
16:34:39->127.0.0.1:56118->1:14
16:34:39->127.0.0.1:56119->1:15
16:34:39->127.0.0.1:56120->1:16
16:34:39->127.0.0.1:56121->1:17
16:34:39->127.0.0.1:56122->1:18
16:34:39->127.0.0.1:56123->1:19
16:34:39->127.0.0.1:56124->1:20
时间->等待数据的时间,读取数据的时间,总共读取的字节数->线程Id:当前线程数16:34:45->wait=1ms,read=1018ms,total=1048576bs->11:5
16:34:45->wait=0ms,read=1054ms,total=1048576bs->10:5
16:34:45->wait=0ms,read=1072ms,total=1048576bs->13:6
16:34:45->wait=0ms,read=1061ms,total=1048576bs->14:5
16:34:45->wait=0ms,read=1140ms,total=1048576bs->12:4
16:34:46->wait=0ms,read=1001ms,total=1048576bs->15:5
16:34:46->wait=0ms,read=1062ms,total=1048576bs->17:6
16:34:46->wait=0ms,read=1059ms,total=1048576bs->16:5
16:34:47->wait=0ms,read=1001ms,total=1048576bs->19:4
16:34:47->wait=0ms,read=1001ms,total=1048576bs->20:4
16:34:47->wait=0ms,read=1015ms,total=1048576bs->18:3
16:34:47->wait=0ms,read=1001ms,total=1048576bs->21:2
16:34:48->wait=0ms,read=1032ms,total=1048576bs->22:4
16:34:49->wait=0ms,read=1002ms,total=1048576bs->23:3
16:34:49->wait=0ms,read=1001ms,total=1048576bs->25:2
16:34:49->wait=0ms,read=1028ms,total=1048576bs->24:4
16:34:50->wait=0ms,read=1008ms,total=1048576bs->28:4
16:34:50->wait=0ms,read=1033ms,total=1048576bs->27:3
16:34:50->wait=1ms,read=1002ms,total=1048576bs->29:2
16:34:50->wait=0ms,read=1001ms,total=1048576bs->26:2
服务器端接受20个连接,创建20个通道,并把它们注册到选择器上,此时不需要额外线程。
处理同样的20个请求,一个需要用20个线程,一个需要用6个线程,节省了70%线程数。
其实对于选择器的个数、选择器运行在哪个线程里、是否使用新的线程来处理请求都没有要求,要根据实际情况来定。
当然,也可以使用两个选择器,一个处理OP_ACCEPT,一个处理OP_READ,让它们分别运行在两个单独的I/O线程里。对于能快速完成的操作可以直接在I/O线程里做了,对于非常耗时的操作一定要使用Worker线程池来处理。
这种处理模式就是被称为的多路复用I/O,多路指的是多个Socket通道,复用指的是只用一个线程来管理它们。
再稍微分析一下
从VIP到多路复用,形式上确实有很大的不同,其本质是从一对一到一对多的转变,其实就是牺牲了响应速度,换来了效率的提升,不过综合性能还是得到了极大的改进。
就饭店而言,究竟几张桌子配一个跑腿服务员,几张桌子配一个点餐服务员,经过一段时间运行,一定会有一个最优解。
那就是抛弃服务员服务客人这种模式,把饭店改成自助餐厅。
嘻嘻,Java AIO来啦/**
* @author lixinjie
* @since 2019-05-13
*/
public class AioServer {
static int clientCount = 0; static AtomicInteger counter = new AtomicInteger(0);
static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
try {
AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open();
assc.bind(new InetSocketAddress("localhost", 8080));
//非阻塞方法,其实就是注册了个回调,而且只能接受一个连接
assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
public void completed(AsynchronousSocketChannel asc, Object attachment) {
//再次注册,接受下一个连接
assc.accept(null, this);
try {
InetSocketAddress rsa = (InetSocketAddress)asc.getRemoteAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount));
} catch (Exception e) {
}
readFromChannelAsync(asc);
}
public void failed(Throwable exc, Object attachment) {
}
});
//不让主线程退出
synchronized (AioServer.class) {
AioServer.class.wait();
}
} catch (Exception e) {
e.printStackTrace();
}
}
static void readFromChannelAsync(AsynchronousSocketChannel asc) { //会把数据读入到该buffer之后,再触发工作线程来执行回调
ByteBuffer bb = ByteBuffer.allocate(1024*1024*1 + 1);
long begin = System.currentTimeMillis();
//非阻塞方法,其实就是注册了个回调,而且只能接受一次读取
asc.read(bb, null, new CompletionHandler<Integer, Object>() {
//从该连接上一共读到的字节数
int total = 0;
/**
* @param count 表示本次读取到的字节数,-1表示数据已读完
*/
public void completed(Integer count, Object attachment) {
counter.incrementAndGet();
if (count > -1) {
total += count;
}
int size = bb.position();
System.out.println(time() + "->count=" + count + ",total=" + total + "bs,buffer=" + size + "bs->" + Thread.currentThread().getId() + ":" + counter.get());
if (count > -1) {//数据还没有读完
//再次注册回调,接受下一次读取
asc.read(bb, null, this);
} else {//数据已读完
try {
asc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
counter.decrementAndGet();
}
public void failed(Throwable exc, Object attachment) {
}
});
long end = System.currentTimeMillis();
System.out.println(time() + "->exe read req,use=" + (end -begin) + "ms" + "->" + Thread.currentThread().getId());
}
static String time() {
return sdf.format(new Date());
}
}
它的大致处理过程如下:
1、初始化一个AsynchronousServerSocketChannel对象,并开始监听
注:如果出现ByteBuffer空间不足,则系统不会装入数据,就会导致客户端数据总是读不完,极有可能进入死循环。时间->IP:Port->回调线程Id:当前连接数
17:20:47->127.0.0.1:56454->15:1
时间->发起一个读请求,耗时->回调线程Id
17:20:47->exe read req,use=3ms->15
17:20:47->127.0.0.1:56455->15:2
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56456->15:3
17:20:47->exe read req,use=0ms->15
17:20:47->127.0.0.1:56457->16:4
17:20:47->127.0.0.1:56458->15:5
17:20:47->exe read req,use=1ms->16
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56460->15:6
17:20:47->127.0.0.1:56459->17:7
17:20:47->exe read req,use=0ms->15
17:20:47->127.0.0.1:56462->15:8
17:20:47->127.0.0.1:56461->16:9
17:20:47->exe read req,use=1ms->15
17:20:47->exe read req,use=0ms->16
17:20:47->exe read req,use=0ms->17
17:20:47->127.0.0.1:56465->16:10
17:20:47->127.0.0.1:56463->18:11
17:20:47->exe read req,use=0ms->18
17:20:47->127.0.0.1:56466->15:12
17:20:47->exe read req,use=1ms->16
17:20:47->127.0.0.1:56464->17:13
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56467->18:14
17:20:47->exe read req,use=2ms->17
17:20:47->exe read req,use=1ms->18
17:20:47->127.0.0.1:56468->15:15
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56469->16:16
17:20:47->127.0.0.1:56470->18:17
17:20:47->exe read req,use=1ms->18
17:20:47->exe read req,use=1ms->16
17:20:47->127.0.0.1:56472->15:18
17:20:47->127.0.0.1:56473->19:19
17:20:47->exe read req,use=2ms->15
17:20:47->127.0.0.1:56471->17:20
17:20:47->exe read req,use=1ms->19
17:20:47->exe read req,use=1ms->17
时间->本次接受到的字节数,截至到目前接受到的字节总数,buffer中的字节总数->回调线程Id:当前线程数17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
17:20:52->count=230188,total=295724bs,buffer=295724bs->12:1
17:20:52->count=752852,total=1048576bs,buffer=1048576bs->14:3
17:20:52->count=131072,total=196608bs,buffer=196608bs->17:2
。。。。。。。。。。。。。。。。。。。。。。
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:117:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
系统接受到连接后,在工作线程中执行了回调。并且在回调中执行了read方法,耗时是0,因为只是注册了个接受数据的回调而已。
处理同样的20个请求,一个需要用20个线程,一个需要用6个线程,一个需要3个线程,又节省了50%线程数。
注:不用特别较真这个比较结果,这里只是为了说明问题而已。哈哈。
三种处理方式的对比
第一种是阻塞IO,阻塞点有两个,等待数据就绪的过程和读取数据的过程。
可见,这是一个逐步消除阻塞点的过程。
只有一个线程,接受一个连接,读取数据,处理业务,写回结果,再接受下一个连接,这是同步阻塞。这种用法几乎没有。
redis也是多路复用,但它只有一个线程在执行select操作,处理就绪的连接,整个是串行化的,所以天然不存在并发问题。只能把它归为同步阻塞了。
BIO是阻塞IO,可以是同步阻塞,也可以是异步阻塞。AIO是异步IO,只有异步非阻塞这一种。因此没有同步非阻塞这种说法,因为同步一定是阻塞的。
注:以上的说法是站在用户程序/线程的立场上来说的。
建议把代码下载下来,自己运行一下,体会体会:
相关文章
- 暂无相关文章
用户点评