0
点赞
收藏
分享

微信扫一扫

gRPC开发: gRPC的四种通信模式

认真的老去 2022-02-26 阅读 64

引言


一元RPC模式


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2FYsWpgr-1645887063623)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/7dc1fb85-2adc-4b01-8f5b-4db0ba69ce49/Untitled.png)]

服务定义:

//ecommerce/order_management.pb.go

syntax="proto3";
//导入这个包,使用常用的数据类型,如StringValue
import "google/protobuf/wrappers.proto";
package ecommerce;

option go_package = "../ecommerce";
service OrderManagement{
  rpc getOrder(google.protobuf.StringValue) returns (Order);//检索订单的远程方法
}

message Order{
  string id = 1;
  repeated string items = 2;//repeated表示该字段可以在消息中重复出现任意次
  string description = 3;
  float price = 4;
  string destination = 5;
}

服务端实现

//main.go
package main

import (
	"context"
	wrapper "github.com/golang/protobuf/ptypes/wrappers"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"log"
	"net"
	pb "orderServiceUnaryGRPC/service/ecommerce"
)

// 服务定义
type server struct {
	orderMap map[string]*pb.Order
}

const (
	port = ":50051"
)

var orderMap = make(map[string]pb.Order)

// 初始化订单信息
func initSampleData() {
	orderMap["102"] = pb.Order{Id: "102", Items: []string{"Google Pixel 3A", "Mac Book Pro"}, Destination: "Mountain View, CA", Price: 1800.00}
	orderMap["103"] = pb.Order{Id: "103", Items: []string{"Apple Watch S4"}, Destination: "San Jose, CA", Price: 400.00}
	orderMap["104"] = pb.Order{Id: "104", Items: []string{"Google Home Mini", "Google Nest Hub"}, Destination: "Mountain View, CA", Price: 400.00}
	orderMap["105"] = pb.Order{Id: "105", Items: []string{"Amazon Echo"}, Destination: "San Jose, CA", Price: 30.00}
	orderMap["106"] = pb.Order{Id: "106", Items: []string{"Amazon Echo", "Apple iPhone XS"}, Destination: "Mountain View, CA", Price: 300.00}
}

// GetOrder 通过订单编号获取订单消息
func (s *server) GetOrder(ctx context.Context, orderId *wrapper.StringValue) (*pb.Order, error) {
	//服务实现
	ord, exists := orderMap[orderId.Value]
	if exists {
		return &ord, status.New(codes.OK, "").Err()
	}
	return nil, status.Errorf(codes.NotFound, "Order does not exist.:", orderId)
}

func main() {
	initSampleData()
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen:%v", err)
	}
	log.Printf("Starting gRPC listener on port:%s", port)
	s := grpc.NewServer()
	pb.RegisterOrderManagementServer(s, &server{})
	// 在指定端口上开始监听传入的消息
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve:%v", err)
	}
}

客户端实现

//main.go
package main

import (
	"context"
	wrapper "github.com/golang/protobuf/ptypes/wrappers"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"log"
	pb "orderServiceUnaryGRPC/client/ecommerce"
	"time"
)

const (
	address = "localhost:50051"
)

func main() {
	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Did not connect:%v", err)
	}
	defer conn.Close()
	client := pb.NewOrderManagementClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// 获取订单
	ord, err := client.GetOrder(ctx, &wrapper.StringValue{Value: "105"})
	log.Printf("GetOrder Response ->:%v", ord)
}

服务器端流RPC模式


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CM1DEmYb-1645887063625)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/3fe97dcc-be8c-49f2-9027-804d83d1f23e/Untitled.png)]

服务定义

//ecommerce/order_management.proto

syntax = "proto3";
import "google/protobuf/wrappers.proto";

package ecommerce;
option go_package = "../ecommerce";

service OrderManagement{
  // 通过返回 Order 消息的 stream 定义服务器端流
  rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
}

message Order{
  string id = 1;
  repeated string items = 2;
  string description = 3;
  float price = 4;
  string destination = 5;
}

服务端实现

//service/main.go

package main

import (
	"fmt"
	wrapper "github.com/golang/protobuf/ptypes/wrappers"
	"google.golang.org/grpc"
	"log"
	"net"
	pb "orderServeStream/service/ecommerce"
	"strings"
)

//服务定义
type serve struct {
	orderMap map[string]*pb.Order
}

