导航
Golang RPC实现
- day01 我们实现了简单的服务端和客户端。
- 我们简单总结一下day01的模式。
- 服务端按顺序处理客户端过来的请求,按顺序响应客户端的请求。
- 客户端以同步的方式发送请求,不能并发发出请求。
- 那么我们day02干的事情就是,让客户端异步并发的发出请求(请求顺序变得随机),服务端依然是按请求顺序进行处理,处理完某一个请求就返回,可以不按请求的顺序响应数据,但是响应数据是要上锁的,否则会发生响应数据并发安全问题。
- 主要逻辑是修改了客户端的代码,服务端和day01没有变化
一、客户端异步并发多个请求
1、 客户端结构体
type Client struct {
cc codec.Codec//编码方式
opt *Option//发出请求的第一个包,用来协商后续包的格式和编码方式
sending sync.Mutex // 当一个请求正在发送时,不可以转头去执行别的请求
header codec.Header // 请求头内容
mu sync.Mutex // protect following
seq uint64 //记录该客户端一次请求连接的序号,
pending map[uint64]*Call//通过seq快速找到客户端的某个请求
closing bool // user has called Close
shutdown bool // server has told us to stop
}
2、 一个客户端,异步发送多个请求,使用call
结构体代表客户端的每次请求
type Call struct {
Seq uint64 //当前请求的序号,唯一标识一个请求
ServiceMethod string // format "<service>.<method>" 此次请求的服务和方法
Args interface{} // arguments to the function 请求函数的参数
Reply interface{} // reply from the function 服务端函数的响应数据
Error error // if error occurs, it will be set //发生错误时的信息
Done chan *Call // Strobes when call is complete.完成一次请求通过chan来通知
}
3、客户端并发多个请求
- 主函数逻辑
func main() {
log.SetFlags(0)
addr := make(chan string)
go startServer(addr)
client, _ := geerpc.Dial("tcp", <-addr)
defer func() { _ = client.Close() }()
time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {//go 实现异步非阻塞发送多个请求
defer wg.Done()
args := fmt.Sprintf("geerpc req %d", i)//一次请求携带的数据
var reply string
if err := client.Call("Foo.Sum", args, &reply); err != nil {//call发出一次请求,&reply,传的是引用,如果有响应,就能接收到
log.Fatal("call Foo.Sum error:", err)
}
log.Println("reply:", reply)
}(i)
}
wg.Wait()
}
- Call 准备发出一次请求
// Call invokes the named function, waits for it to complete,
// and returns its error status.
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done//阻塞等待此次请求的channel,直到服务端处理并响应才返回
return call.Error
}
- 绑定数据到请求中
// Go invokes the function asynchronously.
// It returns the Call structure representing the invocation.
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10)
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}
call := &Call{
ServiceMethod: serviceMethod,//此次请求的服务和方法
Args: args,//此次请求的参数
Reply: reply,//此处是引用类型,暂时还没有数据,等服务端响应就有数据了
Done: done,//绑定此次请求的响应channel,服务端响应后就往对应的channel发一条数据
}
client.send(call)
return call
}
- 发送请求到服务端
func (client *Client) send(call *Call) {
// make sure that the client will send a complete request
client.sending.Lock()
defer client.sending.Unlock()
// register this call.
seq, err := client.registerCall(call)//注册这次call,把这次的请求ID注册到客户端中。。。
if err != nil {
call.Error = err
call.done()
return
}
// prepare request header
client.header.ServiceMethod = call.ServiceMethod
client.header.Seq = seq
client.header.Error = ""
// encode and send the request
if err := client.cc.Write(&client.header, call.Args); err != nil {//发送请求头和请求参数
call := client.removeCall(seq)
// call may be nil, it usually means that Write partially failed,
// client has received the response and handled
if call != nil {
call.Error = err
call.done()
}
}
}
4、客户端接收请求
func (client *Client) receive() {
var err error
for err == nil {
var h codec.Header
if err = client.cc.ReadHeader(&h); err != nil { 接收请求是一个个来,当连接关闭时,此处会报错,退出整个客户端
break
}
call := client.removeCall(h.Seq)//通过Seq唯一标识符删除一个请求
switch {
case call == nil:
// it usually means that Write partially failed
// and call was already removed.
err = client.cc.ReadBody(nil)
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default:
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()//向通道发送一条消息,客户端等待的这个call可以推出了
}
}
// error occurs, so terminateCalls pending calls
client.terminateCalls(err)//关闭所有请求
}