0
点赞
收藏
分享

微信扫一扫

基于grpc从零开始搭建一个准生产分布式应用(4) - 02- grpc详解基础实现

从本章开始,大概会用三篇文章来详细讲解grpc原生和springGrpc框架的使用。不会面面俱到,只讲实际项目中会用到的一些重要内容。PS:为了方便理解,笔者会重写一些DEMO例子(本文档中的例子与之前的代码无关),后续从第5篇对象传输和mybatis开始时会再优化。

本章代码任务:以一个例子掌握原生GRPC的用法。

一、proto定义

syntax = "proto3";

package ecommerce;

// ProductInfo exposes two unary RPCs for storing and retrieving products.
service ProductInfo {
// Accepts a Product and returns the ProductID under which it can be fetched.
rpc addProduct(Product) returns (ProductID);
// Looks up a Product by its ProductID.
rpc getProduct(ProductID) returns (Product);
}

// Product is the payload exchanged by both RPCs of the ProductInfo service.
message Product {
// Product identifier, carried as a string.
string id = 1;
// Product name.
string name = 2;
// Free-form product description.
string description = 3;
// Product price as a 32-bit float.
// NOTE(review): binary floating point cannot represent most decimal amounts
// exactly; confirm this is acceptable before using it for real money values.
float price = 4;
}

// ProductID wraps a product identifier; it is the result of addProduct and
// the argument of getProduct.
message ProductID {
// The identifier string.
string value = 1;
}

二、java实现

2.1、server端实现

/**
 * In-memory implementation of the {@code ProductInfo} gRPC service.
 *
 * <p>Products are kept in a map keyed by a server-generated UUID string. Nothing is
 * persisted: the contents are lost when the process exits.
 */
public class ProductInfoImpl extends ProductInfoGrpc.ProductInfoImplBase {

    /**
     * Product store keyed by product id. A concurrent map is used because gRPC
     * dispatches calls on an executor, so both handlers may run on several
     * threads at the same time.
     */
    private final Map<String, ProductInfoOuterClass.Product> productMap =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Stores the given product under a freshly generated random UUID and replies
     * with that id.
     *
     * @param request          the product to store; any id it carries is replaced
     * @param responseObserver receives the generated {@code ProductID}, then completion
     */
    @Override
    public void addProduct(ProductInfoOuterClass.Product request,
            io.grpc.stub.StreamObserver<ProductInfoOuterClass.ProductID> responseObserver) {
        String productId = UUID.randomUUID().toString();
        // Protobuf messages are immutable, so build a copy that carries the new id.
        ProductInfoOuterClass.Product stored = request.toBuilder().setId(productId).build();
        productMap.put(productId, stored);

        ProductInfoOuterClass.ProductID reply =
                ProductInfoOuterClass.ProductID.newBuilder().setValue(productId).build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    /**
     * Replies with the product stored under the requested id, or fails the call
     * with {@code NOT_FOUND} when no such product exists.
     *
     * @param request          the id of the product to look up
     * @param responseObserver receives the product and completion, or the error
     */
    @Override
    public void getProduct(ProductInfoOuterClass.ProductID request,
            io.grpc.stub.StreamObserver<ProductInfoOuterClass.Product> responseObserver) {
        // A single lookup avoids the check-then-act gap of containsKey() followed by get().
        ProductInfoOuterClass.Product product = productMap.get(request.getValue());
        if (product == null) {
            responseObserver.onError(new StatusException(Status.NOT_FOUND));
            return;
        }
        responseObserver.onNext(product);
        responseObserver.onCompleted();
    }
}

2.2、client端实现

/**
 * Command-line client that adds one product through the {@code ProductInfo}
 * service and then reads it back, logging both results.
 */
public class ProductInfoClient {

    private static final Logger logger = Logger.getLogger(ProductInfoClient.class.getName());

    /**
     * Connects to the server on localhost:50051 over plaintext, performs an
     * {@code addProduct} call followed by a {@code getProduct} call, and shuts
     * the channel down.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the channel to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        try {
            ProductInfoGrpc.ProductInfoBlockingStub stub = ProductInfoGrpc.newBlockingStub(channel);

            ProductInfoOuterClass.ProductID productID = stub.addProduct(
                    ProductInfoOuterClass.Product.newBuilder()
                            .setName("Samsung S10")
                            .setDescription("Samsung Galaxy S10 is the latest smart phone, " +
                                    "launched in February 2019")
                            .setPrice(700.0f)
                            .build());
            logger.info("Product ID: " + productID.getValue() + " added successfully.");

            ProductInfoOuterClass.Product product = stub.getProduct(productID);
            logger.info("Product: " + product.toString());
        } finally {
            // Release the channel even when an RPC throws, and wait briefly so its
            // resources are actually freed before the JVM exits.
            channel.shutdown().awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
        }
    }
}

2.3、测试程序

/**
 * Standalone launcher that hosts {@link ProductInfoImpl} on a gRPC server.
 */
public class ProductInfoServer {

