0
点赞
收藏
分享

微信扫一扫

基于java nio模拟redis服务端

飞鸟不急 2021-09-24 阅读 96

背景

在刚工作的时候,有一回老大曾鼓励我们自己完成一个redis服务,能够实现简单的get, set , del 等,后来也不知为何,没有完成这个事情,一晃这都很多年过去了。最近有了解netty相关的东西,再次接触到网络编程,遂想基于java底层的网络api,实现个简单的demo。至于为何还是选择redis,是因为redis协议足够简单,且可实现的功能也比较有操作性。通过这些可以直观地了解网络交互,以及协议等细节,加深理解,想必这也是当时大佬让我们自己实现redis服务端的初衷。

描述

这里主要以java nio为基础,完成一个可以进行get set del ping命令的redis服务端,重点在协议部分。关于nio以后有机会再详细介绍。数据存储也只是用一个HashMap代替,实际的存储实现肯定要复杂好几个数量级。回到实现,首先用nio完成一个通用的网络模型SelectorServer,这里就用到了各种SocketChannelSelectionKey。在Server中轮询,然后获取到注册的事件keys,进行遍历,再根据不同的事件类型,进行处理。处理事件类型逻辑,交给下游的实现类完成,即AcceptWriteRead

而具体的实现类RedisServer中,在取到对应待处理事件后(AcceptWriteRead), 分别处理三种事件,实现各自的逻辑。注意accept事件后,再给附带channel注册read事件,而处理read事件后,给附带的channel注册write事件,这样就完成了一个获得连接,读取请求数据,完成处理,写入回复的完整过程。
具体的可以看代码,注意其中的注释。

实现代码

通用的nio服务端模板

一个通用模板,绑定ip的端口,然后启动服务,首先注册accept事件等待首次连接,然后交给selector处理,拿到所有触发的事件集合selectionKey,具体依据不同类型的事件,交给下游的扩展类完成。

package server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

public abstract class SelectorServer {

    public void start() {
        ServerSocketChannel serverChannel = null;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.bind(new InetSocketAddress("127.0.0.1", 6379)); //绑定ip和端口
            serverChannel.configureBlocking(false); //设置非阻塞,nio特性
            Selector selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);  //处理连接进入事件
            while (true) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();  //触发的事件集合
                while (iterator.hasNext()) {
                    handle(iterator.next());
                    iterator.remove();
                }
            }


        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }finally {
            if (null != serverChannel) {
                try {
                    serverChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    protected abstract void handle(SelectionKey key) throws IOException;  //扩展类需要实现的方法。
}

实现redis的读写

这里就是对应的网络交互部分,连接之后,注册读事件,然后完成redis协议的交互,完成后再注册写事件,并把结果添加到attachment。写事件处理中,拿到attachment,通过channel写入,最后关闭channel即可,实际情况,服务端的连接应该是没有主动关闭的。

package server;

import handler.RedisHandler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class RedisServer extends SelectorServer {

    @Override
    protected void handle(SelectionKey key) throws IOException {
        //根据不同事件处理
        if (key.isAcceptable()) {
            handleAccept(key);
        } else if (key.isReadable()) {
            handleRead(key);
        } else if (key.isWritable()) {
            handleWrite(key);
        }
    }

    //1 先连接
    private void handleAccept(SelectionKey key) throws IOException {
        SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
        channel.configureBlocking(false);
        channel.register(key.selector(), SelectionKey.OP_READ);
    }

    //3 最后写
    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.configureBlocking(false);
        String att = (String)key.attachment();  //获取到结果数据
        channel.write(ByteBuffer.wrap(att.getBytes(StandardCharsets.UTF_8)));
        channel.close();  //写入并关闭channel
    }

    //2 再读取
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocateDirect(2048);
        StringBuilder builder = new StringBuilder();
        while (channel.read(buffer) > 0) {
            builder.append(printString(buffer));
        }
        String resp = RedisHandler.handleCmd(builder.toString());  //redis逻辑处理完成,然后进行返回给客户端
        channel.register(key.selector(), SelectionKey.OP_WRITE, resp);  //结果放到attachment
    }

   public static String printString(ByteBuffer byteBuffer) {
        byteBuffer.flip();
        byte[] bytes = new byte[byteBuffer.limit() - byteBuffer.position()];
        byteBuffer.get(bytes);
        String s = new String(bytes, StandardCharsets.UTF_8);
//        System.out.println(s);
        return s;
    }
}

