0
点赞
收藏
分享

微信扫一扫

NIO实现多人聊天


概述:

NIO即non-blocking IO,顾名思义是一种非阻塞模型。NIO的目的就是实现一个线程处理多个连接,故引入了几个重要的核心概念:

  • Buffer,缓冲区。Buffer底层是一个数组,供Channel实现对数据的读写。Buffer的position、limit、capacity分别指当前索引、读/写上限索引、数组容量。
  • Channel,管道。Channel可以理解为连接,与BIO中Sokcet类似,一个连接对应一个Channel,但Channel中仍内置了一个Socket,可以调用socket()获取。
  • Selector,选择器。Selector类似一个调度中心,所有Channel都需要注册到选择器中,并绑定一个SelectionKey,绑定时还会指定要监听的事件,如:连接就绪、读就绪、写就绪等。可以调用Selector提供的API实现对发生监听事件的连接进行处理。

一、功能:

编写一个NIO群聊系统,实现客户端与客户端的通信需求(非阻塞)
1、服务器端:监测用户是否上线,离线,并实现消息的转发
2、客户端:接收服务器其转发它客户端发送的消息,同时也可发送消息给服务器

二、实现:

服务器端:

public class NioChatServer {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    public static final Integer PORT = 9999;

    public NioChatServer() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void listen() {
        try {
            while (selector.select() > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    if (selectionKey.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (selectionKey.isReadable()) {
                        readClientData(selectionKey);
                    }
                    it.remove();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 接收客户端消息,并转发给所有客户端
     *
     * @param selectionKey
     */
    private void readClientData(SelectionKey selectionKey) {
        SocketChannel channel = null;
        try {
            channel = (SocketChannel) selectionKey.channel();
            channel.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = 0;
            while ((count = channel.read(buffer)) > 0) {
                buffer.flip();
                String msg = new String(buffer.array(), 0, buffer.remaining());
                System.out.println("接收到的消息:" + msg);
                //发送消息给所有人
                sendMsgAllClients(msg, channel);
            }

        } catch (IOException e) {
            try {
                System.out.println("有人离线了...." + channel.getRemoteAddress());
                //发生异常,取消注册
                selectionKey.cancel();
                //关闭通道,离线
                channel.close();
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }

        }

    }

    private void sendMsgAllClients(String msg, SocketChannel currentChannel) {
        try {
            System.out.println("服务器开始转发消息===" + Thread.currentThread().getName());
            for (SelectionKey selectionKey : selector.keys()) {
                Channel channel = selectionKey.channel();
                //排除自己,不发送给自己
                if (channel instanceof SocketChannel && channel != currentChannel) {
                    //把msg转成buffer
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    //转化为SocketChannel
                    SocketChannel socketChannel = (SocketChannel) channel;
                    //发送消息
                    socketChannel.write(buffer);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        NioChatServer server = new NioChatServer();
        server.listen();
    }
}

客户端

public class NioChatClient {

    private Selector selector;
    private SocketChannel socketChannel;
    private static final String IP = "127.0.0.1";

    public NioChatClient() {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open(new InetSocketAddress(IP, NioChatServer.PORT));
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        NioChatClient client = new NioChatClient();
        //监听服务器回复的消息
        new Thread(
                new Runnable() {
                    @Override
                    public void run() {
                        client.readInfo();
                    }
                }
        ).start();
        //发送消息给服务器
        Scanner scanner = new Scanner(System.in);
        System.out.print("请说:");
        while (scanner.hasNextLine()){
            String msg = scanner.nextLine();
            //发送消息给服务器
            client.sendMsgToServer(msg);
        }
    }

    private void sendMsgToServer(String msg) {
        try {
            String name = Thread.currentThread().getName();
            socketChannel.write(ByteBuffer.wrap(("线程【" + name + "】说:" + msg).getBytes()));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void readInfo() {
        try {
            while (selector.select() > 0){
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    if(selectionKey.isReadable()){
                        SocketChannel channel = (SocketChannel) selectionKey.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int count = channel.read(buffer);
                        if(count > 0){
                            buffer.flip();
                            System.out.println("收到服务器的消息:" + new String(buffer.array(), 0, buffer.remaining()));
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

三、源码下载

https://gitee.com/charlinchenlin/store-pos


举报

相关推荐

0 条评论