0
点赞
收藏
分享

微信扫一扫

grpc之 普通流 、服务端流、 客户端流 、双向流模式

流模式入门(上)、场景:批量查询用户积分

为何要用流模式

前面的例子,我们仅仅是传输比较小的数据 基本模式是客户端请求----服务端响应

如果是传输较大数据呢?会带来

1、数据包过大导致压力陡增

2、需要等待客户端包全部发送,才能处理以及响应


1,普通查询积分方式

服务端:

grpc之 普通流 、服务端流、 客户端流 、双向流模式_grpcgrpc之 普通流 、服务端流、 客户端流 、双向流模式_客户端_02

syntax="proto3";
package services;

import "google/protobuf/timestamp.proto";

message ProdModel{ //商品模型
int32 prod_id=1;
string prod_name=2;
float prod_price=3;
}


message OrderMain{ //主订单模型
int32 order_id=1;//订单ID,数字自增
string order_no=2; //订单号
int32 user_id=3; //购买者ID
float order_money=4;//商品金额
google.protobuf.Timestamp order_time=5; //下单时间
repeated OrderDetail order_details=6;
}

//子订单模型
message OrderDetail{
int32 detail_id=1;
string order_no=2;
int32 prod_id=3;
float prod_price=4;
int32 prod_num=5;
}

//用户模型
message UserInfo{
int32 user_id=1;
int32 user_score=2;
}

Models.proto

grpc之 普通流 、服务端流、 客户端流 、双向流模式_grpcgrpc之 普通流 、服务端流、 客户端流 、双向流模式_客户端_02

syntax="proto3";
package services;
import "Models.proto";
message UserScoreRequest{
repeated UserInfo users=1;
}
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
}

Users.proto


执行脚本 生成pd.go文件

cd pbfiles && protoc --go_out=plugins=grpc:../services  Prod.proto
protoc --go_out=plugins=grpc:../services Orders.proto
protoc --go_out=plugins=grpc:../services Users.proto

protoc --go_out=plugins=grpc:../services --validate_out=lang=go:../services Models.proto

protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto
protoc --grpc-gateway_out=logtostderr=true:../services Orders.proto
protoc --grpc-gateway_out=logtostderr=true:../services Users.proto

cd ..


grpc之 普通流 、服务端流、 客户端流 、双向流模式_grpcgrpc之 普通流 、服务端流、 客户端流 、双向流模式_客户端_02

package services

import "context"

type UserService struct {
}
func(*UserService) GetUserScore(ctx context.Context, in *UserScoreRequest) (*UserScoreResponse, error){
var score int32=101
users:=make([]*UserInfo,0)
for _,user:=range in.Users{
user.UserScore=score
score++
users=append(users,user)
}
return &UserScoreResponse{Users:users},nil

}

UserService.go


grpc之 普通流 、服务端流、 客户端流 、双向流模式_grpcgrpc之 普通流 、服务端流、 客户端流 、双向流模式_客户端_02

package main

import (
"google.golang.org/grpc"
"grpcpro/services"
"net"
)

func main() {
rpcServer:=grpc.NewServer()
services.RegisterProdServiceServer(rpcServer,new(services.ProdService))//商品服务
services.RegisterOrderSerivceServer(rpcServer,new(services.OrdersService))//订单服务
services.RegisterUserServiceServer(rpcServer,new(services.UserService))

lis,_:=net.Listen("tcp",":8081")

rpcServer.Serve(lis)


}

server.go


go build server.go


客户端:

拷贝服务端生成的pd.go文件到客户端

grpc之 普通流 、服务端流、 客户端流 、双向流模式_客户端_09



func main(){
conn,err:=grpc.Dial(":8081",grpc.WithInsecure())
if err!=nil{
log.Fatal(err)
}
defer conn.Close()
ctx:=context.Background()
userClient:=services.NewUserServiceClient(conn)
var i int32
req:=services.UserScoreRequest{}
req.Users=make([]*services.UserInfo,0)

for i=1;i<20;i++{
req.Users=append(req.Users,&services.UserInfo{UserId:i})
}
res,_ := userClient.GetUserScore(ctx,&req)
fmt.Println(res.Users)

}