redis协议的部分

对于redis协议的处理,这里先都读取成文本,主要是为了可读。如果是请求,redis服务端收到的内容如下,比如一个get请求:

*2\r\n$3\r\nget\r\n$1\r\na\r\n

比较可读的话是

*2
$3
get
$1
a

表示 get a。一般是用星号*开头,然后一个数字表示后面是多少个标识,如get a,有2个,开头就是*2。然后每个元素用\r\n的换行符分隔。再后面的$3,即表示其后挨着的元素有多少个字符,如get就是$3ping就是$4。基于如上的逻辑,写了一个简单的parse()方法,解析到服务端收到的redis命令和参数,返回是一个命令的包装类,包括元素个数,命令,参数key,参数value。

对于返回这里用了三种格式。

错误返回

- 开头,比如unknown command

状态返回

+ 开头,比如 +ok

”批量“返回

$开头,这里没有理解所谓批量是什么意思,只是在查询没有值的时候,返回$null,客户端展示为(nil)

package handler;

import java.util.HashMap;

public class RedisHandler {
    //用一个HashMap表示存储,实现get set命令
    private static final HashMap<String, String> store = new HashMap<>();

    //处理不同的命令
    public static String handleCmd(String cmd) {
        Cmd c = parse(cmd);
        String value = "ok";
        switch (c.cmd) {
            case "get":
                value = store.get(c.key);
                break;
            case "set":
                store.put(c.key,c.value);
                break;
            case "del":
                store.remove(c.key);
                break;
            case "ping":
                value = "pong";
                break;
                default:
                    value = "-" + "unknown cmd\r\n";
                    return value;
        }
        if (value != null) {
            value = "+" + value + "\r\n";
        }else {
            value = "$null\r\n";
        }
        return value;
    }

   //解析协议
    private static Cmd parse(String cmd) {
        String[] segs = cmd.split("\r\n");
        Cmd c  = new Cmd();
        int idx = 0;
        c.setArgs(segs[idx++].charAt(1) - '0');  //表示元素数目
        c.setCmd(segs[++idx]);  //第二个为命令
        idx++;
        if (c.args > 1) {
            c.setKey(segs[++idx]);    //一般第三个为key, 要注意ping的情况,没有key
        }
        idx++;
        if (c.args > 2) {
            c.setValue(segs[++idx]);  //set 的时候用到
        }
        return c;
    }

    static class Cmd {
        int args; //元素数目
        String key;
        String cmd;
        String value;

        public Cmd() {
        }

        public int getArgs() {
            return args;
        }

        public void setArgs(int args) {
            this.args = args;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        public String getCmd() {
            return cmd;
        }

        public void setCmd(String cmd) {
            this.cmd = cmd;
        }

        public String getValue() {
            return value;
        }

        public void setValue(String value) {
            this.value = value;
        }
    }
}


服务启动入口

服务启动入口比较简单,新建个服务端,然后启动即可。

import server.RedisServer;

public class Main {

    public static void main(String[] args) {
        System.out.println("Hello World!");
        //        Server.start();
//        new HttpChannelServer().start();
        new RedisServer().start();
    }
}

样例展示

总结

以上就是本期的所有内容,基本实现了需要的功能,后续计划再在存储,协议方面多做了解,争取能详细分享一篇。
“欠的债总归是要还的”,能量和收益某些时刻会趋向平衡,还是要好好学习,稳步发展,一点感慨。

参考资料

  1. redis协议
  2. cheap redis
举报

相关推荐

0 条评论