const (
	port = ":50051"
)

var orderMap = make(map[string]pb.Order)

// 初始化订单信息
func initSampleData() {
	orderMap["102"] = pb.Order{Id: "102", Items: []string{"Google Pixel 3A", "Mac Book Pro"}, Destination: "Mountain View, CA", Price: 1800.00}
	orderMap["103"] = pb.Order{Id: "103", Items: []string{"Apple Watch S4"}, Destination: "San Jose, CA", Price: 400.00}
	orderMap["104"] = pb.Order{Id: "104", Items: []string{"Google Home Mini", "Google Nest Hub"}, Destination: "Mountain View, CA", Price: 400.00}
	orderMap["105"] = pb.Order{Id: "105", Items: []string{"Amazon Echo"}, Destination: "San Jose, CA", Price: 30.00}
	orderMap["106"] = pb.Order{Id: "106", Items: []string{"Amazon Echo", "Apple iPhone XS"}, Destination: "Mountain View, CA", Price: 300.00}
}

func (s *serve) SearchOrders(searchQuery *wrapper.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {
	for key, order := range orderMap {
		log.Print(key, order)
		for _, itemStr := range order.Items {
			log.Print(itemStr)
			if strings.Contains(itemStr, searchQuery.Value) {
				//在流中发出匹配的订单
				err := stream.Send(&order)
				if err != nil {
					return fmt.Errorf("error sending message to stream:%v", err)
				}
				log.Printf("Matching Order Found : %v", key)
				break
			}
		}
	}
	return nil
}

func main() {
	initSampleData()
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen:%v", err)
	}
	s := grpc.NewServer()
	pb.RegisterOrderManagementServer(s, &serve{})
	log.Printf("Starting gRPC listener on port:%v", port)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve:%v", err)
	}
}

客户端实现

//client/main.go

package main

import (
	"context"
	"github.com/golang/protobuf/ptypes/wrappers"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"log"
	pb "orderServeStream/client/ecommerce"
	"time"
)

const (
	address = "localhost:50051"
)

func main() {
	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Did not connect:%v", err)
	}
	defer conn.Close()
	client := pb.NewOrderManagementClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	//SearchOrders 方法返回 OrderManagement_SearchOrdersClient 的客户端流,它有一个名为 Recv 的方法。
	ordStream, err := client.SearchOrders(ctx, &wrappers.StringValue{Value: "Google"})

	for {
		// 调用客户端流的 Recv 方法,逐个检索 Order 响应。
		searchOrder, err := ordStream.Recv()
		//当发现流结束的时候,Recv 会返回 io.EOF
		if err != nil {
			break
		}
		log.Print("Search Result:", searchOrder)
	}

}

客户端流RPC模式


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N3M5YxiG-1645887063626)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/8d669bad-9d6e-431e-b7d7-c5d0e9d359e6/Untitled.png)]

服务定义

//ecommerce/order_management.pb.go

syntax = "proto3";
import "google/protobuf/wrappers.proto";

package ecommerce;
option go_package = "../ecommerce";

service OrderManagement{
  //只需使用 stream Order 作为 updateOrders 方法的参数,就能表明 updateOrders 会接收来自客户 端的多条消息作为输入。
  // 因为服务器端只发送一个响应,所以返回值是 单一的字符串消息。
  rpc updateOrders(stream Order) returns(google.protobuf.StringValue);
}

message Order{
  string id = 1;
  repeated string items = 2;
  string description = 3;
  float price = 4;
  string destination = 5;
}

服务端实现

// service/main.go

package main

import (
	"github.com/golang/protobuf/ptypes/wrappers"
	"google.golang.org/grpc"
	"io"
	"log"
	"net"
	pb "orderClientStream/service/ecommerce"
)

const (
	port = ":50051"
)

type serve struct {
	orderMap map[string]*pb.Order
}

var orderMap = make(map[string]pb.Order)

