1. 源码分析
1.1 整体执行流程:
1.1.1 同步和异步请求
/** 同步请求 */
public void syncCall() {
//1.创建OkHttpClient
OkHttpClient okHttp = new OkHttpClient.Builder().build();
//2.创建request
Request request = new Request.Builder().url("http://www.baidu.com/").build();
//3.得到RealCall
Call call = okHttp.newCall(request);
try {
//4.执行同步请求,获取response,会阻塞当前执行线程。
Response response = call.execute();
ResponseBody body = response.body();
if (body != null) {
String result = body.toString();
Log.i("OkHttp","syncCall:"+result);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/** 异步请求 */
public void asyncCall() {
//1.获取OkhttpClient
OkHttpClient okHttp = new OkHttpClient.Builder().build();
//2.构建reqest对象
Request request = new Request.Builder().url("http://www.baidu.com/").build();
//3.获取RealCall
Call call = okHttp.newCall(request);
//4.执行异步请求
call.enqueue(new Callback() {
@Override
public void onFailure(@NonNull Call call, @NonNull IOException e) {
//在子线程中执行
}
@Override
public void onResponse(@NonNull Call call, @NonNull Response response) throws IOException {
//在子线程中执行
}
});
}
同步和异步请求中,1,2,3步都是一样的,最后调用RealCall的execute执行同步请求,调用enqueue执行异步请求
代码中的1和2都是通过Builder模式,构建出对象,没什么好说的。
newCall()方法生成了一个RealCall
/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
1.1.1 同步请求RealCall.execute()方法
override fun execute(): Response {
......
try {
//1. 调用dispatcher的executed,将当前call加入同步执行队列
client.dispatcher.executed(this)
//2. 调用拦截器链获取Response
return getResponseWithInterceptorChain()
} finally {
//3. 执行结束后的从同步执行队列中移除,并将等待执行的异步队列添加到正在执行到异步队列中执行
client.dispatcher.finished(this)
}
}
上边代码中2是调用的核心方法,放在最后分析。1和3都是Dispatcher中到方法,Dispatcher是调度去,主要维护了同步异步的任务队列。client.dispathcer是中OkHttpClient构造时传入的。
它的execute方法逻辑很简单,就是将当前call添加到同步执行队列中
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
finish()代码如下
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
//1. 从队列中移除
synchronized(this) {
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//2. 任务结束后执行触发异步队列执行
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
promoteAndExecute主要操作到是异步队列,放在异步请求的分析中。
1.1.2 异步请求 RealCall.enqueue()
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
//先构建了一个AysncCall,再执行dispatcher的enqueue方法
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
AsyncCall是RealCall的内部类,集成自Runnable,其run方法代码如下:
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//1. 在线程池中执行拦截器链接,获取response
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 2.执行回调
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
......
} catch (t: Throwable) {
......
} finally {
// 3. 执行结束的触发
client.dispatcher.finished(this)
}
}
}
无论的异步还是同步请求,调用dispatcher的finish()方法后,最终都会调用到promoteAndExecute函数
dispatcher.enqueue()源码如下:
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//1.先加入异步等待队列
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
// 2.如果有域名相同的请求,则共享之前请求的计数器
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//3. 将等待队列加入到异步执行队列中执行,直到异步队列已满。
promoteAndExecute()
}
promoteAndExecute函数
/**
* Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
* executor service. Must not be called with synchronization because executing calls can call
* into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
//1.循环遍历异步等待队列,将其中任务加入到正在执行的异步队列中,知道异步执行队列满了。
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
//2.将可以执行的请求放到线程池上执行
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//这里调用AsyncCall的执行方法
asyncCall.executeOn(executorService)
}
return isRunning
}
1.1.3 RealCall的getResponseWithInterceptorChain()
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
//1.将所有的拦截器串成拦截器链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
//2. 调用proceed方法执行请求
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
各类拦截器组成拦截器链,顺序执行各个拦截器,最终获取Response.
RealInterceptorChain.proceed()方法源码
@Throws(IOException::class)
override fun proceed(request: Request): Response {
······
// Call the next interceptor in the chain.
//1.构造下一个拦截器链,next的proceed方法是在interceptor.intercept里边调用的。
val next = copy(index = index + 1, request = request)
//2. 获取当前要执行的拦截器
val interceptor = interceptors[index]
//3. 执行intercept方法获取response,intercept方法里边会构建一个新的RealInterceptorChain,并调用其procced方法,以此来实现责任链
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
······
return response
}
1.2 拦截器
1.2.1 RetryAndFollowUpInterceptor
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
//1. 这行代码初始化一个ExchangeFinder,用来找connection的
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
//2. 调用拦截器链的procced方法
response = realChain.proceed(request)
//注意这里newExchangeFinder变量赋值为true了,也就是如果第一次成功的话,后续注释1那里不会再重新new一个ExchangeFinder。
newExchangeFinder = true
} catch (e: RouteException) {
//如果不是可恢复的异常,则直接抛出异常
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
//如果不是可恢复的异常,则直接抛出异常
// An attempt to communicate with a server failed. The request may have been sent.
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
//根据不同的responsecode重新构建request对象
val followUp = followUpRequest(response, exchange)
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
//超过最新重试次数(20次)则直接抛出异常
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
//进入下一个循环继续执行
request = followUp
priorResponse = response
} finally {
//call的interceptorScopedExchange置空
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
/**
* Figures out the HTTP request to make in response to receiving [userResponse]. This will
* either add authentication headers, follow redirects or handle a client request timeout. If a
* follow-up is either unnecessary or not applicable, this returns null.
*/
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
val route = exchange?.connection?.route()
val responseCode = userResponse.code
val method = userResponse.request.method
when (responseCode) {
HTTP_PROXY_AUTH -> {
val selectedProxy = route!!.proxy
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
}
return client.proxyAuthenticator.authenticate(route, userResponse)
}
HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
return buildRedirectRequest(userResponse, method)
}
HTTP_CLIENT_TIMEOUT -> {
// 408's are rare in practice, but some servers like HAProxy use this response code. The
// spec says that we may repeat the request without modifications. Modern browsers also
// repeat the request (even non-idempotent ones.)
if (!client.retryOnConnectionFailure) {
// The application layer has directed us not to retry the request.
return null
}
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
// We attempted to retry and got another timeout. Give up.
return null
}
if (retryAfter(userResponse, 0) > 0) {
return null
}
return userResponse.request
}
HTTP_UNAVAILABLE -> {
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
// We attempted to retry and got another timeout. Give up.
return null
}
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
// specifically received an instruction to retry without delay
return userResponse.request
}
return null
}
HTTP_MISDIRECTED_REQUEST -> {
// OkHttp can coalesce HTTP/2 connections even if the domain names are different. See
// RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then
// we can retry on a different connection.
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
if (exchange == null || !exchange.isCoalescedConnection) {
return null
}
exchange.connection.noCoalescedConnections()
return userResponse.request
}
else -> return null
}
}
总结:RetryAndFollowUpInterceptor的作用主要是做失败重试,它最大的重试次数是20次,对于特定的异常会做恢复重试,对于响应成功后的一些特定的responseCode会决定是要构建新的Request对象重试,还是之前返回null,结束重试。
1.2.2 BridgeInterceptor
/**
* Bridges from application code to network code. First it builds a network request from a user
* request. Then it proceeds to call the network. Finally it builds a user response from the network
* response.
*/
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
//1.根据用户的请求构建一个新的requestBuilder,用于完善请求头信息
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
//2.1设置contentType和Content-Length
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
//2.2 设置Host
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
//2.3 设置Keep-Alive 保持连接
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
//2.4 设置支持gzip
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
//2.5 设置cookie相关
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
//2.6 设置User-Agent
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
//3. 构建新的request,调用procced方法
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
//4. response的body做gzip解压缩处理
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
/** Returns a 'Cookie' HTTP request header with all cookies, like `a=b; c=d`. */
private fun cookieHeader(cookies: List<Cookie>): String = buildString {
cookies.forEachIndexed { index, cookie ->
if (index > 0) append("; ")
append(cookie.name).append('=').append(cookie.value)
}
}
}
总结:BridgeInterceptor的作用,在源码类注释里边写的很清楚了,它是应用代码到网络代码到桥梁,主要处理:
- 1.根据用户传入到request对象,补充完善一些请求头信息,构建一个新到request;
- 2.将这个新构建到reuqest对象发出去;
- 3.最终对返回对response做一些处理,主要是Gzip解压缩,构建一个新对response
1.2.3 CacheInterceptor
TODO
1.2.4 ConnectInterceptor
/**
* Opens a connection to the target server and proceeds to the next interceptor. The network might
* be used for the returned response, or to validate a cached response with a conditional GET.
*/
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
//1.这里主要是找到一个可以服用到socket链接,或创建一个新到链接,为下一步真正执行网络请求做准备。
val exchange = realChain.call.initExchange(chain)
//InterceptorChain一般都是上一个InterceptorChain创建下一个,这里因为要传入exchange,而InterceptorChain但所有成员都是val不可变但,所以通过复制重新创建了一个。
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
RealCall.initExchage(chain)源码如下:
#RealCall
internal fun initExchange(chain: RealInterceptorChain): Exchange {
······
val exchangeFinder = this.exchangeFinder!!
// 找到一个可用的链接,并使用该连接构造ExchangeCodec对象
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
......
return result
}
ExchangeFinder的find方法源码如下:
#ExchangeFinder
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
//1. 找到一个可用的连接
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
//2. 构建ExchangeCodec对象
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
#ExchangeFinder
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
//1.找到一个候选连接
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// Confirm that the connection is good.
//2.确认是否可用,可用则直接返回
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
// If it isn't, take it out of the pool.
//3. 不可用则从连接池中移除
candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes
// would happen if we made a new connection and it immediately is detected as unhealthy.
if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue
throw IOException("exhausted all routes")
}
}
#ExchangeFinder
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*
* This checks for cancellation before each blocking operation.
*/
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
// 1.如果当前call已经又connection则判断这个connection是否可用重用,RetryAndFellow过程中,可能会多次调用,connection可能已经设置过
// Attempt to reuse the connection from the call.
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
//连接是否不能在用了,或者call里边的connection重定向后新的地址的域名和的端口不一样了,如果是则释放之前的连接
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
// because we already acquired it.
if (call.connection != null) {
//连接可用则直接返回
check(toClose == null)
return callConnection
}
//关闭连接
// The call's connection was released.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
// We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
//2.从连接池中获取一个新连接
// Attempt to get a connection from the pool.
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
//3.1 连接池中没有找到匹配的连接,则收集路由信息,构建新的connection
// Nothing in the pool. Figure out what route we'll try next.
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
//耗时
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
//这时再检查下连接池是否有已经连接好的可用的连接,如果有则复用,合并连接
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
//3.2 创建新的connection,并connect到服务器
// Connect. Tell the call about the connecting call so async cancels work.
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
//3.3 再次判断连接池中是否还有可用到连接,如果有则合并连接,将新创建到connection关闭,返回池中到连接
// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
//3.4 走到这里,将新创建的连接,加入连接池中,赋给当前call,并返回。
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
RealConnectionPool
RealCall中的ExchangeFinder是在RetryAndFollowUpInterceptor中调用**call.enterNetworkInterceptorExchange(request, newExchangeFinder)**时创建的。
代码如下:
# RealCall
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
······
if (newExchangeFinder) {
this.exchangeFinder = ExchangeFinder(
connectionPool,
createAddress(request.url),
this,
eventListener
)
}
}
其中传入的connectionPool是RealCall的成员变量
# RealCall
private val connectionPool: RealConnectionPool = client.connectionPool.delegate
#OkHttpClient
@get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool
#OkHttpClient.Builder
internal var connectionPool: ConnectionPool = ConnectionPool()
/**
* Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
* share the same [Address] may share a [Connection]. This class implements the policy
* of which connections to keep open for future use.
*
* @constructor Create a new connection pool with tuning parameters appropriate for a single-user
* application. The tuning parameters in this pool are subject to change in future OkHttp releases.
* Currently this pool holds up to 5 idle connections which will be evicted after 5 minutes of
* inactivity.
*/
class ConnectionPool internal constructor(
internal val delegate: RealConnectionPool
) {
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit
))
constructor() : this(5, 5, TimeUnit.MINUTES)
/** Returns the number of idle connections in the pool. */
fun idleConnectionCount(): Int = delegate.idleConnectionCount()
/** Returns total number of connections in the pool. */
fun connectionCount(): Int = delegate.connectionCount()
/** Close and remove all idle connections in the pool. */
fun evictAll() {
delegate.evictAll()
}
}
(存)进入连接池
ConnectPool真正调用的是RealConnectionPool,从上边可用看出,连接池最大的空闲连接是5个,最大空闲时间是5分钟。
上一小节中,新创建的连接会放入连接池中,看下它的put方法源码:
# RealConnectionPool
fun put(connection: RealConnection) {
connection.assertThreadHoldsLock()
//1.放入连接池队列(ConcurrentLinkedQueue)中
connections.add(connection)
//2.添加新的清理任务
cleanupQueue.schedule(cleanupTask)
}
(删)移出连接池
有三个场景会触发清理连接池操作
场景一
上边的2中会启动一个清理任务,任务最终执行了RealConnectPool的cleanup方法
# RealConnectionPool
private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
override fun runOnce() = cleanup(System.nanoTime())
}
······
/**
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
*
* Returns the duration in nanoseconds to sleep until the next scheduled call to this method.
* Returns -1 if no further cleanups are required.
*/
fun cleanup(now: Long): Long {
var inUseConnectionCount = 0
var idleConnectionCount = 0
var longestIdleConnection: RealConnection? = null
var longestIdleDurationNs = Long.MIN_VALUE
//1.找到空闲时间最长的connection
// Find either a connection to evict, or the time that the next eviction is due.
for (connection in connections) {
synchronized(connection) {
// If the connection is in use, keep searching.
//重新计算connection上执行call的数量
if (pruneAndGetAllocationCount(connection, now) > 0) {
//每个connection都维护了一个在这个连接上执行的Call的弱引用队列,如果队列size> 0,则说明正在执行
inUseConnectionCount++
} else {
//没有正在执行的连接,则空闲连接数+1;
idleConnectionCount++
// If the connection is ready to be evicted, we're done.
val idleDurationNs = now - connection.idleAtNs
if (idleDurationNs > longestIdleDurationNs) {
//得到空闲时间最长的连接,及其空闲时间
longestIdleDurationNs = idleDurationNs
longestIdleConnection = connection
} else {
Unit
}
}
}
}
when {
longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections -> {
//最长的空闲连接的空闲时长大于设定的值(默认5分钟)或者空闲连接数大于设定的值(默认5个),就移除该连接
// We've chosen a connection to evict. Confirm it's still okay to be evict, then close it.
val connection = longestIdleConnection!!
synchronized(connection) {
if (connection.calls.isNotEmpty()) return 0L // No longer idle.
if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest.
connection.noNewExchanges = true
connections.remove(longestIdleConnection)
}
connection.socket().closeQuietly()
if (connections.isEmpty()) cleanupQueue.cancelAll()
// Clean up again immediately.
return 0L
}
idleConnectionCount > 0 -> {
//如果空闲连接时间和空闲连接数都没有超限,就计算下一次需要执行清理任务的时间间隔
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs
}
inUseConnectionCount > 0 -> {
//如果所有连接都在使用,则等下一个最长默认空闲时间后,再执行清理任务
// All connections are in use. It'll be at least the keep alive duration 'til we run
// again.
return keepAliveDurationNs
}
else -> {
// 空闲连接和在用连接都为0,说明连接池为空,也没有任务执行,不用再执行清理任务。
// No connections, idle or in use.
return -1
}
}
}
# RealConnectionPool
/**
* Prunes any leaked calls and then returns the number of remaining live calls on [connection].
* Calls are leaked if the connection is tracking them but the application code has abandoned
* them. Leak detection is imprecise and relies on garbage collection.
*/
private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
connection.assertThreadHoldsLock()
val references = connection.calls
var i = 0
while (i < references.size) {
val reference = references[i]
if (reference.get() != null) { //1.不为空,说明Call还则执行中
i++
continue
}
// We've discovered a leaked call. This is an application bug.
val callReference = reference as CallReference
//2. CallReference集成自弱引用
val message = "A connection to ${connection.route().address.url} was leaked. " +
"Did you forget to close a response body?"
Platform.get().logCloseableLeak(message, callReference.callStackTrace)
//3. 从引用队列中移除
references.removeAt(i)
connection.noNewExchanges = true
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
//最后一个call,这样计算idleAtNs,会导致该连接立即执行移除连接池。
connection.idleAtNs = now - keepAliveDurationNs
return 0
}
}
return references.size
}
#RealCall
internal class CallReference(
referent: RealCall,
/**
* Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
* identifying the origin of connection leaks.
*/
val callStackTrace: Any?
) : WeakReference<RealCall>(referent)
加入连接池后,会立即执行清理连接池的操作,执行规则如下:
- 如果连接池中的空闲连接数或连接的最大空闲超限,就将空闲时间最长的那个连接移除
- 不满足1,如果空闲连接数大于0,则根据得到的连接的最大空闲时间,计算下一次需要执行清理操作的时间间隔;
- 如果所有的连接都中使用,就等默认的最大空闲时间之后再执行清理任务
- 如果连接池为空,则返回-1,不再安排执行清理操作
场景二
# RealConnectionPool
/**
* Notify this pool that [connection] has become idle. Returns true if the connection has been
* removed from the pool and should be closed.
*/
fun connectionBecameIdle(connection: RealConnection): Boolean {
connection.assertThreadHoldsLock()
return if (connection.noNewExchanges || maxIdleConnections == 0) {
connection.noNewExchanges = true
connections.remove(connection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
true
} else {
cleanupQueue.schedule(cleanupTask)
false
}
}
connectionBecameIdle()也会导致执行cleanupTask或将连接移出连接池,该方法被RealCall的releaseConnectionNoEvents()调用。
场景三
# RealConnectionPool
fun evictAll() {
val i = connections.iterator()
while (i.hasNext()) {
val connection = i.next()
val socketToClose = synchronized(connection) {
if (connection.calls.isEmpty()) {
i.remove()
connection.noNewExchanges = true
return@synchronized connection.socket()
} else {
return@synchronized null
}
}
socketToClose?.closeQuietly()
}
if (connections.isEmpty()) cleanupQueue.cancelAll()
}
主动调用 okHttp.connectionPool().evictAll()也会清理连接池中的所有连接。
(取)从连接池中查找对应的连接
# RealConnectionPool
/**
* Attempts to acquire a recycled connection to [address] for [call]. Returns true if a connection
* was acquired.
*
* If [routes] is non-null these are the resolved routes (ie. IP addresses) for the connection.
* This is used to coalesce related domains to the same HTTP/2 connection, such as `square.com`
* and `square.ca`.
*/
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
for (connection in connections) {
synchronized(connection) {
if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
if (!connection.isEligible(address, routes)) return@synchronized
call.acquireConnectionNoEvents(connection)
return true
}
}
return false
}
这个方法,主要是ExchangeFinder的findConnection()方法中执行,里边有三处调用。
总结
ConnectInterceptor的主要工作,是为下一步真正请求网络,找到一个合适的RealConnection,根据这个RealConnect创建一个ExchageCodec,再用这个ExchageCodec和ExchangeFinder最终构建一个新的Exchange对象,供下一个拦截链使用。核心流程是ExchangeFinder中的findConnection方法,它查找soket的流程如下:
- 看当前的RealCall里边持有的RealConnect是否可用,可用则返回,不可用进入2.
- 从RealConnectionPool中找到一个合适的RealConnect,找到就讲这个连接返回,并赋值给当前的RealCall对象,找不到则进入3
- 收集Route信息,构建一个新的RealConnect,并调用它的connect方法,连接成功后,再次从ConnectionPool中查找对应的url有没有已经创建好的连接,如果有则将新创建的RealConnect关闭,复用从连接池中得到的RealConnect(连接合并),并返回,没有则进入4.
- 将3中创建的连接,放入RealConnectionPool中,并返回该连接。
1.2.5 CallServerInterceptor
/** This is the last interceptor in the chain. It makes a network call to the server. */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
//1.1 写入Request的header信息
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
//不是HEAD或GET请求
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
//flush请求
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
if (requestBody == null || !requestBody.isDuplex()) {
// 1.2 完成Request
exchange.finishRequest()
}
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
//2.1 读取响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake()) //握手
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
//2.2 响应头结束
exchange.responseHeadersEnd(response)
// 2.3 生成最终的response
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
}
总结:CallServerIntercpter主要负责利用ConnectionIntercepter生成好的Exchange来执行发送请求和读取响应的操作。
参考资料
你想要的系列:网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点
OkHttp 4源码(4)— 连接机制分析
2. http请求头压缩
基本思路
- 在请求头添加
header("Content-Encoding", "gzip")
- 服务器ng收到请求后,查看是否有Content-Encoding字段,有则启用请求正文解压