1. Background
Alluxio uses gRPC as its underlying communication framework, so understanding the Alluxio server's threading model requires a working knowledge of the gRPC code. This article first covers the HTTP/2 concepts that gRPC builds on, then uses a small gRPC project to step through the gRPC source code and explore its threading model.
2. gRPC Overview
gRPC is an RPC framework from Google with support for many languages. It offers four kinds of service methods:
Unary RPC
The client sends a single request and gets a single response back, just like an ordinary function call: the client uses its stub to send the request to the server and waits for the response. For example:
rpc SayHello (HelloRequest) returns (HelloReply) {}
Server-streaming RPC
The client sends a request to the server and gets back a stream from which it reads a sequence of messages, reading until there are no more. In my view this is useful when the client needs to pull a sequence of data from the server. For example:
rpc ListFeatures(Rectangle) returns (stream Feature) {}
Client-streaming RPC
The client writes a sequence of messages and sends them to the server, again using a stream. Once the client has finished writing, it waits for the server to read them and return its response. In my view this is useful when the client needs to push a sequence of data to the server. For example:
rpc RecordRoute(stream Point) returns (RouteSummary) {}
Bidirectional-streaming RPC
Both sides send a sequence of messages over read/write streams. The two streams operate independently, so client and server can read and write in whatever order they like: the server can wait for all of the client's messages before writing its responses, alternate between reading and writing, or use any other combination. For example:
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
3. Evolution of HTTP Multiplexing
Suppose a client wants to issue 100 HTTP requests. By default it sends the first request, waits for its response, and only then sends the next one, which is very inefficient. As HTTP evolved, this behavior was optimized step by step, eventually arriving at HTTP/2's multiplexing model.
3.1 HTTP/1.0
With HTTP/1.0, sending 100 consecutive requests goes through the following steps:
- Every HTTP request opens a new TCP connection, which means another three-way handshake.
- Every TCP connection goes through congestion control: slow start probes the network, so the window that limits how much data can be in flight only gradually grows from its small initial value to its maximum.
- Requests are serialized: the client must wait for the previous response before sending the next request, because responses carry nothing that ties them back to a particular request, so they can only be matched to requests by order.
The first two points make HTTP/1.0 slow: connection setup plus slow start adds latency to every request. The third point prevents requests from being sent concurrently. The protocol can therefore be optimized along two axes: per-request overhead and concurrency.
3.2 HTTP/1.1
HTTP/1.1 addresses these weaknesses. By setting Connection: keep-alive in the request headers, connections are reused: requests to the same server go over one TCP connection, avoiding repeated connection setup. Even with connection reuse, however, traffic still follows request -> response -> request -> response..., so multiple HTTP requests remain serial:
To send HTTP requests in parallel, that is, to send them all at once and receive the responses in the order the requests were sent, HTTP/1.1 introduced pipelining. As shown below, the client sends all of its requests at once, and the server processes them and returns the responses in order:
Pipelining has a fatal flaw, head-of-line blocking: if the server is slow to process one request, every request behind it has to wait, so performance suffers. Because of this, HTTP/1.1 pipelining never saw wide adoption.
3.3 HTTP/2
HTTP/2 improves on HTTP/1.1 pipelining: requests are grouped into streams, each identified by a stream ID. Requests within one stream are processed in order, while requests in different streams can be processed in parallel. Even if a request in one stream blocks, requests in other streams are unaffected:
As shown below, all streams share a single TCP connection; requests on different streams are sent concurrently, while requests within one stream are sent in order:
The advantages of HTTP/2:
- A client connected to one server port uses a single TCP connection, no matter how many requests it sends.
- Requests and responses are multiplexed over that connection in parallel via stream IDs, so HTTP traffic is effectively concurrent.
4. Project Code
The project defines a proto file and compiles it into Java message classes and gRPC stub code, then issues HTTP requests through the generated gRPC code. By debugging this project we can work our way, step by step, into the gRPC communication framework.
4.1 Maven Dependencies
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<grpc.version>1.6.1</grpc.version>
<protobuf.version>3.3.0</protobuf.version>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.52.Final</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<protoSourceRoot>${project.basedir}/src/main/java/proto</protoSourceRoot>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
4.2 Proto Definition
syntax = "proto3";
option java_outer_classname = "Hello";
package protobuf;
option java_generic_services = true;
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
rpc SayHi (stream HelloRequest) returns (HelloResponse);
rpc SayGood (HelloRequest) returns (stream HelloResponse);
rpc SayBad (stream HelloRequest) returns (stream HelloResponse);
rpc SayOK (stream HelloRequest) returns (stream HelloResponse);
}
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}
Two classes are generated:
1. Hello is the serialization class; it contains the request class HelloRequest and the response class HelloResponse.
2. HelloServiceGrpc is the proxy (stub) class; the client calls the server's methods remotely through it.
The serialization class looks like this:
The proxy class looks like this:
- HelloServiceBlockingStub makes blocking calls: the caller blocks until the response comes back.
- HelloServiceStub makes asynchronous calls: the caller passes in a callback and does not block waiting for the response; when a response arrives on an async thread, the callback is invoked directly. A short usage sketch of both follows below.
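As a hedged illustration (not part of the original project), here is a minimal sketch of using the generated classes: the protobuf round trip through Hello.HelloRequest, the blocking stub, and the async stub. It assumes a server that actually implements the unary sayHello method is listening on localhost:19999.
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import protobuf.Hello;
import protobuf.HelloServiceGrpc;

public class StubUsageSketch {
    public static void main(String[] args) throws Exception {
        // Serialization round trip with the generated message class.
        Hello.HelloRequest request = Hello.HelloRequest.newBuilder().setGreeting("hi").build();
        byte[] bytes = request.toByteArray();                    // protobuf wire format
        Hello.HelloRequest parsed = Hello.HelloRequest.parseFrom(bytes);
        System.out.println(parsed.getGreeting());

        ManagedChannel channel =
            ManagedChannelBuilder.forAddress("localhost", 19999).usePlaintext(true).build();

        // Blocking stub: sayHello is unary, so the call blocks until the response arrives.
        HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);
        System.out.println(blockingStub.sayHello(request).getReply());

        // Async stub: the response is delivered to a StreamObserver callback on a gRPC thread.
        HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);
        asyncStub.sayHello(request, new StreamObserver<Hello.HelloResponse>() {
            @Override public void onNext(Hello.HelloResponse value) { System.out.println(value.getReply()); }
            @Override public void onError(Throwable t) { t.printStackTrace(); }
            @Override public void onCompleted() { }
        });
        Thread.sleep(1000);
        channel.shutdown();
    }
}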
4.3 Server Code
The server's startup method listens on port 19999 and registers StreamHelloServiceImpl as the implementation that handles incoming requests:
public class StreamServer {
public static void main(String[] args) throws IOException, InterruptedException {
io.grpc.Server server = ServerBuilder.forPort(19999).addService(new StreamHelloServiceImpl()).build().start();
System.out.println("start server");
server.awaitTermination();
}
}
The server-side logic consists of two methods:
- When the client remotely calls sayBad, the server's sayBad handles the request; for each message the client sends, the server responds with three messages.
- When the client remotely calls sayOK, the server's sayOK handles the request; for each message the client sends, the server responds with three messages.
public class StreamHelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
@Override
public StreamObserver<Hello.HelloRequest> sayBad(StreamObserver<Hello.HelloResponse> responseObserver) {
return new StreamObserver<Hello.HelloRequest>() {
@Override
public void onNext(Hello.HelloRequest value) {
System.out.println("receive : " + value.getGreeting());
responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("bad1: " + value.getGreeting()).build());
responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("bad2: " + value.getGreeting()).build());
responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("bad3: " + value.getGreeting()).build());
}
@Override
public void onError(Throwable t) {
System.out.println("error");
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("completed");
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Hello.HelloRequest> sayOK(StreamObserver<Hello.HelloResponse> responseObserver) {
return new StreamObserver<Hello.HelloRequest>() {
@Override
public void onNext(Hello.HelloRequest value) {
System.out.println("receive : " + value.getGreeting());
responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok1: " + value.getGreeting()).build());
responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok2: " + value.getGreeting()).build());
responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok3: " + value.getGreeting()).build());
}
@Override
public void onError(Throwable t) {
System.out.println("error");
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("completed");
responseObserver.onCompleted();
}
};
}
}
This article only walks through the bidirectional-streaming calls; the other call types work the same way.
4.4 Client Code
The client defines a callback object, streamObserver, which prints the server's responses. The main method issues two RPCs, sayBad and sayOK, and sends two messages on each:
public class StreamClient {
static StreamObserver<Hello.HelloResponse> streamObserver = new StreamObserver<Hello.HelloResponse>(){
@Override
public void onNext(Hello.HelloResponse value) {
System.out.println(value.getReply());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("completed");
}
};
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 19999).usePlaintext(true).build();
HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(channel);
StreamObserver<Hello.HelloRequest> helloRequestStreamObserver = helloServiceStub.sayBad(streamObserver);
helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: i'm sad").build());
helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: i'm happy").build());
StreamObserver<Hello.HelloRequest> helloOKStreamObserver = helloServiceStub.sayOK(streamObserver);
helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: i'm sad").build());
helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: i'm happy").build());
Thread.sleep(1000);
//channel.shutdown();
}
}
4.5 Run Results
Requests received on the server side:
Responses received on the client side:
5. Client Execution Flow
5.1 Creating the ManagedChannelImpl Object
Create the ManagedChannel implementation:
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 19999).usePlaintext(true).build();
forAddress is called with the target host and port:
public static ManagedChannelBuilder<?> forAddress(String name, int port) {
return ManagedChannelProvider.provider().builderForAddress(name, port);
}
ManagedChannelProvider.provider() is responsible for locating the class that provides the ManagedChannel implementation:
public static ManagedChannelProvider provider() {
if (provider == null) {
throw new ProviderNotFoundException("No functional channel service provider found. "
+ "Try adding a dependency on the grpc-okhttp or grpc-netty artifact");
}
return provider;
}
As shown below, the ManagedChannelProvider implementation is loaded through SPI by default:
private static final ManagedChannelProvider provider
= load(ManagedChannelProvider.class.getClassLoader());
The load method ultimately loads the implementation class through the ServiceLoader SPI:
public static Iterable<ManagedChannelProvider> getCandidatesViaServiceLoader(
ClassLoader classLoader) {
return ServiceLoader.load(ManagedChannelProvider.class, classLoader);
}
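ServiceLoader locates implementations by scanning META-INF/services files on the classpath; presumably the grpc-netty jar ships a META-INF/services/io.grpc.ManagedChannelProvider entry naming io.grpc.netty.NettyChannelProvider (an assumption worth verifying against the jar). A minimal stand-alone sketch of the same mechanism:
import java.util.ServiceLoader;

import io.grpc.ManagedChannelProvider;

public class SpiDemo {
    public static void main(String[] args) {
        // ServiceLoader reads every META-INF/services/io.grpc.ManagedChannelProvider file
        // on the classpath and instantiates the classes listed there.
        ServiceLoader<ManagedChannelProvider> loader = ServiceLoader.load(ManagedChannelProvider.class);
        for (ManagedChannelProvider provider : loader) {
            // With grpc-netty on the classpath, this is expected to print NettyChannelProvider.
            System.out.println(provider.getClass().getName() + " available=" + provider.isAvailable());
        }
    }
}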
The grpc-netty artifact is found to implement the ManagedChannelProvider interface:
Its implementation class is io.grpc.netty.NettyChannelProvider.
The NettyChannelProvider then creates a NettyChannelBuilder, which is responsible for building the Netty-backed channel. As shown below, it records the target address and port:
public NettyChannelBuilder builderForAddress(String name, int port) {
return NettyChannelBuilder.forAddress(name, port);
}
Debugging shows that builderForAddress initializes a String field (the target) on NettyChannelBuilder's grandparent class:
usePlaintext enables plaintext transport, so the data is not encrypted:
public NettyChannelBuilder usePlaintext(boolean skipNegotiation) {
if (skipNegotiation) {
negotiationType(NegotiationType.PLAINTEXT);
} else {
negotiationType(NegotiationType.PLAINTEXT_UPGRADE);
}
return this;
}
Debugging shows that usePlaintext ultimately initializes an enum-typed field on NettyChannelBuilder:
Finally, NettyChannelBuilder#build() creates the ManagedChannelImpl object, passing the builder itself, which carries the address, port and other settings, as an argument:
public ManagedChannel build() {
return new ManagedChannelImpl(
this,
buildTransportFactory(),
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
GrpcUtil.STOPWATCH_SUPPLIER,
getEffectiveInterceptors());
}
NettyChannelBuilder thus returns a ManagedChannelImpl object. Its constructor takes quite a few parameters, shown below:
ManagedChannelImpl(
AbstractManagedChannelImplBuilder<?> builder,
ClientTransportFactory clientTransportFactory,
BackoffPolicy.Provider backoffPolicyProvider,
ObjectPool<? extends Executor> oobExecutorPool,
Supplier<Stopwatch> stopwatchSupplier,
List<ClientInterceptor> interceptors) {
this.target = checkNotNull(builder.target, "target");
this.nameResolverFactory = builder.getNameResolverFactory();
this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
this.loadBalancerFactory =
checkNotNull(builder.loadBalancerFactory, "loadBalancerFactory");
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor);
this.delayedTransport.start(delayedTransportListener);
this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
this.idleTimeoutMillis = builder.idleTimeoutMillis;
} else {
checkArgument(
builder.idleTimeoutMillis
>= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
"invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
this.idleTimeoutMillis = builder.idleTimeoutMillis;
}
this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
this.userAgent = builder.userAgent;
log.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
}
Several of the arguments NettyChannelBuilder passes to ManagedChannelImpl are important; let's look at them in turn.
5.1.1 First Argument: AbstractManagedChannelImplBuilder
The first argument is of type AbstractManagedChannelImplBuilder, an abstract class whose concrete implementation here is NettyChannelBuilder. After initialization it carries the server's host and port, and NegotiationType.PLAINTEXT marks the transport as plaintext.
5.1.2 Second Argument: ClientTransportFactory
grpc-netty calls NettyChannelBuilder#buildTransportFactory to create a NettyTransportFactory, an implementation of ClientTransportFactory. NettyTransportFactory is responsible for creating NettyClientTransport objects, which means Netty is used as the transport framework:
protected ClientTransportFactory buildTransportFactory() {
return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions,
negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls);
}
NettyTransportFactory takes its parameters from NettyChannelBuilder and creates NettyClientTransport instances through NettyTransportFactory#newClientTransport. One NettyClientTransport object manages the connection from this client to one server address and port:
public ConnectionClientTransport newClientTransport(
SocketAddress serverAddress, String authority, @Nullable String userAgent) {
checkState(!closed, "The transport factory is closed.");
TransportCreationParamsFilter dparams =
transportCreationParamsFilterFactory.create(serverAddress, authority, userAgent);
final AtomicBackoff.State keepAliveTimeNanosState = keepAliveTimeNanos.getState();
Runnable tooManyPingsRunnable = new Runnable() {
@Override
public void run() {
keepAliveTimeNanosState.backoff();
}
};
NettyClientTransport transport = new NettyClientTransport(
dparams.getTargetServerAddress(), channelType, channelOptions, group,
dparams.getProtocolNegotiator(), flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
tooManyPingsRunnable);
return transport;
}
NettyClientTransport holds the Netty-related fields:
class NettyClientTransport implements ConnectionClientTransport {
private final LogId logId = LogId.allocate(getClass().getName());
private final Map<ChannelOption<?>, ?> channelOptions;
// socket address of the RPC server
private final SocketAddress address;
// NioSocketChannel class
private final Class<? extends Channel> channelType;
// Netty NioEventLoopGroup
private final EventLoopGroup group;
private final ProtocolNegotiator negotiator;
private final AsciiString authority;
private final AsciiString userAgent;
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
private KeepAliveManager keepAliveManager;
private final long keepAliveTimeNanos;
private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls;
private final Runnable tooManyPingsRunnable;
private ProtocolNegotiator.Handler negotiationHandler;
// the client-side handler
private NettyClientHandler handler;
// We should not send on the channel until negotiation completes. This is a hard requirement
// by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
// the NioSocketChannel instance
private Channel channel;
/** If {@link #start} has been called, non-{@code null} if channel is {@code null}. */
private Status statusExplainingWhyTheChannelIsNull;
}
NettyClientTransport has two important methods: start and newStream.
The start method connects to the server through Netty:
public Runnable start(Listener transportListener) {
lifecycleManager = new ClientTransportLifecycleManager(
Preconditions.checkNotNull(transportListener, "listener"));
EventLoop eventLoop = group.next();
if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
keepAliveManager = new KeepAliveManager(
new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
keepAliveWithoutCalls);
}
handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
NettyHandlerSettings.setAutoWindow(handler);
negotiationHandler = negotiator.newHandler(handler);
Bootstrap b = new Bootstrap();
b.group(eventLoop);
b.channel(channelType);
if (NioSocketChannel.class.isAssignableFrom(channelType)) {
b.option(SO_KEEPALIVE, true);
}
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
// Every entry in the map is obtained from
// NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
// so it is safe to pass the key-value pair to b.option().
b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
}
/**
* We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
* is executed in the event loop and we need this handler to be in the pipeline immediately so
* that it may begin buffering writes.
*/
b.handler(negotiationHandler);
ChannelFuture regFuture = b.register();
channel = regFuture.channel();
if (channel == null) {
// Initialization has failed badly. All new streams should be made to fail.
Throwable t = regFuture.cause();
if (t == null) {
t = new IllegalStateException("Channel is null, but future doesn't have a cause");
}
statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
// Use a Runnable since lifecycleManager calls transportListener
return new Runnable() {
@Override
public void run() {
// NOTICE: we not are calling lifecycleManager from the event loop. But there isn't really
// an event loop in this case, so nothing should be accessing the lifecycleManager. We
// could use GlobalEventExecutor (which is what regFuture would use for notifying
// listeners in this case), but avoiding on-demand thread creation in an error case seems
// a good idea and is probably clearer threading.
lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull);
}
};
}
// Start the write queue as soon as the channel is constructed
handler.startWriteQueue(channel);
// Start the connection operation to the server.
channel.connect(address);
// This write will have no effect, yet it will only complete once the negotiationHandler
// flushes any pending writes.
channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// Need to notify of this failure, because NettyClientHandler may not have been added to
// the pipeline before the error occurred.
lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()));
}
}
});
// Handle transport shutdown when the channel is closed.
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Typically we should have noticed shutdown before this point.
lifecycleManager.notifyTerminated(
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
}
});
if (keepAliveManager != null) {
keepAliveManager.onTransportStarted();
}
return null;
}
During connection setup, NettyClientHandler is registered. This handler is at the heart of gRPC communication: it defines how HTTP/2 frames are sent and how incoming data is handled. NettyClientHandler acts as both a ChannelInboundHandler and a ChannelOutboundHandler; its inheritance hierarchy is shown below:
NettyClientHandler overrides the ChannelOutboundHandler write method and dispatches on the type of msg:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof CreateStreamCommand) {
createStream((CreateStreamCommand) msg, promise);
} else if (msg instanceof SendGrpcFrameCommand) {
sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
} else if (msg instanceof CancelClientStreamCommand) {
cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
} else if (msg instanceof SendPingCommand) {
sendPingFrame(ctx, (SendPingCommand) msg, promise);
} else if (msg instanceof GracefulCloseCommand) {
gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
} else if (msg instanceof ForcefulCloseCommand) {
forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
} else if (msg == NOOP_MESSAGE) {
ctx.write(Unpooled.EMPTY_BUFFER, promise);
} else {
throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
}
}
Most importantly, sendGrpcFrame writes a data frame to the remote peer:
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
ChannelPromise promise) {
// Call the base class to write the HTTP/2 DATA frame.
// Note: no need to flush since this is handled by the outbound flow controller.
encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
}
The actual send path is fairly involved. Roughly, DefaultHttp2RemoteFlowController applies flow control to limit how much data may be sent, and DefaultHttp2FrameWriter writes the frames. Two kinds of frames are involved: header frames and payload frames.
writeHeaders sends a header (HEADERS) frame:
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight, boolean exclusive,
int padding, boolean endStream, ChannelPromise promise) {
return writeHeadersInternal(ctx, streamId, headers, padding, endStream,
true, streamDependency, weight, exclusive, promise);
}
writeHeadersInternal defines how a HEADERS frame is sent; simplified, the processing boils down to:
writeFrameHeaderInternal(buf, payloadLength, HEADERS, flags, streamId);
writePaddingLength(buf, padding);
writeFrameHeaderInternal is shared by header and payload frames. It writes the frame's length, type, flags and stream ID; a header frame uses the HEADERS type:
static void writeFrameHeaderInternal(ByteBuf out, int payloadLength, byte type,
Http2Flags flags, int streamId) {
out.writeMedium(payloadLength);
out.writeByte(type);
out.writeByte(flags.value());
out.writeInt(streamId);
}
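The five writes above make up the 9-octet HTTP/2 frame header defined in RFC 7540 section 4.1: a 24-bit payload length, an 8-bit type, an 8-bit flags field, and a 31-bit stream identifier (the highest bit is reserved). A stand-alone sketch of the same encoding using only the JDK, for illustration:
import java.nio.ByteBuffer;

// Sketch of the 9-byte HTTP/2 frame header that writeFrameHeaderInternal produces:
//   Length(24) | Type(8) | Flags(8) | R(1)+StreamId(31)
public class FrameHeaderSketch {
    static byte[] encodeFrameHeader(int payloadLength, byte type, byte flags, int streamId) {
        ByteBuffer buf = ByteBuffer.allocate(9);
        buf.put((byte) ((payloadLength >>> 16) & 0xFF)); // writeMedium: 3-byte big-endian length
        buf.put((byte) ((payloadLength >>> 8) & 0xFF));
        buf.put((byte) (payloadLength & 0xFF));
        buf.put(type);                                   // 0x0 = DATA, 0x1 = HEADERS
        buf.put(flags);
        buf.putInt(streamId & 0x7FFFFFFF);               // reserved high bit cleared
        return buf.array();
    }

    public static void main(String[] args) {
        // A HEADERS frame of 16 payload bytes on stream 3 with the END_HEADERS flag (0x4) set.
        byte[] header = encodeFrameHeader(16, (byte) 0x1, (byte) 0x4, 3);
        System.out.println(header.length + " bytes"); // prints: 9 bytes
    }
}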
When a payload frame is sent, writeData is called; it also goes through writeFrameHeaderInternal, this time with the DATA type to mark a payload frame:
writeFrameHeaderInternal(frameHeader2, remainingData, DATA, flags, streamId);
ctx.write(frameHeader2, promiseAggregator.newPromise());
// Write the payload.
ByteBuf lastFrame = data.readSlice(remainingData);
data = null;
ctx.write(lastFrame, promiseAggregator.newPromise());
5.2 Creating and Executing the RPC Proxy Object
In the client's main method, the RPC proxy objects are created:
HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(channel);
StreamObserver<Hello.HelloRequest> helloRequestStreamObserver = helloServiceStub.sayBad(streamObserver);
HelloServiceGrpc is the RPC proxy class generated by protoc. It provides the following methods:
The two methods the client invokes are:
public io.grpc.stub.StreamObserver<protobuf.Hello.HelloRequest> sayBad(
io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse> responseObserver) {
return asyncBidiStreamingCall(
getChannel().newCall(METHOD_SAY_BAD, getCallOptions()), responseObserver);
}
public io.grpc.stub.StreamObserver<protobuf.Hello.HelloRequest> sayOK(
io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse> responseObserver) {
return asyncBidiStreamingCall(
getChannel().newCall(METHOD_SAY_OK, getCallOptions()), responseObserver);
}
Both of them call asyncBidiStreamingCall. Its first argument is the ClientCallImpl object created through ManagedChannelImpl, and the second argument, responseObserver, is the user-defined callback. They are passed on to asyncStreamingRequestCall:
private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver,
boolean streamingResponse) {
CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<ReqT>(call);
startCall(
call,
new StreamObserverToCallListenerAdapter<ReqT, RespT>(
responseObserver, adapter, streamingResponse),
streamingResponse);
return adapter;
}
ManagedChannelImpl$RealChannel#newCall creates the call object, specifying the method to invoke and the executor thread pool to run on:
public final class ManagedChannelImpl extends ManagedChannel implements WithLogId {
private class RealChannel extends Channel {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
Executor executor = callOptions.getExecutor();
if (executor == null) {
executor = ManagedChannelImpl.this.executor;
}
return new ClientCallImpl<ReqT, RespT>(
method,
executor,
callOptions,
transportProvider,
terminated ? null : transportFactory.getScheduledExecutorService())
.setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry);
}
}
Note that the executor is obtained from builder.executorPool:
// the thread pool
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
// executorPool.getObject() returns an executor from the shared pool
this.executor = checkNotNull(executorPool.getObject(), "executor");
executorPool is the DEFAULT_EXECUTOR_POOL defined in AbstractManagedChannelImplBuilder:
public abstract class AbstractManagedChannelImplBuilder
<T extends AbstractManagedChannelImplBuilder<T>> extends ManagedChannelBuilder<T> {
private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL =
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
}
The executor's thread pool names its threads grpc-default-executor:
public final class GrpcUtil {
public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
new Resource<ExecutorService>() {
private static final String NAME = "grpc-default-executor";
@Override
public ExecutorService create() {
return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true));
}
@Override
public void close(ExecutorService instance) {
instance.shutdown();
}
@Override
public String toString() {
return NAME;
}
};
}
After the ClientCallImpl call object has been created, its start method is invoked; by this point responseObserver has been wrapped into a ClientCall.Listener:
public final class ClientCalls {
private static <ReqT, RespT> void startCall(ClientCall<ReqT, RespT> call,
ClientCall.Listener<RespT> responseListener, boolean streamingResponse) {
call.start(responseListener, new Metadata());
if (streamingResponse) {
call.request(1);
} else {
// Initially ask for two responses from flow-control so that if a misbehaving server sends
// more than one responses, we can catch it and fail it in the listener.
call.request(2);
}
}
}
Whichever RPC method is invoked, ClientCallImpl#start runs; it creates and starts the client-side stream, a DelayedStream:
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
public void start(final Listener<RespT> observer, Metadata headers) {
checkState(stream == null, "Already started");
checkNotNull(observer, "observer");
checkNotNull(headers, "headers");
final String compressorName = callOptions.getCompressor();
Compressor compressor = null;
//
prepareHeaders(headers, decompressorRegistry, compressor);
Deadline effectiveDeadline = effectiveDeadline();
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
if (!deadlineExceeded) {
updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
context.getDeadline(), headers);
ClientTransport transport = clientTransportProvider.get(
new PickSubchannelArgsImpl(method, headers, callOptions));
Context origContext = context.attach();
try {
// create the client-side stream
stream = transport.newStream(method, headers, callOptions);
} finally {
context.detach(origContext);
}
} else {
stream = new FailingClientStream(DEADLINE_EXCEEDED);
}
if (callOptions.getAuthority() != null) {
stream.setAuthority(callOptions.getAuthority());
}
if (callOptions.getMaxInboundMessageSize() != null) {
stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
}
if (callOptions.getMaxOutboundMessageSize() != null) {
stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
}
stream.setCompressor(compressor);
stream.setDecompressorRegistry(decompressorRegistry);
// start the client-side stream
stream.start(new ClientStreamListenerImpl(observer));
// Delay any sources of cancellation after start(), because most of the transports are broken if
// they receive cancel before start. Issue #1343 has more details
// Propagate later Context cancellation to the remote side.
context.addListener(cancellationListener, directExecutor());
if (effectiveDeadline != null
// If the context has the effective deadline, we don't need to schedule an extra task.
&& context.getDeadline() != effectiveDeadline
// If the channel has been terminated, we don't need to schedule an extra task.
&& deadlineCancellationExecutor != null) {
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
}
if (cancelListenersShouldBeRemoved) {
// Race detected! ClientStreamListener.closed may have been called before
// deadlineCancellationFuture was set / context listener added, thereby preventing the future
// and listener from being cancelled. Go ahead and cancel again, just to be sure it
// was cancelled.
removeContextListenerAndCancelDeadlineFuture();
}
}
When the DelayedStream is started, its start method stores the listener (the wrapper around streamObserver) in a member field of the stream, to be invoked later when messages are read:
public void start(ClientStreamListener listener) {
checkState(this.listener == null, "already started");
Status savedError;
boolean savedPassThrough;
synchronized (this) {
this.listener = checkNotNull(listener, "listener");
// If error != null, then cancel() has been called and was unable to close the listener
savedError = error;
savedPassThrough = passThrough;
if (!savedPassThrough) {
listener = delayedListener = new DelayedStreamListener(listener);
}
}
if (savedError != null) {
listener.closed(savedError, new Metadata());
return;
}
if (savedPassThrough) {
realStream.start(listener);
} else {
final ClientStreamListener finalListener = listener;
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.start(finalListener);
}
});
}
}
When the client receives the server's response, the logic defined in streamObserver eventually runs on an async thread. As shown below, the EventLoop thread is executing NettyClientHandler at that point; its ancestor class ByteToMessageDecoder defines how incoming bytes are received, and its channelRead method runs:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
// decode the message
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
// omitted
}
}
}
}
Inside callDecode, DefaultHttp2FrameReader#processPayloadState is called to parse the frames returned by the server:
public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSizePolicy, Configuration {
private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)
throws Http2Exception {
if (in.readableBytes() < payloadLength) {
// Wait until the entire payload has been read.
return;
}
// Only process up to payloadLength bytes.
int payloadEndIndex = in.readerIndex() + payloadLength;
// We have consumed the data, next time we read we will be expecting to read a frame header.
readingHeaders = true;
// Read the payload and fire the frame event to the listener.
switch (frameType) {
case DATA:
readDataFrame(ctx, in, payloadEndIndex, listener);
break;
case HEADERS:
readHeadersFrame(ctx, in, payloadEndIndex, listener);
break;
case PRIORITY:
readPriorityFrame(ctx, in, listener);
break;
case RST_STREAM:
readRstStreamFrame(ctx, in, listener);
break;
case SETTINGS:
readSettingsFrame(ctx, in, listener);
break;
case PUSH_PROMISE:
readPushPromiseFrame(ctx, in, payloadEndIndex, listener);
break;
case PING:
readPingFrame(ctx, in.readLong(), listener);
break;
case GO_AWAY:
readGoAwayFrame(ctx, in, payloadEndIndex, listener);
break;
case WINDOW_UPDATE:
readWindowUpdateFrame(ctx, in, listener);
break;
case CONTINUATION:
readContinuationFrame(in, payloadEndIndex, listener);
break;
default:
readUnknownFrame(ctx, in, payloadEndIndex, listener);
break;
}
in.readerIndex(payloadEndIndex);
}
}
The frame type determines how the payload is handled. For a HEADERS frame, for example, AbstractClientStream$TransportState#inboundHeadersReceived is executed along the way:
public abstract class AbstractClientStream extends AbstractStream
implements ClientStream, MessageFramer.Sink {
protected abstract static class TransportState extends AbstractStream.TransportState {
protected void inboundHeadersReceived(Metadata headers) {
Preconditions.checkState(!statusReported, "Received headers on closed stream");
statsTraceCtx.clientInboundHeaders();
Decompressor decompressor = Codec.Identity.NONE;
String encoding = headers.get(MESSAGE_ENCODING_KEY);
if (encoding != null) {
decompressor = decompressorRegistry.lookupDecompressor(encoding);
if (decompressor == null) {
deframeFailed(Status.INTERNAL.withDescription(
String.format("Can't find decompressor for %s", encoding)).asRuntimeException());
return;
}
}
setDecompressor(decompressor);
listener().headersRead(headers);
}
}
}
This ultimately calls ClientCallImpl's headersRead method:
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
public void headersRead(final Metadata headers) {
class HeadersRead extends ContextRunnable {
HeadersRead() {
super(context);
}
@Override
public final void runInContext() {
try {
if (closed) {
return;
}
observer.onHeaders(headers);
} catch (Throwable t) {
Status status =
Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
stream.cancel(status);
close(status, new Metadata());
}
}
}
callExecutor.execute(new HeadersRead());
}
}
When observer.onHeaders(headers) executes, the underlying call is StreamObserverToCallListenerAdapter#onHeaders, which is an empty implementation:
public final class ClientCalls {
private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
extends ClientCall.Listener<RespT> {
public void onHeaders(Metadata headers) {
}
}
}
When a DATA frame is read, readDataFrame is called, and along the way AbstractClientStream$TransportState#inboundDataReceived executes:
public abstract class AbstractClientStream extends AbstractStream
implements ClientStream, MessageFramer.Sink {
protected abstract static class TransportState extends AbstractStream.TransportState {
protected void inboundDataReceived(ReadableBuffer frame) {
Preconditions.checkNotNull(frame, "frame");
boolean needToCloseFrame = true;
try {
if (statusReported) {
log.log(Level.INFO, "Received data on closed stream");
return;
}
needToCloseFrame = false;
// deframe the received DATA frame
deframe(frame);
} finally {
if (needToCloseFrame) {
frame.close();
}
}
}
}
}
The handling of DATA frames is somewhat more involved; MessageDeframer#deliver delivers the DATA payload:
public class MessageDeframer implements Closeable, Deframer {
private void deliver() {
// We can have reentrancy here when using a direct executor, triggered by calls to
// request more messages. This is safe as we simply loop until pendingDelivers = 0
if (inDelivery) {
return;
}
inDelivery = true;
try {
// Process the uncompressed bytes.
while (!stopDelivery && pendingDeliveries > 0 && readRequiredBytes()) {
switch (state) {
// process the message header inside the DATA frame
case HEADER:
processHeader();
break;
// process the message body inside the DATA frame
case BODY:
// Read the body and deliver the message.
processBody();
// Since we've delivered a message, decrement the number of pending
// deliveries remaining.
pendingDeliveries--;
break;
default:
throw new AssertionError("Invalid state: " + state);
}
}
// omitted
}
}
When the message body is reached, MessageDeframer#processBody is called:
public class MessageDeframer implements Closeable, Deframer {
private void processBody() {
InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
nextFrame = null;
listener.messagesAvailable(new SingleMessageProducer(stream));
// Done with this frame, begin processing the next header.
state = State.HEADER;
requiredLength = HEADER_LENGTH;
}
}
The listener's messagesAvailable method then runs, i.e. ClientCallImpl$ClientStreamListenerImpl#messagesAvailable:
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private class ClientStreamListenerImpl implements ClientStreamListener {
public void messagesAvailable(final MessageProducer producer) {
class MessagesAvailable extends ContextRunnable {
MessagesAvailable() {
super(context);
}
@Override
public final void runInContext() {
if (closed) {
GrpcUtil.closeQuietly(producer);
return;
}
InputStream message;
try {
while ((message = producer.next()) != null) {
try {
// invoke the handling logic for the DATA payload
observer.onMessage(method.parseResponse(message));
} catch (Throwable t) {
GrpcUtil.closeQuietly(message);
throw t;
}
message.close();
}
} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
Status status =
Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
stream.cancel(status);
close(status, new Metadata());
}
}
}
// wrap the message handling in a Runnable and hand it to the thread pool
callExecutor.execute(new MessagesAvailable());
}
}
}
callExecutor is the executor ClientCallImpl was initialized with, i.e. the SHARED_CHANNEL_EXECUTOR thread pool wrapped in a SerializingExecutor:
this.callExecutor = executor == directExecutor()
? new SerializeReentrantCallsDirectExecutor()
: new SerializingExecutor(executor);
SerializingExecutor runs the queued Runnables on the backing thread pool one after another, which preserves their submission order:
public SerializingExecutor(Executor executor) {
Preconditions.checkNotNull(executor, "'executor' must not be null.");
this.executor = executor;
}
/**
* Runs the given runnable strictly after all Runnables that were submitted
* before it, and using the {@code executor} passed to the constructor. .
*/
@Override
public void execute(Runnable r) {
runQueue.add(checkNotNull(r, "'r' must not be null."));
schedule(r);
}
private void schedule(@Nullable Runnable removable) {
if (running.compareAndSet(false, true)) {
boolean success = false;
try {
executor.execute(this);
success = true;
} finally {
// It is possible that at this point that there are still tasks in
// the queue, it would be nice to keep trying but the error may not
// be recoverable. So we update our state and propagate so that if
// our caller deems it recoverable we won't be stuck.
if (!success) {
if (removable != null) {
// This case can only be reached if 'this' was not currently running, and we failed to
// reschedule. The item should still be in the queue for removal.
// ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
// throw if the item to remove is null. If removable is present in the queue twice,
// the wrong one may be removed. It doesn't seem possible for this case to exist today.
// This is important to run in case of RejectedExectuionException, so that future calls
// to execute don't succeed and accidentally run a previous runnable.
runQueue.remove(removable);
}
running.set(false);
}
}
}
}
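A hedged, plain-JDK sketch of the same pattern (serializing tasks on top of an ordinary pool); this is a simplified stand-in, not gRPC's actual SerializingExecutor implementation:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Tasks submitted to the wrapper run on the backing pool, but strictly one at a time
// and in submission order.
public class SerializingSketch implements Executor, Runnable {
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean running = new AtomicBoolean();
    private final Executor delegate;

    SerializingSketch(Executor delegate) { this.delegate = delegate; }

    @Override public void execute(Runnable task) {
        queue.add(task);
        if (running.compareAndSet(false, true)) {
            delegate.execute(this);          // schedule the drain loop once
        }
    }

    @Override public void run() {
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();                      // tasks run one by one, in order
        }
        running.set(false);
        if (!queue.isEmpty() && running.compareAndSet(false, true)) {
            delegate.execute(this);          // re-schedule if more tasks arrived meanwhile
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        SerializingSketch serialized = new SerializingSketch(pool);
        for (int i = 0; i < 5; i++) {
            final int n = i;
            serialized.execute(() -> System.out.println("task " + n)); // prints task 0..4 in order
        }
        Thread.sleep(200); // let the drain finish before shutting the pool down
        pool.shutdown();
    }
}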
The DATA handling logic thus runs on an async thread. observer.onMessage(method.parseResponse(message)) actually calls ClientCalls$StreamObserverToCallListenerAdapter#onMessage, which executes observer.onNext(message), i.e. the user-defined handling logic:
public final class ClientCalls {
private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
extends ClientCall.Listener<RespT> {
public void onMessage(RespT message) {
if (firstResponseReceived && !streamingResponse) {
throw Status.INTERNAL
.withDescription("More than one responses received for unary or client-streaming call")
.asRuntimeException();
}
firstResponseReceived = true;
observer.onNext(message);
if (streamingResponse && adapter.autoFlowControlEnabled) {
// Request delivery of the next inbound message.
adapter.request(1);
}
}
}
}
The onNext executed above is the StreamObserver anonymous class defined in the client startup class:
public class StreamClient {
static StreamObserver<Hello.HelloResponse> streamObserver = new StreamObserver<Hello.HelloResponse>(){
@Override
public void onNext(Hello.HelloResponse value) {
System.out.println(value.getReply());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("completed");
}
};
}
Handing the DATA payload off to an async thread is presumably meant to prevent slow client-side processing of the server's response from blocking Netty's EventLoop thread, which would stop it from serving other sockets.
5.3 The Netty Client Connecting to the Server
As shown in section 5.2, invoking the sayBad RPC eventually calls ClientCallImpl#start:
public void start(final Listener<RespT> observer, Metadata headers) {
// omitted
prepareHeaders(headers, decompressorRegistry, compressor);
Deadline effectiveDeadline = effectiveDeadline();
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
if (!deadlineExceeded) {
updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
context.getDeadline(), headers);
// obtain the DelayedClientTransport (ultimately a NettyClientTransport is created)
ClientTransport transport = clientTransportProvider.get(
new PickSubchannelArgsImpl(method, headers, callOptions));
Context origContext = context.attach();
try {
// create the DelayedStream object
stream = transport.newStream(method, headers, callOptions);
} finally {
context.detach(origContext);
}
} else {
stream = new FailingClientStream(DEADLINE_EXCEEDED);
}
if (callOptions.getAuthority() != null) {
stream.setAuthority(callOptions.getAuthority());
}
if (callOptions.getMaxInboundMessageSize() != null) {
stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
}
if (callOptions.getMaxOutboundMessageSize() != null) {
stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
}
stream.setCompressor(compressor);
stream.setDecompressorRegistry(decompressorRegistry);
stream.start(new ClientStreamListenerImpl(observer));
// Delay any sources of cancellation after start(), because most of the transports are broken if
// they receive cancel before start. Issue #1343 has more details
// Propagate later Context cancellation to the remote side.
context.addListener(cancellationListener, directExecutor());
}
The custom get method is called to obtain the DelayedClientTransport object:
public final class ManagedChannelImpl extends ManagedChannel implements WithLogId {
private final ClientCallImpl.ClientTransportProvider transportProvider = new ClientCallImpl.ClientTransportProvider() {
public ClientTransport get(LoadBalancer.PickSubchannelArgs args) {
LoadBalancer.SubchannelPicker pickerCopy = ManagedChannelImpl.this.subchannelPicker;
if (ManagedChannelImpl.this.shutdown.get()) {
return ManagedChannelImpl.this.delayedTransport;
} else if (pickerCopy == null) {
ManagedChannelImpl.this.channelExecutor.executeLater(new Runnable() {
public void run() {
// exit idle mode
ManagedChannelImpl.this.exitIdleMode();
}
}).drain();
return ManagedChannelImpl.this.delayedTransport;
} else {
LoadBalancer.PickResult pickResult = pickerCopy.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, args.getCallOptions().isWaitForReady());
return (ClientTransport)(transport != null ? transport : ManagedChannelImpl.this.delayedTransport);
}
}
};
}
Before the DelayedClientTransport is returned, ManagedChannelImpl.this.exitIdleMode() is called to leave idle mode:
void exitIdleMode() {
if (!this.shutdown.get()) {
if (this.inUseStateAggregator.isInUse()) {
this.cancelIdleTimer();
} else {
this.rescheduleIdleTimer();
}
if (this.lbHelper == null) {
log.log(Level.FINE, "[{0}] Exiting idle mode", this.getLogId());
this.lbHelper = new LbHelperImpl(this.nameResolver);
this.lbHelper.lb = this.loadBalancerFactory.newLoadBalancer(this.lbHelper);
NameResolverListenerImpl listener = new NameResolverListenerImpl(this.lbHelper);
try {
// start the nameResolver
this.nameResolver.start(listener);
} catch (Throwable var3) {
listener.onError(Status.fromThrowable(var3));
}
}
}
}
The nameResolver implementation is DnsNameResolver; it resolves the host name into the equivalent IP addresses:
public final synchronized void start(Listener listener) {
Preconditions.checkState(this.listener == null, "already started");
timerService = SharedResourceHolder.get(timerServiceResource);
// thread pool: the GrpcUtil.SHARED_CHANNEL_EXECUTOR defined earlier
executor = SharedResourceHolder.get(executorResource);
this.listener = Preconditions.checkNotNull(listener, "listener");
// resolve the host name
resolve();
}
private void resolve() {
if (resolving || shutdown) {
return;
}
executor.execute(resolutionRunnable);
}
The resolutionRunnable task is then executed on an async thread:
private final Runnable resolutionRunnable = new Runnable() {
@Override
public void run() {
Listener savedListener;
synchronized (DnsNameResolver.this) {
// If this task is started by refresh(), there might already be a scheduled task.
if (resolutionTask != null) {
resolutionTask.cancel(false);
resolutionTask = null;
}
if (shutdown) {
return;
}
savedListener = listener;
resolving = true;
}
try {
if (System.getenv("GRPC_PROXY_EXP") != null) {
EquivalentAddressGroup server =
new EquivalentAddressGroup(InetSocketAddress.createUnresolved(host, port));
savedListener.onAddresses(Collections.singletonList(server), Attributes.EMPTY);
return;
}
ResolutionResults resolvedInetAddrs;
try {
resolvedInetAddrs = delegateResolver.resolve(host);
} catch (Exception e) {
synchronized (DnsNameResolver.this) {
if (shutdown) {
return;
}
// Because timerService is the single-threaded GrpcUtil.TIMER_SERVICE in production,
// we need to delegate the blocking work to the executor
resolutionTask =
timerService.schedule(new LogExceptionRunnable(resolutionRunnableOnExecutor),
1, TimeUnit.MINUTES);
}
savedListener.onError(Status.UNAVAILABLE.withCause(e));
return;
}
// Each address forms an EAG
ArrayList<EquivalentAddressGroup> servers = new ArrayList<EquivalentAddressGroup>();
for (InetAddress inetAddr : resolvedInetAddrs.addresses) {
servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port)));
}
// invoke Listener#onAddresses to handle connections for these addresses
savedListener.onAddresses(servers, Attributes.EMPTY);
} finally {
synchronized (DnsNameResolver.this) {
resolving = false;
}
}
}
};
Execution then reaches ManagedChannelImpl#onAddresses:
public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
if (servers.isEmpty()) {
onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
return;
}
log.log(Level.FINE, "[{0}] resolved address: {1}, config={2}",
new Object[] {getLogId(), servers, config});
helper.runSerialized(new Runnable() {
@Override
public void run() {
if (terminated) {
return;
}
try {
// hand the freshly resolved addresses to the load balancer
balancer.handleResolvedAddressGroups(servers, config);
} catch (Throwable e) {
log.log(
Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e);
// It must be a bug! Push the exception back to LoadBalancer in the hope that it may
// be propagated to the application.
balancer.handleNameResolutionError(Status.INTERNAL.withCause(e)
.withDescription("Thrown from handleResolvedAddresses(): " + e));
}
}
});
}
PickFirstBalancerFactory$PickFirstBalancer#handleResolvedAddressGroups is then called to handle the connection:
public final class PickFirstBalancerFactory extends LoadBalancer.Factory {
static final class PickFirstBalancer extends LoadBalancer {
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
// Flatten servers list received from name resolver into single address group. This means that
// as far as load balancer is concerned, there's virtually one single server with multiple
// addresses so the connection will be created only for the first address (pick first).
EquivalentAddressGroup newEag = flattenEquivalentAddressGroup(servers);
// initially there is no connection, so one needs to be created
if (subchannel == null) {
subchannel = helper.createSubchannel(newEag, Attributes.EMPTY);
// The channel state does not get updated when doing name resolving today, so for the moment
// let LB report CONNECTION and call subchannel.requestConnection() immediately.
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
// request the connection
subchannel.requestConnection();
} else {
helper.updateSubchannelAddresses(subchannel, newEag);
}
}
}
}
The chain ManagedChannelImpl$LbHelperImpl#updateBalancingState --> DelayedClientTransport#reprocess --> DelayedClientTransport$PendingStream#createRealStream then creates the NettyClientStream object:
private void createRealStream(ClientTransport transport) {
ClientStream realStream;
Context origContext = context.attach();
try {
realStream = transport.newStream(
args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions());
} finally {
context.detach(origContext);
}
setStream(realStream);
}
setStream stores realStream in a member field:
final void setStream(ClientStream stream) {
synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called.
if (realStream != null) {
return;
}
realStream = checkNotNull(stream, "stream");
}
drainPendingCalls();
}
Creating the connection goes through ManagedChannelImpl#requestConnection -> InternalSubchannel#obtainActiveTransport -> InternalSubchannel#startNewTransport:
private void startNewTransport() {
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
if (addressIndex == 0) {
connectingTimer.reset().start();
}
List<SocketAddress> addrs = addressGroup.getAddresses();
final SocketAddress address = addrs.get(addressIndex);
// create the NettyClientTransport object
ConnectionClientTransport transport =
transportFactory.newClientTransport(address, authority, userAgent);
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Created {1} for {2}",
new Object[] {logId, transport.getLogId(), address});
}
pendingTransport = transport;
transports.add(transport);
// invoke NettyClientTransport#start
Runnable runnable = transport.start(new TransportListener(transport, address));
if (runnable != null) {
channelExecutor.executeLater(runnable);
}
}
Stepping into NettyClientTransport#start, we can see that this is where the Netty I/O connection is initialized:
public Runnable start(Listener transportListener) {
lifecycleManager = new ClientTransportLifecycleManager(
Preconditions.checkNotNull(transportListener, "listener"));
EventLoop eventLoop = group.next();
if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
keepAliveManager = new KeepAliveManager(
new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
keepAliveWithoutCalls);
}
handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
NettyHandlerSettings.setAutoWindow(handler);
negotiationHandler = negotiator.newHandler(handler);
Bootstrap b = new Bootstrap();
b.group(eventLoop);
b.channel(channelType);
if (NioSocketChannel.class.isAssignableFrom(channelType)) {
b.option(SO_KEEPALIVE, true);
}
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
// Every entry in the map is obtained from
// NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
// so it is safe to pass the key-value pair to b.option().
b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
}
/**
* We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
* is executed in the event loop and we need this handler to be in the pipeline immediately so
* that it may begin buffering writes.
*/
b.handler(negotiationHandler);
ChannelFuture regFuture = b.register();
channel = regFuture.channel();
if (channel == null) {
// Initialization has failed badly. All new streams should be made to fail.
Throwable t = regFuture.cause();
if (t == null) {
t = new IllegalStateException("Channel is null, but future doesn't have a cause");
}
statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
// Use a Runnable since lifecycleManager calls transportListener
return new Runnable() {
@Override
public void run() {
// NOTICE: we not are calling lifecycleManager from the event loop. But there isn't really
// an event loop in this case, so nothing should be accessing the lifecycleManager. We
// could use GlobalEventExecutor (which is what regFuture would use for notifying
// listeners in this case), but avoiding on-demand thread creation in an error case seems
// a good idea and is probably clearer threading.
lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull);
}
};
}
// Start the write queue as soon as the channel is constructed
handler.startWriteQueue(channel);
// Start the connection operation to the server.
channel.connect(address);
// This write will have no effect, yet it will only complete once the negotiationHandler
// flushes any pending writes.
channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// Need to notify of this failure, because NettyClientHandler may not have been added to
// the pipeline before the error occurred.
lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()));
}
}
});
// Handle transport shutdown when the channel is closed.
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Typically we should have noticed shutdown before this point.
lifecycleManager.notifyTerminated(
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
}
});
if (keepAliveManager != null) {
keepAliveManager.onTransportStarted();
}
return null;
}
At this point the client has established a connection to the server through Netty.
5.4 The Client Sending HTTP/2 Requests
As shown below, the client sends two HTTP/2 requests:
helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: i'm sad").build());
helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: i'm happy").build());
ClientCalls$CallToStreamObserverAdapter#onNext is called to send each request:
public final class ClientCalls {
private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
public void onNext(T value) {
call.sendMessage(value);
}
}
}
ClientCallImpl#sendMessage converts the HelloRequest object into an InputStream and starts sending the data:
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
public void sendMessage(ReqT message) {
Preconditions.checkState(stream != null, "Not started");
Preconditions.checkState(!cancelCalled, "call was cancelled");
Preconditions.checkState(!halfCloseCalled, "call was half-closed");
try {
// TODO(notcarl): Find out if messageIs needs to be closed.
// wrap the message in an InputStream
InputStream messageIs = method.streamRequest(message);
// send the data
stream.writeMessage(messageIs);
} catch (Throwable e) {
stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
return;
}
// For unary requests, we don't flush since we know that halfClose should be coming soon. This
// allows us to piggy-back the END_STREAM=true on the last message frame without opening the
// possibility of broken applications forgetting to call halfClose without noticing.
if (!unaryRequest) {
stream.flush();
}
}
}
DelayedStream#writeMessage is then called to send the message:
public void writeMessage(final InputStream message) {
checkNotNull(message, "message");
if (passThrough) {
realStream.writeMessage(message);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.writeMessage(message);
}
});
}
}
Note that the message is not actually sent here. delayOrExecute puts each request into the pendingCalls queue, and the queued requests are sent later in order:
private void delayOrExecute(Runnable runnable) {
synchronized (this) {
if (!passThrough) {
pendingCalls.add(runnable);
return;
}
}
runnable.run();
}
As shown in section 5.3, once the real stream (the NettyClientStream) has been created, drainPendingCalls is called to execute all pending requests:
final void setStream(ClientStream stream) {
synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called.
if (realStream != null) {
return;
}
realStream = checkNotNull(stream, "stream");
}
drainPendingCalls();
}
In drainPendingCalls, the first task creates the stream; one RPC call corresponds to one stream:
public class DefaultHttp2Connection implements Http2Connection {
private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
State state = activeState(streamId, IDLE, isLocal(), halfClosed);
checkNewStreamAllowed(streamId, state);
// Create and initialize the stream.
DefaultStream stream = new DefaultStream(streamId, state);
incrementExpectedStreamId(streamId);
addStream(stream);
stream.activate();
return stream;
}
}
}
After the stream has been created, the request is sent through it via realStream.writeMessage(message):
writeMessage puts the outgoing message into the writeQueue, and scheduleFlush eventually triggers flush, which sends it to the server:
class WriteQueue {
void scheduleFlush() {
if (scheduled.compareAndSet(false, true)) {
// Add the queue to the tail of the event loop so writes will be executed immediately
// inside the event loop. Note DO NOT do channel.write outside the event loop as
// it will not wake up immediately without a flush.
channel.eventLoop().execute(later);
}
}
}
The flush method then writes the queued requests out to the server:
private void flush() {
try {
QueuedCommand cmd;
int i = 0;
boolean flushedOnce = false;
while ((cmd = queue.poll()) != null) {
channel.write(cmd, cmd.promise());
if (++i == DEQUE_CHUNK_SIZE) {
i = 0;
// Flush each chunk so we are releasing buffers periodically. In theory this loop
// might never end as new events are continuously added to the queue, if we never
// flushed in that case we would be guaranteed to OOM.
channel.flush();
flushedOnce = true;
}
}
// Must flush at least once, even if there were no writes.
if (i != 0 || !flushedOnce) {
channel.flush();
}
} finally {
// Mark the write as done, if the queue is non-empty after marking trigger a new write.
scheduled.set(false);
if (!queue.isEmpty()) {
scheduleFlush();
}
}
}
5.5 Sending the Second RPC Request
StreamObserver<Hello.HelloRequest> helloOKStreamObserver = helloServiceStub.sayOK(streamObserver);
helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: i'm sad").build());
helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: i'm happy").build());
helloServiceStub.sayOK again creates a ClientCallImpl underneath, and that object opens a new stream; the two subsequent messages are sent on this stream.
5.6 Client thread model summary
From the debug output, the sayOK rpc request uses streamId 3:
and the sayBad rpc request uses streamId 5:
The observed thread model is as follows:
Here, grpc-default-executor-0 and grpc-default-executor-1 each handle sending the HTTP packets of one of the two rpc requests (one executor thread per rpc call); they belong to the same thread pool. grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-2 and grpc-default-worker-ELG-1-3 are the NioEventLoop threads, of which grpc-default-worker-ELG-1-2 is the one actually active.
When the server's responses are received, further threads are taken from the same thread pool to run the processing logic. As shown below, two new threads appear:
grpc-default-executor-2 handles the HTTP packets of the rpc response for sayOk, and grpc-default-executor-3 handles the HTTP packets of the rpc response for sayBad.
Client thread model summary:
As you can see, in GRPC every distinct rpc call gets its own stream.
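Based on the stub calls shown above, a minimal client-side sketch of the two streaming calls looks like this (the observer bodies are illustrative; helloServiceStub is the project's generated async stub):
// Two independent rpc calls -> two independent HTTP/2 streams on one TCP connection.
StreamObserver<Hello.HelloResponse> okResponses = new StreamObserver<Hello.HelloResponse>() {
    @Override public void onNext(Hello.HelloResponse value) { System.out.println("ok <- " + value.getReply()); }
    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onCompleted() { System.out.println("ok stream completed"); }
};
StreamObserver<Hello.HelloResponse> badResponses = new StreamObserver<Hello.HelloResponse>() {
    @Override public void onNext(Hello.HelloResponse value) { System.out.println("bad <- " + value.getReply()); }
    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onCompleted() { System.out.println("bad stream completed"); }
};
StreamObserver<Hello.HelloRequest> okRequests = helloServiceStub.sayOK(okResponses);    // observed as streamId 3
StreamObserver<Hello.HelloRequest> badRequests = helloServiceStub.sayBad(badResponses); // observed as streamId 5
okRequests.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: i'm happy").build());
badRequests.onNext(Hello.HelloRequest.newBuilder().setGreeting("bad: i'm sad").build());
okRequests.onCompleted();
badRequests.onCompleted();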
6. Server-side execution flow
6.1 Starting the netty server
Start the server:
io.grpc.Server server = ServerBuilder.forPort(19999).addService(new StreamHelloServiceImpl()).build().start();
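For completeness, a minimal bootstrap built around this line might look as follows (StreamHelloServiceImpl is this project's service implementation; the shutdown hook and the blocking awaitTermination call are additions for illustration):
import io.grpc.Server;
import io.grpc.ServerBuilder;

public class HelloServer {
    public static void main(String[] args) throws Exception {
        final Server server = ServerBuilder.forPort(19999)
                .addService(new StreamHelloServiceImpl())
                .build()
                .start();
        // Shut the server down cleanly when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override public void run() { server.shutdown(); }
        }));
        // Block the main thread; the netty and executor threads do the actual work.
        server.awaitTermination();
    }
}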
NettyServerBuilder collects the server parameters, and its build method constructs the server-side ServerImpl object. While assembling the arguments for ServerImpl's constructor, buildTransportServer creates the NettyServer object that is passed in:
public Server build() {
ServerImpl server = new ServerImpl(
this,
buildTransportServer(Collections.unmodifiableList(getTracerFactories())),
Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
notifyTarget.notifyOnBuild(server);
}
return server;
}
Here, NettyServerBuilder#buildTransportServer creates the server-side NettyServer object:
protected NettyServer buildTransportServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
ProtocolNegotiator negotiator = protocolNegotiator;
if (negotiator == null) {
negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
ProtocolNegotiators.serverPlaintext();
}
return new NettyServer(
address, channelType, bossEventLoopGroup, workerEventLoopGroup,
negotiator, streamTracerFactories, maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
}
ServerImpl#start then starts the grpc service. Startup consists of two parts: starting the NettyServer and initializing the executor:
public ServerImpl start() throws IOException {
synchronized (lock) {
checkState(!started, "Already started");
checkState(!shutdown, "Shutting down");
// Start and wait for any port to actually be bound.
transportServer.start(new ServerListenerImpl());
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true;
return this;
}
}
While starting the netty server, a childHandler is registered. For each accepted connection the childHandler creates a NettyServerTransport object and, through the listener, wraps it into a ServerTransportListenerImpl, which plays a central role later on:
class NettyServer implements InternalServer {
public void start(ServerListener serverListener) throws IOException {
listener = checkNotNull(serverListener, "serverListener");
// If using the shared groups, get references to them.
allocateSharedGroups();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(channelType);
if (NioServerSocketChannel.class.isAssignableFrom(channelType)) {
b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true);
}
b.childHandler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
// apply a random jitter of +/-10% to max connection age
maxConnectionAgeInNanos =
(long) ((.9D + Math.random() * .2D) * maxConnectionAgeInNanos);
}
NettyServerTransport transport =
new NettyServerTransport(
ch, protocolNegotiator, streamTracerFactories, maxStreamsPerConnection,
flowControlWindow, maxMessageSize, maxHeaderListSize,
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
ServerTransportListener transportListener;
// This is to order callbacks on the listener, not to guard access to channel.
synchronized (NettyServer.this) {
if (channel != null && !channel.isOpen()) {
// Server already shutdown.
ch.close();
return;
}
// `channel` shutdown can race with `ch` initialization, so this is only safe to increment
// inside the lock.
eventLoopReferenceCounter.retain();
//Wrap the transport object into a listener; the returned listener is a ServerTransportListenerImpl
transportListener = listener.transportCreated(transport);
}
transport.start(transportListener);
ch.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
eventLoopReferenceCounter.release();
}
});
}
});
// Bind and start to accept incoming connections.
ChannelFuture future = b.bind(address);
try {
future.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted waiting for bind");
}
if (!future.isSuccess()) {
throw new IOException("Failed to bind", future.cause());
}
channel = future.channel();
}
}
After the netty server has started, executorPool.getObject() creates the executor. This executor comes from executorPool, which is defined as follows:
private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL =
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
The executor created by executorPool is the grpc-default-executor thread pool:
public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
new Resource<ExecutorService>() {
private static final String NAME = "grpc-default-executor";
@Override
public ExecutorService create() {
return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true));
}
@Override
public void close(ExecutorService instance) {
instance.shutdown();
}
@Override
public String toString() {
return NAME;
}
};
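By default this is an unbounded cached thread pool. If an application prefers a bounded pool under its own control, grpc allows one to be supplied through ServerBuilder#executor; a small sketch (the pool size of 8 is only an example, and StreamHelloServiceImpl is this project's service):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.grpc.Server;
import io.grpc.ServerBuilder;

public class CustomExecutorServer {
    public static void main(String[] args) throws Exception {
        // This pool replaces the shared grpc-default-executor for rpc callbacks.
        ExecutorService appExecutor = Executors.newFixedThreadPool(8);
        Server server = ServerBuilder.forPort(19999)
                .executor(appExecutor)
                .addService(new StreamHelloServiceImpl())
                .build()
                .start();
        server.awaitTermination();
    }
}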
6.2 Receiving the HTTP request
After the netty server has started and receives a request from the netty client, a grpc-default-worker-ELG-3-1 thread appears; this is a NioEventLoop thread of the netty workerGroup:
On that thread, once the HTTP request data arrives, DefaultHttp2FrameReader#processHeaderState processes it and determines the frame type:
public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSizePolicy, Configuration {
private void processHeaderState(ByteBuf in) throws Http2Exception {
if (in.readableBytes() < FRAME_HEADER_LENGTH) {
// Wait until the entire frame header has been read.
return;
}
// Read the header and prepare the unmarshaller to read the frame.
payloadLength = in.readUnsignedMedium();
if (payloadLength > maxFrameSize) {
throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength,
maxFrameSize);
}
frameType = in.readByte();
flags = new Http2Flags(in.readUnsignedByte());
streamId = readUnsignedInt(in);
// We have consumed the data, next time we read we will be expecting to read the frame payload.
readingHeaders = false;
switch (frameType) {
case DATA:
verifyDataFrame();
break;
case HEADERS:
verifyHeadersFrame();
break;
case PRIORITY:
verifyPriorityFrame();
break;
case RST_STREAM:
verifyRstStreamFrame();
break;
case SETTINGS:
verifySettingsFrame();
break;
case PUSH_PROMISE:
verifyPushPromiseFrame();
break;
case PING:
verifyPingFrame();
break;
case GO_AWAY:
verifyGoAwayFrame();
break;
case WINDOW_UPDATE:
verifyWindowUpdateFrame();
break;
case CONTINUATION:
verifyContinuationFrame();
break;
default:
// Unknown frame type, could be an extension.
verifyUnknownFrame();
break;
}
}
}
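What processHeaderState parses here is just the fixed 9-byte HTTP/2 frame header defined by RFC 7540: a 3-byte payload length, a 1-byte frame type, a 1-byte flags field and a 4-byte stream identifier whose highest bit is reserved. A standalone sketch (not netty code) decoding those fields from a raw buffer:
// Decode the 9-byte HTTP/2 frame header from a raw byte array.
static void decodeFrameHeader(byte[] buf) {
    int payloadLength = ((buf[0] & 0xFF) << 16) | ((buf[1] & 0xFF) << 8) | (buf[2] & 0xFF);
    int frameType = buf[3] & 0xFF;   // e.g. 0x0 = DATA, 0x1 = HEADERS, 0x4 = SETTINGS
    int flags = buf[4] & 0xFF;       // e.g. END_STREAM, END_HEADERS
    int streamId = ((buf[5] & 0x7F) << 24)   // highest bit is reserved and ignored
            | ((buf[6] & 0xFF) << 16)
            | ((buf[7] & 0xFF) << 8)
            | (buf[8] & 0xFF);
    System.out.printf("length=%d type=0x%x flags=0x%x streamId=%d%n",
            payloadLength, frameType, flags, streamId);
}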
6.2.1 Receiving the SETTINGS frame
During debugging, the first HTTP frame turns out to be a SETTINGS frame, which carries the HTTP2 connection settings. SETTINGS frames use streamId 0 and do not share a streamId with any rpc request:
Frames such as WINDOW_UPDATE follow; they are less important here and are not discussed in detail.
6.2.2 Receiving the rpc request frame
Next, the HEADERS frame of the rpc request on streamId 3 arrives:
DefaultHttp2FrameReader#readHeadersFrame is then called to process the HEADERS frame:
private void readHeadersFrame(final ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,
Http2FrameListener listener) throws Http2Exception {
final int headersStreamId = streamId;
final Http2Flags headersFlags = flags;
final int padding = readPadding(payload);
verifyPadding(padding);
// The callback that is invoked is different depending on whether priority information
// is present in the headers frame.
if (flags.priorityPresent()) {
long word1 = payload.readUnsignedInt();
final boolean exclusive = (word1 & 0x80000000L) != 0;
final int streamDependency = (int) (word1 & 0x7FFFFFFFL);
if (streamDependency == streamId) {
throw streamError(streamId, PROTOCOL_ERROR, "A stream cannot depend on itself.");
}
final short weight = (short) (payload.readUnsignedByte() + 1);
final int lenToRead = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);
// Create a handler that invokes the listener when the header block is complete.
headersContinuation = new HeadersContinuation() {
@Override
public int getStreamId() {
return headersStreamId;
}
@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
Http2FrameListener listener) throws Http2Exception {
final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), streamDependency,
weight, exclusive, padding, headersFlags.endOfStream());
}
}
};
// Process the initial fragment, invoking the listener's callback if end of headers.
headersContinuation.processFragment(flags.endOfHeaders(), payload, lenToRead, listener);
resetHeadersContinuationIfEnd(flags.endOfHeaders());
return;
}
// The priority fields are not present in the frame. Prepare a continuation that invokes
// the listener callback without priority information.
headersContinuation = new HeadersContinuation() {
@Override
public int getStreamId() {
return headersStreamId;
}
@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
Http2FrameListener listener) throws Http2Exception {
final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), padding,
headersFlags.endOfStream());
}
}
};
// Process the initial fragment, invoking the listener's callback if end of headers.
int len = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);
//Process the headers fragment
headersContinuation.processFragment(flags.endOfHeaders(), payload, len, listener);
resetHeadersContinuationIfEnd(flags.endOfHeaders());
}
Finally, ServerImpl$ServerTransportListenerImpl#streamCreated is called, which looks up the rpc method by name and schedules its invocation on the asynchronous thread grpc-default-executor-0:
public void streamCreated(
final ServerStream stream, final String methodName, final Metadata headers) {
if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
String encoding = headers.get(MESSAGE_ENCODING_KEY);
Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
if (decompressor == null) {
stream.close(
Status.UNIMPLEMENTED.withDescription(
String.format("Can't find decompressor for %s", encoding)),
new Metadata());
return;
}
stream.setDecompressor(decompressor);
}
final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull(
stream.statsTraceContext(), "statsTraceCtx not present from stream");
final Context.CancellableContext context = createContext(stream, headers, statsTraceCtx);
final Executor wrappedExecutor;
// This is a performance optimization that avoids the synchronization and queuing overhead
// that comes with SerializingExecutor.
if (executor == directExecutor()) {
wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
} else {
//Wrap the executor so the callbacks of this stream run serialized on the async pool
wrappedExecutor = new SerializingExecutor(executor);
}
final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(
wrappedExecutor, executor, stream, context);
stream.setListener(jumpListener);
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. Callbacks can still be triggered, but they will be
// queued.
wrappedExecutor.execute(new ContextRunnable(context) {
@Override
public void runInContext() {
ServerStreamListener listener = NOOP_LISTENER;
try {
//Look up the method definition by its full method name
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
}
if (method == null) {
Status status = Status.UNIMPLEMENTED.withDescription(
"Method not found: " + methodName);
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
// memory as a map whose key is the method name, this would allow a misbehaving
// client to blow up the server in-memory stats storage by sending large number of
// distinct unimplemented method
// names. (https://github.com/grpc/grpc-java/issues/2285)
stream.close(status, new Metadata());
context.cancel(null);
return;
}
//Start the call; this eventually invokes the user-defined rpc method
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx);
} catch (RuntimeException e) {
stream.close(Status.fromThrowable(e), new Metadata());
context.cancel(null);
throw e;
} catch (Error e) {
stream.close(Status.fromThrowable(e), new Metadata());
context.cancel(null);
throw e;
} finally {
jumpListener.setListener(listener);
}
}
});
}
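The SerializingExecutor used above is what guarantees that the callbacks of one stream run one at a time and in order, even though they are submitted to the shared cached thread pool. A simplified, self-contained sketch of that idea (not the actual grpc-java class):
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Runs submitted tasks one at a time, in submission order, on a delegate executor.
class SimpleSerializingExecutor implements Executor {
    private final Executor delegate;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();
    private final AtomicBoolean running = new AtomicBoolean(false);

    SimpleSerializingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
        schedule();
    }

    private void schedule() {
        // At most one drain task may be active on the delegate at any time.
        if (running.compareAndSet(false, true)) {
            delegate.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Runnable next;
                        while ((next = tasks.poll()) != null) {
                            next.run();   // tasks of this stream never run concurrently
                        }
                    } finally {
                        running.set(false);
                        // A producer may have enqueued after the last poll; reschedule if so.
                        if (!tasks.isEmpty()) {
                            schedule();
                        }
                    }
                }
            });
        }
    }
}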
Here we can see that the streamId is 3 and the requested method name is protobuf.HelloService/SayOK:
The server then goes through ServerCalls$StreamingServerCallHandler#startCall to invoke the sayOK method:
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<ReqT, RespT>(call);
StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
responseObserver.freeze();
if (responseObserver.autoFlowControlEnabled) {
call.request(1);
}
return new StreamingServerCallListener(requestObserver, responseObserver, call);
}
HelloServiceGrpc$MethodHandlers#invoke dispatches to the sayOK method:
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_SAY_HI:
return (io.grpc.stub.StreamObserver<Req>) serviceImpl.sayHi(
(io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse>) responseObserver);
case METHODID_SAY_BAD:
return (io.grpc.stub.StreamObserver<Req>) serviceImpl.sayBad(
(io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse>) responseObserver);
case METHODID_SAY_OK:
return (io.grpc.stub.StreamObserver<Req>) serviceImpl.sayOK(
(io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse>) responseObserver);
default:
throw new AssertionError();
}
}
Finally, control reaches the user-defined sayOK method, which returns an anonymous StreamObserver instance. The returned StreamObserver<Hello.HelloRequest> handles incoming HelloRequest messages; when its onNext is called, responses are sent back through responseObserver#onNext:
public StreamObserver<Hello.HelloRequest> sayOK(StreamObserver<Hello.HelloResponse> responseObserver) {
//The returned StreamObserver<Hello.HelloRequest> handles incoming requests; when onNext is called, responses are sent via responseObserver#onNext
return new StreamObserver<Hello.HelloRequest>() {
@Override
public void onNext(Hello.HelloRequest value) {
//For each request received, stream back three responses; they all travel on the same stream
System.out.println("receive : " + value.getGreeting());
responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok1: " + value.getGreeting()).build());
responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok2: " + value.getGreeting()).build());
responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok3: " + value.getGreeting()).build());
}
@Override
public void onError(Throwable t) {
System.out.println("error");
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("completed");
responseObserver.onCompleted();
}
};
}
Finally, the returned requestObserver and responseObserver are wrapped into a StreamingServerCallListener, which is in turn wrapped into a JumpToApplicationThreadServerStreamListener.
6.3 Processing the HTTP request
Section 6.2 showed that when the RPC request arrives, a requestObserver and a responseObserver are created to process the request data and send back responses. When a request message (a DATA frame) arrives, ServerCalls$StreamingServerCallHandler$StreamingServerCallListener#onMessage is invoked, still on grpc-default-executor-0:
public void onMessage(ReqT request) {
requestObserver.onNext(request);
// Request delivery of the next inbound message.
if (responseObserver.autoFlowControlEnabled) {
call.request(1);
}
}
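The call.request(1) above is automatic inbound flow control: after each delivered message the listener asks the transport for exactly one more. If an application wants to pace the inbound stream itself, it can cast the response observer to ServerCallStreamObserver and switch to manual flow control; a hedged sketch adapted from this project's sayOK (the pacing logic is illustrative, not part of the original service):
public StreamObserver<Hello.HelloRequest> sayOK(StreamObserver<Hello.HelloResponse> responseObserver) {
    final ServerCallStreamObserver<Hello.HelloResponse> serverObserver =
            (ServerCallStreamObserver<Hello.HelloResponse>) responseObserver;
    // Take over inbound flow control: grpc will no longer call request(1) automatically.
    serverObserver.disableAutoInboundFlowControl();
    serverObserver.request(1);  // explicitly ask for the first inbound message
    return new StreamObserver<Hello.HelloRequest>() {
        @Override
        public void onNext(Hello.HelloRequest value) {
            serverObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok: " + value.getGreeting()).build());
            serverObserver.request(1);  // only now allow the next inbound message to be delivered
        }
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }
        @Override
        public void onCompleted() {
            serverObserver.onCompleted();
        }
    };
}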
This runs the user-defined onNext method and then sends the response through ServerCalls$ServerCallStreamObserverImpl#onNext:
public void onNext(RespT response) {
if (cancelled) {
throw Status.CANCELLED.asRuntimeException();
}
if (!sentHeaders) {
call.sendHeaders(new Metadata());
sentHeaders = true;
}
call.sendMessage(response);
}
The response frames are then written out by NettyServerHandler#write on a NioEventLoop of the netty workerGroup.
When call.request is invoked, the work is handed off to the channel's event loop unless the caller is already running on it:
public void request(final int numMessages) {
if (channel.eventLoop().inEventLoop()) {
// Processing data read in the event loop so can call into the deframer immediately
transportState().requestMessagesFromDeframer(numMessages);
} else {
//Not on the event loop: hand the work off to the event loop thread
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
transportState().requestMessagesFromDeframer(numMessages);
}
});
}
}
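This is the standard netty thread-affinity idiom: if the caller is already on the channel's event loop the work runs inline, otherwise it is handed off so that the deframer state is only ever touched by that single thread. The idiom in isolation (generic sketch, not grpc code):
import io.netty.channel.Channel;

// Ensure that `work` always runs on the channel's event loop thread.
static void runOnEventLoop(final Channel channel, final Runnable work) {
    if (channel.eventLoop().inEventLoop()) {
        work.run();                          // already on the event loop: run inline
    } else {
        channel.eventLoop().execute(work);   // otherwise hand off to the event loop thread
    }
}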
6.4 Handling multiple rpc requests on the server
Different rpc requests correspond to different streams, and the server handles each rpc request on its own thread from the grpc-default-executor pool. As shown below, the sayBad and sayOk rpc requests are handled by two different threads:
The sayBad method corresponds to the stream with streamId 5:
The final output is shown below; the print statements of the different rpcs interleave, which shows that different streams are processed in parallel:
The rpc client-server thread model is shown in the following figure:
7. Lessons learned from studying the grpc code
Studying grpc took roughly a month in total. Here is a summary of the pitfalls encountered.
- If you try to trace the call chain step by step from the startup code all the way into the server, even three months may not be enough. To get through it faster, debug the project instead: locate the core code paths, set breakpoints there, and you can understand grpc's execution flow fairly quickly. The same approach works for other frameworks.
- Before reading the source, first understand the underlying principles, such as netty and HTTP2. Then, while debugging, look directly at which threads the server and client start, what each thread is for, and why it is designed that way. Combined with that background knowledge, you can form a fairly clear picture of how the components work.