go build maiin.go


打印结果:

[user_id:1 user_score:101  user_id:2 user_score:102  user_id:3 user_score:103  user_id:4 user_score:104  user_id:5 user_score:105  user_id:6 user_score:106  user_id:7 user_score:107  user_id:8 user_score:108  user_id:9 user_score:109  user_id:10 user_score:110  user_id:11 user_score:111  user_id:12 user_score:112  user_id:13 user_score:113  user_id:14 user_score:114  user_id:15 user_score:115  user_id:16 user_score:116  user_id:17 user_score:117  user_id:18 user_score:118  user_id:19 user_score:119 ]

Process finished with exit code 0


2,服务端流

 假设 客户端一次性发送6个客户数据给服务端

再假设 服务端查询用户积分 有点慢。因此 采用的策略是 服务端每查询2个就发送给客户端


服务端:

修改users.proto

syntax="proto3";
package services;
import "Models.proto";
message UserScoreRequest{
repeated UserInfo users=1;
}
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
}


处理方法:

func(*UserService)     GetUserScoreByServerStream(in *UserScoreRequest,stream UserService_GetUserScoreByServerStreamServer) error {
var score int32=101
users:=make([]*UserInfo,0)
for index,user:=range in.Users{
user.UserScore=score
score++
users=append(users,user)

if (index+1) % 2==0 && index>0{
err:=stream.Send(&UserScoreResponse{Users:users})
if err!=nil{
return err
}
users=(users)[0:0]
}
time.Sleep(time.Second*1)
}
if len(users)>0{
err:=stream.Send(&UserScoreResponse{Users:users})
if err!=nil{
return err
}
}
return nil
}


客户端调用:

stream,_:=userClient.GetUserScoreByServerStream(ctx,&req)
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}

fmt.Println(resp.Users)
}


打印出:

[user_id:1 user_score:101  user_id:2 user_score:102 ]
[user_id:3 user_score:103 user_id:4 user_score:104 ]
[user_id:5 user_score:105 ]



3,客户端流:

客户端流模式、场景:分批发送请求

grpc之 普通流 、服务端流、 客户端流 、双向流模式_grpc_10



场景:
客户端批量查询用户积分
1、客户端一次性把用户列表发送过去(不是很多,获取列表很快)
2、服务端查询积分比较耗时
。 因此查到一部分 就返回一部分。

而不是 全部查完再返回给客户端

服务端:

修改users.proto

syntax="proto3";
package services;
import "Models.proto";
message UserScoreRequest{
repeated UserInfo users=1;
}
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
rpc GetUserScoreByClientStream(stream UserScoreRequest) returns (UserScoreResponse);
}

新增service处理方法

func(*UserService)  GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error{
var score int32=101
users:=make([]*UserInfo,0)
for{
req,err:=stream.Recv()
if err==io.EOF{ //接收完了
return stream.SendAndClose(&UserScoreResponse{Users:users})
}
if err!=nil{
return err
}
for _,user:=range req.Users{
user.UserScore=score //这里好比是服务端做的业务处理
score++
users=append(users,user)
}
}
}


客户端:

//客户端流
func main(){
conn,err:=grpc.Dial(":8081",grpc.WithInsecure())
if err!=nil{
log.Fatal(err)
}
defer conn.Close()

ctx:=context.Background()
userClient:=services.NewUserServiceClient(conn)
var i int32
if err!=nil{
log.Fatal(err)
}
stream,err:=userClient.GetUserScoreByClientStream(ctx)
if err!=nil{
log.Fatal(err)
}

for j:=1;j<=3;j++{
req:=services.UserScoreRequest{}
req.Users=make([]*services.UserInfo,0)
for i=1;i<=5;i++{ //加了5条用户信息 假设是一个耗时的过程
req.Users=append(req.Users,&services.UserInfo{UserId:i})
}
err:=stream.Send(&req)
if err!=nil{
log.Println(err)
}
}
res,_:=stream.CloseAndRecv()
fmt.Println(res.Users)
}

go build server.go

go build main.go

