带注册中心的RPC框架可参看
根据资料学习,写了一遍。
实现自己的RPC框架如果不需要自定义协议的话那就要基于Socket+序列化。
ProcessorHandler:
主要是用来处理客户端的请求。
package dgb.nospring.myrpc;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
* 任务处理类
*
* @author Dongguabai
* @date 2018/11/1 16:10
*/
public class ProcessorHandler implements Runnable {
private Socket socket;
/**
* 服务端发布的服务
*/
private Object service;
public ProcessorHandler(Socket socket, Object service) {
this.socket = socket;
this.service = service;
}
//处理请求
@Override
public void run() {
ObjectInputStream objectInputStream = null;
try {
objectInputStream = new ObjectInputStream(socket.getInputStream());
//反序列化
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = invoke(rpcRequest);
//将结果返回给客户端
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(result);
objectOutputStream.flush();
objectInputStream.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 反射调用
*
* @param rpcRequest
*/
private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
System.out.println("服务端开始调用------");
Object[] parameters = rpcRequest.getParameters();
Class[] parameterTypes = new Class[parameters.length];
for (int i = 0, length = parameters.length; i < length; i++) {
parameterTypes[i] = parameters[i].getClass();
}
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), parameterTypes);
return method.invoke(service, parameters);
}
}
RemoteInvocationHandler:
动态代理InvocationHandler。
package dgb.nospring.myrpc;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
/**
* @author Dongguabai
* @date 2018/11/1 16:20
*/
public class RemoteInvocationHandler implements InvocationHandler{
private String host;
private int port;
/**
*发起客户端和服务端的远程调用。调用客户端的信息进行传输
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameters(args);
TcpTransport tcpTransport = new TcpTransport(host,port);
return tcpTransport.send(rpcRequest);
}
public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
}
RpcClientProxy:
客户端获取代理对象。
package dgb.nospring.myrpc;
import java.lang.reflect.Proxy;
/**
* 客户端代理
* @author Dongguabai
* @date 2018/11/1 16:18
*/
public class RpcClientProxy {
public <T> T clientProxy(final Class<T> interfaceClass,final String host,final int port){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass},new RemoteInvocationHandler(host, port));
}
}
RpcRequest:
封装的一个传输对象。
package dgb.nospring.myrpc;
import java.io.Serializable;
/**
* 统一传输对象(让服务端知道当前要做什么)
*
* @author Dongguabai
* @date 2018/11/1 16:16
*/
public class RpcRequest implements Serializable {
private String className;
private String methodName;
private Object[] parameters;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}
RpcServer:
服务端发布服务。
package dgb.nospring.myrpc;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Dongguabai
* @date 2018/11/1 15:53
*/
public class RpcServer {
//不建议通过Executors创建线程池,这里为了方便
private static final ExecutorService executor = Executors.newCachedThreadPool();
public void publisher(final Object service, int port) {
//启动一个服务监听
try (ServerSocket serverSocket = new ServerSocket(port)) {
while (true){
//通过ServerSocket获取请求
Socket socket = serverSocket.accept();
executor.execute(new ProcessorHandler(socket,service));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
TcpTransport:
处理Socket传输。
package dgb.nospring.myrpc;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* socket传输
*
* @author Dongguabai
* @date 2018/11/1 16:25
*/
public class TcpTransport {
private String host;
private int port;
public TcpTransport(String host, int port) {
this.host = host;
this.port = port;
}
private Socket newSocket() {
System.out.println("准备创建Socket连接,host:" + host + ",port:" + port);
try {
Socket socket = new Socket(host, port);
return socket;
} catch (IOException e) {
throw new RuntimeException("Socket连接创建失败!host:" + host + ",port:" + port);
}
}
public Object send(RpcRequest rpcRequest) {
Socket socket = null;
try {
socket = newSocket();
try {
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(rpcRequest);
outputStream.flush();
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
Object result = inputStream.readObject();
inputStream.close();
outputStream.close();
return result;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("发起远程调用异常!",e);
}
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
测试Demo
接口:
package dgb.nospring.myrpc.demo;
/**
* @author Dongguabai
* @date 2018/11/1 15:50
*/
public interface IHelloService {
String sayHello(String name);
}
实现类:
package dgb.nospring.myrpc.demo;
/**
* @author Dongguabai
* @date 2018/11/1 15:51
*/
public class HelloServiceImpl implements IHelloService {
@Override
public String sayHello(String name) {
return "你好," + name;
}
}
客户端:
package dgb.nospring.myrpc.demo;
import dgb.nospring.myrpc.RpcClientProxy;
/**
* @author Dongguabai
* @date 2018/11/1 18:10
*/
public class ClientDemo {
public static void main(String[] args) {
RpcClientProxy proxy = new RpcClientProxy();
IHelloService helloService = proxy.clientProxy(IHelloService.class, "127.0.0.1", 12345);
String name = helloService.sayHello("张三");
System.out.println(name);
}
}
服务端:
package dgb.nospring.myrpc.demo;
import dgb.nospring.myrpc.RpcServer;
/**
* @author Dongguabai
* @date 2018/11/1 18:07
*/
public class ServerDemo {
public static void main(String[] args) {
RpcServer rpcServer = new RpcServer();
rpcServer.publisher(new HelloServiceImpl(),12345);
}
}
目前大部分远程调用框架都是基于netty去实现的,毕竟Socket的性能实在不行。