背景
在刚工作的时候,有一回老大曾鼓励我们自己完成一个redis服务,能够实现简单的get, set , del 等,后来也不知为何,没有完成这个事情,一晃这都很多年过去了。最近有了解netty相关的东西,再次接触到网络编程,遂想基于java底层的网络api,实现个简单的demo。至于为何还是选择redis,是因为redis协议足够简单,且可实现的功能也比较有操作性。通过这些可以直观地了解网络交互,以及协议等细节,加深理解,想必这也是当时大佬让我们自己实现redis服务端的初衷。
描述
这里主要以java nio为基础,完成一个可以进行get set del ping
命令的redis服务端,重点在协议部分。关于nio以后有机会再详细介绍。数据存储也只是用一个HashMap代替,实际的存储实现肯定要复杂好几个数量级。回到实现,首先用nio完成一个通用的网络模型SelectorServer
,这里就用到了各种SocketChannel
,SelectionKey
。在Server中轮询,然后获取到注册的事件keys,进行遍历,再根据不同的事件类型,进行处理。处理事件类型逻辑,交给下游的实现类完成,即Accept
,Write
,Read
。
而具体的实现类RedisServer
中,在取到对应待处理事件后(Accept
,Write
,Read
), 分别处理三种事件,实现各自的逻辑。注意accept事件后,再给附带channel注册read事件,而处理read事件后,给附带的channel注册write事件,这样就完成了一个获得连接,读取请求数据,完成处理,写入回复的完整过程。
具体的可以看代码,注意其中的注释。
实现代码
通用的nio服务端模板
一个通用模板,绑定ip的端口,然后启动服务,首先注册accept事件等待首次连接,然后交给selector处理,拿到所有触发的事件集合selectionKey,具体依据不同类型的事件,交给下游的扩展类完成。
package server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
public abstract class SelectorServer {
public void start() {
ServerSocketChannel serverChannel = null;
try {
serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress("127.0.0.1", 6379)); //绑定ip和端口
serverChannel.configureBlocking(false); //设置非阻塞,nio特性
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT); //处理连接进入事件
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); //触发的事件集合
while (iterator.hasNext()) {
handle(iterator.next());
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}finally {
if (null != serverChannel) {
try {
serverChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
protected abstract void handle(SelectionKey key) throws IOException; //扩展类需要实现的方法。
}
实现redis的读写
这里就是对应的网络交互部分,连接之后,注册读事件,然后完成redis协议的交互,完成后再注册写事件,并把结果添加到attachment。写事件处理中,拿到attachment,通过channel写入,最后关闭channel即可,实际情况,服务端的连接应该是没有主动关闭的。
package server;
import handler.RedisHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class RedisServer extends SelectorServer {
@Override
protected void handle(SelectionKey key) throws IOException {
//根据不同事件处理
if (key.isAcceptable()) {
handleAccept(key);
} else if (key.isReadable()) {
handleRead(key);
} else if (key.isWritable()) {
handleWrite(key);
}
}
//1 先连接
private void handleAccept(SelectionKey key) throws IOException {
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
channel.configureBlocking(false);
channel.register(key.selector(), SelectionKey.OP_READ);
}
//3 最后写
private static void handleWrite(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
channel.configureBlocking(false);
String att = (String)key.attachment(); //获取到结果数据
channel.write(ByteBuffer.wrap(att.getBytes(StandardCharsets.UTF_8)));
channel.close(); //写入并关闭channel
}
//2 再读取
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocateDirect(2048);
StringBuilder builder = new StringBuilder();
while (channel.read(buffer) > 0) {
builder.append(printString(buffer));
}
String resp = RedisHandler.handleCmd(builder.toString()); //redis逻辑处理完成,然后进行返回给客户端
channel.register(key.selector(), SelectionKey.OP_WRITE, resp); //结果放到attachment
}
public static String printString(ByteBuffer byteBuffer) {
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit() - byteBuffer.position()];
byteBuffer.get(bytes);
String s = new String(bytes, StandardCharsets.UTF_8);
// System.out.println(s);
return s;
}
}
redis协议的部分
对于redis协议的处理,这里先都读取成文本,主要是为了可读。如果是请求,redis服务端收到的内容如下,比如一个get请求:
*2\r\n$3\r\nget\r\n$1\r\na\r\n
比较可读的话是
*2
$3
get
$1
a
表示 get a
。一般是用星号*
开头,然后一个数字表示后面是多少个标识,如get a
,有2个,开头就是*2
。然后每个元素用\r\n
的换行符分隔。再后面的$3
,即表示其后挨着的元素有多少个字符,如get
就是$3
,ping
就是$4
。基于如上的逻辑,写了一个简单的parse()
方法,解析到服务端收到的redis命令和参数,返回是一个命令的包装类,包括元素个数,命令,参数key,参数value。
对于返回这里用了三种格式。
错误返回
用 -
开头,比如unknown command
状态返回
+
开头,比如 +ok
”批量“返回
$
开头,这里没有理解所谓批量是什么意思,只是在查询没有值的时候,返回$null
,客户端展示为(nil)
。
package handler;
import java.util.HashMap;
public class RedisHandler {
//用一个HashMap表示存储,实现get set命令
private static final HashMap<String, String> store = new HashMap<>();
//处理不同的命令
public static String handleCmd(String cmd) {
Cmd c = parse(cmd);
String value = "ok";
switch (c.cmd) {
case "get":
value = store.get(c.key);
break;
case "set":
store.put(c.key,c.value);
break;
case "del":
store.remove(c.key);
break;
case "ping":
value = "pong";
break;
default:
value = "-" + "unknown cmd\r\n";
return value;
}
if (value != null) {
value = "+" + value + "\r\n";
}else {
value = "$null\r\n";
}
return value;
}
//解析协议
private static Cmd parse(String cmd) {
String[] segs = cmd.split("\r\n");
Cmd c = new Cmd();
int idx = 0;
c.setArgs(segs[idx++].charAt(1) - '0'); //表示元素数目
c.setCmd(segs[++idx]); //第二个为命令
idx++;
if (c.args > 1) {
c.setKey(segs[++idx]); //一般第三个为key, 要注意ping的情况,没有key
}
idx++;
if (c.args > 2) {
c.setValue(segs[++idx]); //set 的时候用到
}
return c;
}
static class Cmd {
int args; //元素数目
String key;
String cmd;
String value;
public Cmd() {
}
public int getArgs() {
return args;
}
public void setArgs(int args) {
this.args = args;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getCmd() {
return cmd;
}
public void setCmd(String cmd) {
this.cmd = cmd;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
}
服务启动入口
服务启动入口比较简单,新建个服务端,然后启动即可。
import server.RedisServer;
public class Main {
public static void main(String[] args) {
System.out.println("Hello World!");
// Server.start();
// new HttpChannelServer().start();
new RedisServer().start();
}
}
样例展示
总结
以上就是本期的所有内容,基本实现了需要的功能,后续计划再在存储,协议方面多做了解,争取能详细分享一篇。
“欠的债总归是要还的”,能量和收益某些时刻会趋向平衡,还是要好好学习,稳步发展,一点感慨。