[user_id:1 user_score:101  user_id:2 user_score:102  user_id:3 user_score:103  user_id:4 user_score:104  user_id:5 user_score:105  user_id:1 user_score:106  user_id:2 user_score:107  user_id:3 user_score:108  user_id:4 user_score:109  user_id:5 user_score:110  user_id:1 user_score:111  user_id:2 user_score:112  user_id:3 user_score:113  user_id:4 user_score:114  user_id:5 user_score:115 ]

Process finished with exit code 0

客户端分批发送,服务端一次返回结果


双向流模式

grpc之 普通流 、服务端流、 客户端流 、双向流模式_服务端_11


场景:
客户端批量查询用户积分
1、客户端分批把用户列表发送过去(客户端获取列表比较慢)
2、服务端查询积分也很慢,所以分批发送过去

此时我们可以使用 双向流模式

服务端:

修改users.proto

rpc GetUserScoreByTWS(stream UserScoreRequest) returns (stream UserScoreResponse);

grpc之 普通流 、服务端流、 客户端流 、双向流模式_grpcgrpc之 普通流 、服务端流、 客户端流 、双向流模式_客户端_02

syntax="proto3";
package services;
import "Models.proto";
message UserScoreRequest{
repeated UserInfo users=1;
}
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
rpc GetUserScoreByClientStream(stream UserScoreRequest) returns (UserScoreResponse);
rpc GetUserScoreByTWS(stream UserScoreRequest) returns (stream UserScoreResponse);
}

View Code


然后生成

cd pbfiles && protoc --go_out=plugins=grpc:../services  Prod.proto
protoc --go_out=plugins=grpc:../services Orders.proto
protoc --go_out=plugins=grpc:../services Users.proto

protoc --go_out=plugins=grpc:../services Models.proto

protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto
protoc --grpc-gateway_out=logtostderr=true:../services Orders.proto
protoc --grpc-gateway_out=logtostderr=true:../services Users.proto

cd ..


处理 UserService.go

//双向流
func(*UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error {
var score int32=101
users:=make([]*UserInfo,0)
for{
req,err:=stream.Recv()
if err==io.EOF{ //接收完了
return nil
}
if err!=nil{
return err
}
for _,user:=range req.Users{
user.UserScore=score //这里好比是服务端做的业务处理
score++
users=append(users,user)
}
err=stream.Send(&UserScoreResponse{Users:users})
if err!=nil{
log.Println(err)
}
users=(users)[0:0]
}
}

客户端:

//双向流
func main(){
conn,err:=grpc.Dial(":8081",grpc.WithInsecure())
if err!=nil{
log.Fatal(err)
}
defer conn.Close()

ctx:=context.Background()
userClient:=services.NewUserServiceClient(conn)
var i int32
if err!=nil{
log.Fatal(err)
}
stream,err:=userClient.GetUserScoreByTWS(ctx)
if err!=nil{
log.Fatal(err)
}

var uid int32=1
for j:=1;j<=3;j++{
req:=services.UserScoreRequest{}
req.Users=make([]*services.UserInfo,0)
for i=1;i<=5;i++{ //加5条用户信息 假设是一个耗时的过程
req.Users=append(req.Users,&services.UserInfo{UserId:uid})
uid++
}
err:=stream.Send(&req)
if err!=nil{
log.Println(err)
}
res,err:=stream.Recv()
if err==io.EOF{
break
}
if err!=nil{
log.Println(err)
}
fmt.Println(res.Users)

}
}


返回结果:

[user_id:1 user_score:101  user_id:2 user_score:102  user_id:3 user_score:103  user_id:4 user_score:104  user_id:5 user_score:105 ]
[user_id:6 user_score:106 user_id:7 user_score:107 user_id:8 user_score:108 user_id:9 user_score:109 user_id:10 user_score:110 ]
[user_id:11 user_score:111 user_id:12 user_score:112 user_id:13 user_score:113 user_id:14 user_score:114 user_id:15 user_score:115 ]

Process finished with exit code 0

 可以看出,当我们生产环境中 客户端获取数据耗时并且服务端处理数据耗时,此时运用双向流模式大大节省任务时间

 源码地址:

​​https://github.com/sunlongv520/grpc-learn​​

​​https://github.com/sunlongv520/grpc-doc​​


举报

相关推荐

0 条评论