0
点赞
收藏
分享

微信扫一扫

【Grpc(二)】两种stub, 四种模式(unary,客户端stream,服务端stream,双向stream)示例


protobuf定义:

// Protocol Buffers (proto3) contract for the demo key/value lookup service.
syntax = "proto3";

package com.liyao;

// Java code-generation options: target package, outer class name, and one
// generated top-level Java file per message/service instead of nesting them
// all inside the outer class.
option java_package = "com.liyao.protobuf.test.service";
option java_outer_classname = "MyServiceProto";
option java_multiple_files = true;

// Lookup request carrying a list of unsigned 32-bit keys.
message MyRequest {
repeated uint32 keys = 1;
}

// Lookup response carrying a single string value.
message MyResponse {
string value = 1;
}

// Exposes the same lookup in each of the four gRPC call modes.
service MyService {
// Unary: one request, one response. MyRpcServiceImpl looks up only the
// first key of the request.
rpc GetByKey (MyRequest) returns (MyResponse);
// Server streaming: one request, a stream of responses. MyRpcServiceImpl
// sends one response per key in the request.
rpc GetByKeyServerStream (MyRequest) returns (stream MyResponse);
// Client streaming: a stream of requests, one response. MyRpcServiceImpl
// concatenates the values found for the first key of each request message.
rpc GetByKeyClientStream (stream MyRequest) returns (MyResponse);
// Bidirectional streaming: MyRpcServiceImpl answers each request message
// with one response for that message's first key.
rpc GetByKeyBiStream (stream MyRequest) returns (stream MyResponse);
}

服务比较简单,请求包含一个int的list,返回key对应的value。

服务端实现类:

