概述:
NIO即non-blocking IO,顾名思义是一种非阻塞模型。NIO的目的就是实现一个线程处理多个连接,故引入了几个重要的核心概念:
- Buffer,缓冲区。Buffer底层是一个数组,供Channel实现对数据的读写。Buffer的position、limit、capacity分别指当前索引、读/写上限索引、数组容量。
- Channel,管道。Channel可以理解为连接,与BIO中Sokcet类似,一个连接对应一个Channel,但Channel中仍内置了一个Socket,可以调用socket()获取。
- Selector,选择器。Selector类似一个调度中心,所有Channel都需要注册到选择器中,并绑定一个SelectionKey,绑定时还会指定要监听的事件,如:连接就绪、读就绪、写就绪等。可以调用Selector提供的API实现对发生监听事件的连接进行处理。
一、功能:
编写一个NIO群聊系统,实现客户端与客户端的通信需求(非阻塞)
1、服务器端:监测用户是否上线,离线,并实现消息的转发
2、客户端:接收服务器其转发它客户端发送的消息,同时也可发送消息给服务器
二、实现:
服务器端:
public class NioChatServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public static final Integer PORT = 9999;
public NioChatServer() {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void listen() {
try {
while (selector.select() > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
if (selectionKey.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
readClientData(selectionKey);
}
it.remove();
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 接收客户端消息,并转发给所有客户端
*
* @param selectionKey
*/
private void readClientData(SelectionKey selectionKey) {
SocketChannel channel = null;
try {
channel = (SocketChannel) selectionKey.channel();
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = 0;
while ((count = channel.read(buffer)) > 0) {
buffer.flip();
String msg = new String(buffer.array(), 0, buffer.remaining());
System.out.println("接收到的消息:" + msg);
//发送消息给所有人
sendMsgAllClients(msg, channel);
}
} catch (IOException e) {
try {
System.out.println("有人离线了...." + channel.getRemoteAddress());
//发生异常,取消注册
selectionKey.cancel();
//关闭通道,离线
channel.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
private void sendMsgAllClients(String msg, SocketChannel currentChannel) {
try {
System.out.println("服务器开始转发消息===" + Thread.currentThread().getName());
for (SelectionKey selectionKey : selector.keys()) {
Channel channel = selectionKey.channel();
//排除自己,不发送给自己
if (channel instanceof SocketChannel && channel != currentChannel) {
//把msg转成buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//转化为SocketChannel
SocketChannel socketChannel = (SocketChannel) channel;
//发送消息
socketChannel.write(buffer);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
NioChatServer server = new NioChatServer();
server.listen();
}
}
客户端
public class NioChatClient {
private Selector selector;
private SocketChannel socketChannel;
private static final String IP = "127.0.0.1";
public NioChatClient() {
try {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(IP, NioChatServer.PORT));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
NioChatClient client = new NioChatClient();
//监听服务器回复的消息
new Thread(
new Runnable() {
@Override
public void run() {
client.readInfo();
}
}
).start();
//发送消息给服务器
Scanner scanner = new Scanner(System.in);
System.out.print("请说:");
while (scanner.hasNextLine()){
String msg = scanner.nextLine();
//发送消息给服务器
client.sendMsgToServer(msg);
}
}
private void sendMsgToServer(String msg) {
try {
String name = Thread.currentThread().getName();
socketChannel.write(ByteBuffer.wrap(("线程【" + name + "】说:" + msg).getBytes()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void readInfo() {
try {
while (selector.select() > 0){
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
if(selectionKey.isReadable()){
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
if(count > 0){
buffer.flip();
System.out.println("收到服务器的消息:" + new String(buffer.array(), 0, buffer.remaining()));
}
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
三、源码下载
https://gitee.com/charlinchenlin/store-pos