// 初始化订单信息
func initSampleData() {
	orderMap["102"] = pb.Order{Id: "102", Items: []string{"Google Pixel 3A", "Mac Book Pro"}, Destination: "Mountain View, CA", Price: 1800.00}
	orderMap["103"] = pb.Order{Id: "103", Items: []string{"Apple Watch S4"}, Destination: "San Jose, CA", Price: 400.00}
	orderMap["104"] = pb.Order{Id: "104", Items: []string{"Google Home Mini", "Google Nest Hub"}, Destination: "Mountain View, CA", Price: 400.00}
	orderMap["105"] = pb.Order{Id: "105", Items: []string{"Amazon Echo"}, Destination: "San Jose, CA", Price: 30.00}
	orderMap["106"] = pb.Order{Id: "106", Items: []string{"Amazon Echo", "Apple iPhone XS"}, Destination: "Mountain View, CA", Price: 300.00}
}

func (s *serve) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
	ordersStr := "Updated Order IDs :"
	for {
		order, err := stream.Recv()
		if err == io.EOF {
			// 结束从流中读取数据
			return stream.SendAndClose(&wrappers.StringValue{Value: "Orders processed " + ordersStr})
		}
		if err != nil {
			return err
		}
		// 更新订单数据
		orderMap[order.Id] = *order
		log.Printf("Order ID : %s - %s", order.Id, "Updated")
		ordersStr += order.Id + ","
	}
}

func main() {
	initSampleData()
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen:%v", err)
	}
	s := grpc.NewServer()
	pb.RegisterOrderManagementServer(s, &serve{})
	log.Printf("Starting gRPC listener on port:%v", port)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve:%v", err)
	}
}

客户端实现

//client/main.go

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"log"
	pb "orderClientStream/client/ecommerce"
	"time"
)

const (
	address = "localhost:50051"
)

func main() {
	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Did not connect:%v", err)
	}
	defer conn.Close()
	client := pb.NewOrderManagementClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// 要更新的数据
	updOrder1 := pb.Order{Id: "102", Items: []string{"Google Pixel 3A11", "Google Pixel Book11"}, Destination: "Mountain View, CA", Price: 1100.00}
	updOrder2 := pb.Order{Id: "103", Items: []string{"Apple Watch S411", "Mac Book Pro", "iPad Pro11"}, Destination: "San Jose, CA", Price: 2800.00}
	updOrder3 := pb.Order{Id: "104", Items: []string{"Google Home Mini11", "Google Nest Hub", "iPad Mini11"}, Destination: "Mountain View, CA", Price: 2200.00}

	// 调用 UpdateOrders 远程方法。
	updateStream, err := client.UpdateOrders(ctx)
	if err != nil {
		log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)
	}
	// 通过客户端流发送订单更新的请求。
	if err := updateStream.Send(&updOrder1); err != nil {
		// 处理在发送消息到流时发生的错误。
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
	}
	if err := updateStream.Send(&updOrder2); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
	}
	if err := updateStream.Send(&updOrder3); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder3, err)
	}
	//关闭流并接收响应。
	updateResp, err := updateStream.CloseAndRecv()
	log.Printf("Updates Orders Res: %s", updateResp)

}

双向流RPC模式


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gw455aZp-1645887063628)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/6d38c56d-73a5-4401-8502-f2e7fba696c1/Untitled.png)]

服务定义

//ecommerce/order_management.proto
syntax = "proto3";

import "google/protobuf/wrappers.proto";
option go_package = "../ecommerce";

service OrderManagement{
  rpc processOrders(stream google.protobuf.StringValue) returns(stream CombinedShipment);
}

message Order {
  string id = 1;
  repeated string items = 2;
  string description = 3;
  float price = 4;
  string destination = 5;
}

//定义发货组合消息体
message CombinedShipment {
  string id = 1;
  string status = 2;
  repeated Order ordersList = 3;
}

服务端实现

package main

import (
	"google.golang.org/grpc"
	"io"
	"log"
	"net"
	pb "service/ecommerce"
)

type server struct {
	orderMap map[string]*pb.Order
}

const (
	port           = ":50051"
	orderBatchSize = 3 //设定每个发货组合最多包含的订单数
)

var orderMap = make(map[string]pb.Order)

// 初始化订单信息
func initSampleData() {
	orderMap["102"] = pb.Order{Id: "102", Items: []string{"Google Pixel 3A", "Mac Book Pro"}, Destination: "Mountain View, CA", Price: 1800.00}
	orderMap["103"] = pb.Order{Id: "103", Items: []string{"Apple Watch S4"}, Destination: "San Jose, CA", Price: 400.00}
	orderMap["104"] = pb.Order{Id: "104", Items: []string{"Google Home Mini", "Google Nest Hub"}, Destination: "Mountain View, CA", Price: 400.00}
	orderMap["105"] = pb.Order{Id: "105", Items: []string{"Amazon Echo"}, Destination: "San Jose, CA", Price: 30.00}
	orderMap["106"] = pb.Order{Id: "106", Items: []string{"Amazon Echo", "Apple iPhone XS"}, Destination: "Mountain View, CA", Price: 300.00}
}

