易错知识点总结
-
NIO基于Channel(通道)和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道
-
selector、channel、buffer关系图
关系说明:
1.每个channel都会对应一个buffer
2.selector对应一个线程,一个selector可以监听多个channel(连接)
3.程序根据selector监听切换到哪个channel是由事件决定的
4.selector会根据监听到的不同的事件在各个通道上切换
5.buffer就是一个内存块,底层有一个数组
6.NIO中的buffer是可以读也可以写,但需要flip等方法进行切换
7.channel是双向的,可以同时进行读写,可以实现异步读写数据,可以从缓冲读数据,也可以写数据到缓冲 -
ServerSocketChannel和SocketChannel用于TCP的数据读写。
-
ServerSocketChannel和SocketChannel的作用:简单理解ServerSocketChannel是当客户端向服务器端发起连接请求时,服务器端通过ServerSocketChannel分配给此客户端一个SocketChannel,然后客户端与服务器端的通讯就依赖这个SocketChannel。无论是客户端还是服务器端,channel都需要通过buffer进行连接。
每一个客户端都有一个SocketChannel与服务器端进行通讯,channel.read(buffer),将channel里的内容写入buffer(从通道里面读取内容),channel.write(buffer),将buffer的内容写入channel(将内容写入通道)。 -
selector: Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
-
OP_READ 事件不仅仅只有可读时才触发,以下情况都会触发:channel 中数据读取完毕、连接管道的另一端被关闭、有一个错误的 pending、对方发送消息过来。
案例一:
简单的服务器端、客户端连接(控制台显示)
服务器端代码:
package com.haust.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class server {
public static void main(String[] args) throws IOException {
//创建ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一个selector对象
Selector selector = Selector.open();
//绑定一个本地6666端口,在服务器进行监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//将channel设置为非阻塞
serverSocketChannel.configureBlocking(false);
//把serverSocketChannel注册到selector,监听的事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
//这里我们等待1秒,如果没有事件发生,返回
if (selector.select(1000) == 0){//在1秒内没有事件发生
System.out.println("服务器等待1秒,无连接");
continue;
}
//如果返回的>0,就获取到相关的selectionKey集合
//selector.keys()表示注册到selector上channel的数量
//1.如果返回的>0,表示已经获取到关注的事件
//2.selector.selectedKeys()返回监听到的事件集合
//通过selectionKeys反向获取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//遍历Set<SelectionKey>,在这里我们使用迭代器遍历
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
//获取到Selectionkey
SelectionKey key = keyIterator.next();
//根据key对应的通道发生的事件做相应的处理
if (key.isAcceptable()){//如果是accept事件表示有新的客户端连接
//为该客户端生成一个socketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
//设置成非阻塞
socketChannel.configureBlocking(false);
//将socketChannel注册到selector,监听事件为OP_READ,同时给socketChannel关联一个buffer
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()){
//通过key反向获取到channel
SocketChannel channel = (SocketChannel)key.channel();
//获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer)key.attachment();
//读取数据
channel.read(buffer);
System.out.println("from 客户端"+new String(buffer.array()));
}
//手动从集合中移动当前的selectionkey,防止重复操作
keyIterator.remove();
}
}
}
}
客户端代码
package com.haust.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class client {
public static void main(String[] args) throws IOException {
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//提供服务器端的IP和端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (!socketChannel.connect(inetSocketAddress)){
while (!socketChannel.finishConnect()){
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作..");
}
}
//如果连接成功,就发送数据
String str = "hello nio";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
//发送数据,将buffer数据写入channel
socketChannel.write(buffer);
System.in.read();
}
}
效果:
案例二:群聊系统
简单的服务器端和客户端的群聊系统(控制台板)
服务器端代码:
package com.haust.groupchat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class GroupChatServer {
//定义属性
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT =6667;
//构造器 初始化工作
public GroupChatServer(){
try {
//得到选择器
selector = Selector.open();
//ServerSocketChannel
listenChannel = ServerSocketChannel.open();
//绑定端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
//设置非阻塞模式
listenChannel.configureBlocking(false);
//将listenChannel注册到selector
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
}catch (IOException e) {
e.printStackTrace();
}
}
public void listen(){
try {
while (true){
int count = selector.select(2000);//select方法本身是阻塞的(不带参数),2000代表2秒后返回结果,变为非阻塞的
if (count>0){//说明有事件发生
//遍历得到selectionKey集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
//取出selectionkey
SelectionKey key = iterator.next();
//监听到accept
if (key.isAcceptable()){
SocketChannel sc = listenChannel.accept();
//将该sc注册到selector
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
//提示
System.out.println(sc.getRemoteAddress()+"上线");
}
if (key.isReadable()){
readData(key);
}
//防止重复操作
iterator.remove();
}
}else {
// System.out.println("等待...");
}
}
}catch (IOException e) {
e.printStackTrace();
}finally {
//发生异常时处理
}
}
//读取客户端消息
public void readData(SelectionKey key){
SocketChannel channel = null;
try {
//得到channel
channel = (SocketChannel) key.channel();
//创建buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
//根据count的值进行相应的处理
if (count>0){
//把缓存区的数据转成字符串
String msg = new String(buffer.array());
//输出该消息
System.out.println("from 客户端"+msg);
//向其他客户端转发消息(要去掉自己)
sendInfoToOtherClients(msg,channel);
}
}catch (IOException e){
try {
System.out.println(channel.getRemoteAddress()+"离线了");
//取消注册
key.cancel();
//关闭通道
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
//向其他客户端转发消息的方法
private void sendInfoToOtherClients(String msg,SocketChannel self) throws IOException {
System.out.println("服务器转发消息中");
//遍历所有注册到selector上的SocketChannel,并排除self
for (SelectionKey key : selector.keys()) {
//通过key 取出对应的SocketChannel
Channel targetChannel = key.channel();
//排除自己
if (targetChannel instanceof SocketChannel && targetChannel != self){
//转型
SocketChannel dest = (SocketChannel)targetChannel;
//将msg 存储到buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//将buffer的数据写入通道
dest.write(buffer);
}
}
}
public static void main(String[] args) {
//创建服务器对象
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
}
客户端代码
package com.haust.groupchat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class GroupChatClient {
//定义相关属性
private final String HOST = "127.0.0.1";//服务器的IP
private final int PORT = 6667;//服务器端口
private Selector selector;
private SocketChannel socketChannel;
private String username;
//构造器,完成初始化工作
public GroupChatClient() throws IOException {
selector = Selector.open();
//连接服务器
socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1",PORT ));
//设置非阻塞
socketChannel.configureBlocking(false);
//将channel注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
//得到username
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username+"is OK");
}
//向服务器发送消息
public void sendInfo(String info){
info = username + " 说:" +info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
public void readInfo(){
try {
int readChannels = selector.select(2000);
if (readChannels > 0){//表明有可以用的通道
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if (key.isReadable()){
//得到相关的通道
SocketChannel sc = (SocketChannel)key.channel();
//得到一个buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取
sc.read(buffer);
//把读到的缓冲区的数据转换成字符串
String msg = new String(buffer.array());
System.out.println(msg.trim());
//删除当前的selectionkey防止重复操作
iterator.remove();
} else{
// System.out.println("暂时没有通道");
}
}
}
}catch (IOException e){
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
//启动客户端
final GroupChatClient chatClient = new GroupChatClient();
//启动一个线程,每隔三秒,读取从服务器发送数据
new Thread(){
public void run(){
while (true){
chatClient.readInfo();
try{
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
//发送数据给服务器端端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
}
特别注意:注册到selector的channel是非阻塞的!在监听事件处理结束后需要将此监听事件对应的key移除,防止重复执行。(如果不移除下次还会在集合中)
同一个程序,多次运行的话(应用不同的线程)需要在idea中设置
效果:
将三个客户端断开