    private static final Logger logger = Logger.getLogger(ProductInfoServer.class.getName());

    private Server server;

    /**
     * Builds and starts the server on port 50051 and registers a JVM shutdown
     * hook that stops it again.
     *
     * @throws IOException if the server cannot be started
     */
    private void start() throws IOException {
        /* The port on which the server should run */
        int port = 50051;
        server = ServerBuilder.forPort(port)
                .addService(new ProductInfoImpl()) // register the service implementation
                .build()
                .start();
        logger.info("Server started, listening on " + port);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
            System.err.println("*** shutting down gRPC server since JVM is shutting down");
            ProductInfoServer.this.stop();
            System.err.println("*** server shut down");
        }));
    }

    /**
     * Stops the server if it was started: initiates an orderly shutdown, waits up
     * to 30 seconds for in-flight calls, and forces termination after that.
     */
    private void stop() {
        if (server == null) {
            return;
        }
        server.shutdown();
        try {
            if (!server.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS)) {
                server.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Give up waiting, force the shutdown and keep the interrupt visible to the caller.
            server.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon threads.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    /**
     * Main launches the server from the command line.
     *
     * @param args unused
     * @throws IOException          if the server cannot be started
     * @throws InterruptedException if interrupted while waiting for termination
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        final ProductInfoServer server = new ProductInfoServer();
        server.start();
        server.blockUntilShutdown();
    }

}

三、python实现

3.1、server端实现

from concurrent import futures
import logging
import uuid
import grpc
import time

import product_info_pb2
import product_info_pb2_grpc

class ProductInfoServicer(product_info_pb2_grpc.ProductInfoServicer):
    """In-memory implementation of the ProductInfo gRPC service.

    Products are kept in a dict keyed by a server-generated UUID string;
    nothing is persisted.
    """

    def __init__(self):
        # Maps product id (str) -> stored Product message.
        self.productMap = {}

    def addProduct(self, request, context):
        """Store the product under a new random id and return that id.

        Args:
            request: the Product message to store; its ``id`` field is overwritten.
            context: the gRPC servicer context (unused).

        Returns:
            A ProductID message carrying the generated id.
        """
        # uuid4 is purely random; uuid1 would embed the host MAC address and a timestamp.
        product_id = str(uuid.uuid4())
        request.id = product_id
        print("addProduct:request", request)
        self.productMap[product_id] = request
        response = product_info_pb2.ProductID(value=product_id)

        print("addProduct:response", response)
        return response

    def getProduct(self, request, context):
        """Return the product stored under the requested id.

        Args:
            request: a ProductID message naming the product.
            context: the gRPC servicer context, used to report a missing product.

        Returns:
            The stored Product message. When the id is unknown the RPC is
            aborted with status NOT_FOUND instead.
        """
        print("getProduct:request", request)
        product_id = str(request.value)
        response = self.productMap.get(product_id)
        if response is None:
            # abort() raises, which terminates the RPC with the given status;
            # a bare KeyError would surface to the client as UNKNOWN.
            context.abort(grpc.StatusCode.NOT_FOUND,
                          "product not found: " + product_id)
        print("getProduct:response", response)
        return response

# create a gRPC server backed by a pool of 10 worker threads
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# use the generated function `add_ProductInfoServicer_to_server`
# to register the servicer class with the server
product_info_pb2_grpc.add_ProductInfoServicer_to_server(
    ProductInfoServicer(), server)

# listen on port 50051 (all interfaces, no TLS)
print('Starting server. Listening on port 50051.')
server.add_insecure_port('[::]:50051')
server.start()

# since server.start() will not block,
# a sleep-loop is added to keep the process alive until Ctrl-C
try:
    while True:
        time.sleep(86400)
except KeyboardInterrupt:
    # stop immediately (grace period of 0 seconds)
    server.stop(0)

3.2、client端实现

import grpc
import product_info_pb2
import product_info_pb2_grpc
import time;

def run():
    """Add one product through the ProductInfo service and read it back.

    Connects to localhost:50051 without TLS, calls addProduct and then
    getProduct with the returned id, printing both responses.
    """
    # open a gRPC channel; the with-block closes it even if an RPC raises
    with grpc.insecure_channel('localhost:50051') as channel:
        # create a stub (client)
        stub = product_info_pb2_grpc.ProductInfoStub(channel)

        response = stub.addProduct(product_info_pb2.Product(
            name="Apple iPhone 11",
            description="Meet Apple iPhone 11. All-new dual-camera system with Ultra Wide and Night mode.",
            price=699.0))
        print("add product: response", response)
        productInfo = stub.getProduct(product_info_pb2.ProductID(value=response.value))
        print("get product: response", productInfo)


run()
举报

相关推荐

0 条评论