func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
	batchMark := 1
	var combinedShipmentMap = make(map[string]pb.CombinedShipment)
	for {
		orderId, err := stream.Recv() //接收客户端传过来的订单ID
		log.Printf("Reading Proc order:%v", orderId)
		if err == io.EOF {
			//如果客户端发送完所有消息,则开始开始对订单进行分组
			log.Printf("EOF:%v", orderId)
			for _, shipment := range combinedShipmentMap {
				if err := stream.Send(&shipment); err != nil {
					return err
				}
			}
			return nil
		}
		if err != nil {
			log.Println(err)
			return err
		}
		destination := orderMap[orderId.GetValue()].Destination
		shipment, found := combinedShipmentMap[destination]

		//查看是否已有相同地址的订单,如果有则加入已有发货组,没有则新建
		if found {
			ord := orderMap[orderId.GetValue()]
			shipment.OrdersList = append(shipment.OrdersList, &ord)
			combinedShipmentMap[destination] = shipment
		} else {
			comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!"}
			ord := orderMap[orderId.GetValue()]
			// 新建订单组
			comShip.OrdersList = append(comShip.OrdersList, &ord)
			combinedShipmentMap[destination] = comShip
			log.Print(len(comShip.OrdersList), comShip.GetId())
		}
		if batchMark == orderBatchSize {
			for _, comb := range combinedShipmentMap {
				log.Printf("Shipping:%v", len(comb.OrdersList))
				if err := stream.Send(&comb); err != nil {
					return nil
				}
			}
			batchMark = 1
			combinedShipmentMap = make(map[string]pb.CombinedShipment)
		} else {
			batchMark++
		}

	}
}

func main() {
	initSampleData()
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen:%v", err)
	}
	s := grpc.NewServer()
	pb.RegisterOrderManagementServer(s, &server{})
	log.Printf("Starting gRPC listener on port:%v", port)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve:%v", err)
	}
}

客户端实现

package main

import (
	pb "client/ecommerce"
	"context"
	"github.com/golang/protobuf/ptypes/wrappers"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"io"
	"log"
	"time"
)

const (
	address = "localhost:50051"
)

func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
	for {
		//在客户端读取服务的消息。
		combinedShipment, errProcOrder := streamProcOrder.Recv()
		//该条件探测流是否已经结束。
		if errProcOrder == io.EOF {
			break
		}
		log.Printf("Combined shipment : %v", combinedShipment.OrdersList)
	}
	<-c
}

func main() {
	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Did not connect:%v", err)
	}
	defer conn.Close()
	client := pb.NewOrderManagementClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	// 调用远程方法并获取流引用,以便在客户端写入和读取
	streamProcOrder, _ := client.ProcessOrders(ctx)
	if err := streamProcOrder.Send(&wrappers.StringValue{Value: "102"}); err != nil {
		log.Fatalf("%v.Send(%v)=%v", client, "102", err)
	}
	if err := streamProcOrder.Send(&wrappers.StringValue{Value: "103"}); err != nil {
		log.Fatalf("%v.Send(%v)=%v", client, "103", err)
	}
	if err := streamProcOrder.Send(&wrappers.StringValue{Value: "104"}); err != nil {
		log.Fatalf("%v.Send(%v)=%v", client, "104", err)
	}
	//创建Goroutines所使用的通道
	channel := make(chan struct{})
	//使用 Goroutines 调用函数,以便并行读取来自服务的消息。
	go asncClientBidirectionalRPC(streamProcOrder, channel)
	// 模拟向服务发送消息的延迟。
	time.Sleep(time.Millisecond * 1000)
	if err := streamProcOrder.Send(&wrappers.StringValue{Value: "101"}); err != nil {
		log.Fatalf("%v.Send(%v)=%v", client, "101", err)
	}
	//为客户端流标记流的结束(订单 ID)
	if err := streamProcOrder.CloseSend(); err != nil {
		log.Fatal(err)
	}

	<-channel
}
举报

相关推荐

0 条评论