public class MyRpcServiceImpl extends MyServiceGrpc.MyServiceImplBase {

private final Map<Integer, String> map = ImmutableMap.<Integer, String>builder()
.put(1, "v1")
.put(2, "v2")
.put(3, "v3")
.put(4, "v4")
.put(5, "v5")
.build();

@Override
public void getByKey(MyRequest request, StreamObserver<MyResponse> responseObserver) {
int key = request.getKeys(0);
String value = map.getOrDefault(key, "null");

responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
responseObserver.onCompleted();
}

@Override
public void getByKeyServerStream(MyRequest request, StreamObserver<MyResponse> responseObserver) {
for (int key : request.getKeysList()) {
String value = map.getOrDefault(key, "null");
responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
}
responseObserver.onCompleted();
}

@Override
public StreamObserver<MyRequest> getByKeyClientStream(StreamObserver<MyResponse> responseObserver) {
return new StreamObserver<MyRequest>() {
String values = "";
@Override
public void onNext(MyRequest myRequest) {
int key = myRequest.getKeys(0);
values += map.getOrDefault(key, "null");
}

@Override
public void onError(Throwable throwable) {

}

@Override
public void onCompleted() {
responseObserver.onNext(MyResponse.newBuilder().setValue(values).build());
responseObserver.onCompleted();
}
};
}

@Override
public StreamObserver<MyRequest> getByKeyBiStream(StreamObserver<MyResponse> responseObserver) {
return new StreamObserver<MyRequest>() {
@Override
public void onNext(MyRequest myRequest) {
int key = myRequest.getKeys(0);
String value = map.getOrDefault(key, "null");
responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
}

@Override
public void onError(Throwable throwable) {

}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}

服务端启动类:

/**
 * Starts a gRPC server hosting {@link MyRpcServiceImpl} and blocks until it terminates.
 */
public class RpcServer {
    /** TCP port the server listens on; also read by {@code RpcClient}. */
    public static final int port = 8088;

    /** Seconds granted to in-flight calls before the server is shut down forcibly. */
    private static final long SHUTDOWN_GRACE_SECONDS = 30;

    /**
     * Builds and starts the server, then waits for it to terminate.
     *
     * @param args unused
     * @throws IOException          if the server cannot be started
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        MyRpcServiceImpl service = new MyRpcServiceImpl();
        Server server = io.grpc.ServerBuilder
                .forPort(port)
                .addService(service)
                .build();
        server.start();

        // Without a hook the JVM exits on SIGTERM/Ctrl-C without an orderly shutdown;
        // this stops accepting new calls and gives running ones a grace period.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                boolean drained = server.shutdown()
                        .awaitTermination(SHUTDOWN_GRACE_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
                if (!drained) {
                    server.shutdownNow();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                server.shutdownNow();
            }
        }));

        server.awaitTermination();
    }
}

客户端启动类:

/**
 * Demo client that exercises all four call modes of {@code MyService} through both the
 * blocking stub and the asynchronous stub.
 *
 * <p>The asynchronous calls share one response observer that prints what it receives.
 * {@link #main} waits until every asynchronous call has finished (or a timeout elapses)
 * and then shuts the channel down.
 */
public class RpcClient {
    /** Number of asynchronous calls issued by {@link #main}; keep in sync with it. */
    private static final int ASYNC_CALL_COUNT = 4;

    /** Upper bound, in seconds, on how long {@link #main} waits for the async calls. */
    private static final long ASYNC_TIMEOUT_SECONDS = 100;

    /** Seconds granted to the channel to terminate after shutdown is requested. */
    private static final long CHANNEL_SHUTDOWN_SECONDS = 5;

    /** Counted down once per asynchronous call when it completes or fails. */
    private static final java.util.concurrent.CountDownLatch pendingAsyncCalls =
            new java.util.concurrent.CountDownLatch(ASYNC_CALL_COUNT);

    private static final ManagedChannel channel = ManagedChannelBuilder
            .forAddress("127.0.0.1", RpcServer.port)
            .usePlaintext()
            .build();

    private static final MyServiceGrpc.MyServiceBlockingStub blockingStub = MyServiceGrpc.newBlockingStub(channel);
    private static final MyServiceGrpc.MyServiceStub asyncStub = MyServiceGrpc.newStub(channel);

    /** Shared observer for all asynchronous calls: prints each event. */
    private static final StreamObserver<MyResponse> responseObserver = new StreamObserver<MyResponse>() {
        @Override
        public void onNext(MyResponse response) {
            System.out.println("receive: " + response.getValue());
        }

        @Override
        public void onError(Throwable t) {
            // Include the throwable so the failure reason is not lost.
            System.out.println("error: " + t);
            pendingAsyncCalls.countDown();
        }

        @Override
        public void onCompleted() {
            System.out.println("completed");
            pendingAsyncCalls.countDown();
        }
    };

    /**
     * Runs every demo call, waits for the asynchronous ones to finish and releases the
     * channel.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        try {
            simpleSync();
            simpleAsync();
            serverStreamSync();
            serverStreamAsync();
            clientStream();
            biStream();

            // Wait for the async calls to finish instead of sleeping a fixed time.
            boolean finished = pendingAsyncCalls.await(
                    ASYNC_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
            if (!finished) {
                System.out.println("timed out waiting for async calls");
            }
        } finally {
            boolean terminated = channel.shutdown()
                    .awaitTermination(CHANNEL_SHUTDOWN_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
            if (!terminated) {
                channel.shutdownNow();
            }
        }
    }

    /** Unary call through the blocking stub; prints the returned value. */
    private static void simpleSync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .build();
        String value = blockingStub.getByKey(request).getValue();
        System.out.println(value);
    }

    /** Unary call through the async stub; the result arrives on the shared observer. */
    private static void simpleAsync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .build();
        asyncStub.getByKey(request, responseObserver);
    }

    /** Server-streaming call through the blocking stub; prints each response message. */
    private static void serverStreamSync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .addKeys(2)
                .addKeys(3)
                .build();
        Iterator<MyResponse> itr = blockingStub.getByKeyServerStream(request);
        while (itr.hasNext()) {
            System.out.println(itr.next());
        }
    }

    /** Server-streaming call through the async stub. */
    private static void serverStreamAsync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .addKeys(2)
                .addKeys(3)
                .build();
        asyncStub.getByKeyServerStream(request, responseObserver);
    }

    /** Client-streaming call: sends keys 1..5 as separate messages, then completes. */
    private static void clientStream() {
        StreamObserver<MyRequest> requestData = asyncStub.getByKeyClientStream(responseObserver);
        for (int i = 1; i <= 5; i++) {
            requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
        }
        requestData.onCompleted();
    }

    /** Bidirectional-streaming call: sends keys 1..5 as separate messages, then completes. */
    private static void biStream() {
        StreamObserver<MyRequest> requestData = asyncStub.getByKeyBiStream(responseObserver);
        for (int i = 1; i <= 5; i++) {
            requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
        }
        requestData.onCompleted();
    }
}

对于同步stub,只能调用unary以及服务端stream的方法;对于异步stub,可以调用任意方法;

unary以及服务端stream写法比较简单;对于客户端stream(以及双向stream)的情况,需要先通过异步stub获取用于发送请求的observer,再通过它逐条发送请求,最后调用onCompleted结束。

\

举报

相关推荐

0 条评论