OkHttp源码解析
资料
OkHttp源码深度解析
OkHttp源码走心解析(很细 很长)
整体结果
拦截器(责任链模式)
- RetryAndFollowUpInterceptor – 失败和重定向拦截器
- BridgeInterceptor – 封装request和response拦截器
- CacheInterceptor – 缓存相关的过滤器,负责读取缓存直接返回、更新缓存
- ConnectInterceptor – 连接服务,负责和服务器建立连接 这才是真正的请求网络
- CallServerInterceptor – 执行流操作(写出请求体、获得响应数据)负责向服务器发送请求数据、从服务器读取响应数据 进行http请求报文的封装与请求 报文的解析
RetryAndFollowUpInterceptor
从上图中可以看到,RetryAndFollowUpInterceptor开启了一个 while(true)的循环,并在循环内部完成两个重要的判定,如图中的蓝色框:
- 当请求内部抛出异常时,判定是否需要重试
- 当响应结果是3xx重定向时,构建新的请求并发送请求
重试的逻辑相对复杂,有如下判断的判定逻辑(具体代码是RetryAndFollowUpInterceptor类的recover方法)
- 规则1:client的retryOnConnectionFailure参数设置为false,不进行重试
- 规则2:请求的body已经发出,不进行重试
- 规则3:特殊的异常类型不进行重试(如ProtocolException、SSLHandshakeExceptio等)
- 规则4:没有更多的route(包含proxy和inetaddress),不进行重试
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
// An attempt to communicate with a server failed. The request may have been sent.
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
val followUp = followUpRequest(response, exchange)
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
/**
* Report and attempt to recover from a failure to communicate with a server. Returns true if
* `e` is recoverable, or false if the failure is permanent. Requests with a body can only
* be recovered if the body is buffered or if the failure occurred before the request has been
* sent.
*/
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
// The application layer has forbidden retries.
if (!client.retryOnConnectionFailure) return false
// We can't send the request body again.
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// This exception is fatal.
if (!isRecoverable(e, requestSendStarted)) return false
// No more routes to attempt.
if (!call.retryAfterFailure()) return false
// For failure recovery, use the same route selector with a new connection.
return true
}
private fun requestIsOneShot(e: IOException, userRequest: Request): Boolean {
val requestBody = userRequest.body
return (requestBody != null && requestBody.isOneShot()) ||
e is FileNotFoundException
}
private fun isRecoverable(e: IOException, requestSendStarted: Boolean): Boolean {
// If there was a protocol problem, don't recover.
if (e is ProtocolException) {
return false
}
// If there was an interruption don't recover, but if there was a timeout connecting to a route
// we should try the next route (if there is one).
if (e is InterruptedIOException) {
return e is SocketTimeoutException && !requestSendStarted
}
// Look for known client-side or negotiation errors that are unlikely to be fixed by trying
// again with a different route.
if (e is SSLHandshakeException) {
// If the problem was a CertificateException from the X509TrustManager,
// do not retry.
if (e.cause is CertificateException) {
return false
}
}
if (e is SSLPeerUnverifiedException) {
// e.g. a certificate pinning error.
return false
}
// An example of one we might want to retry with a different route is a problem connecting to a
// proxy and would manifest as a standard IOException. Unless it is one we know we should not
// retry, we return true and try a new route.
return true
}
/**
* Figures out the HTTP request to make in response to receiving [userResponse]. This will
* either add authentication headers, follow redirects or handle a client request timeout. If a
* follow-up is either unnecessary or not applicable, this returns null.
*/
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
val route = exchange?.connection?.route()
val responseCode = userResponse.code
val method = userResponse.request.method
when (responseCode) {
HTTP_PROXY_AUTH -> {
val selectedProxy = route!!.proxy
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
}
return client.proxyAuthenticator.authenticate(route, userResponse)
}
HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
return buildRedirectRequest(userResponse, method)
}
HTTP_CLIENT_TIMEOUT -> {
// 408's are rare in practice, but some servers like HAProxy use this response code. The
// spec says that we may repeat the request without modifications. Modern browsers also
// repeat the request (even non-idempotent ones.)
if (!client.retryOnConnectionFailure) {
// The application layer has directed us not to retry the request.
return null
}
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
// We attempted to retry and got another timeout. Give up.
return null
}
if (retryAfter(userResponse, 0) > 0) {
return null
}
return userResponse.request
}
HTTP_UNAVAILABLE -> {
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
// We attempted to retry and got another timeout. Give up.
return null
}
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
// specifically received an instruction to retry without delay
return userResponse.request
}
return null
}
HTTP_MISDIRECTED_REQUEST -> {
// OkHttp can coalesce HTTP/2 connections even if the domain names are different. See
// RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then
// we can retry on a different connection.
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
if (exchange == null || !exchange.isCoalescedConnection) {
return null
}
exchange.connection.noCoalescedConnections()
return userResponse.request
}
else -> return null
}
}
private fun buildRedirectRequest(userResponse: Response, method: String): Request? {
// Does the client allow redirects?
if (!client.followRedirects) return null
val location = userResponse.header("Location") ?: return null
// Don't follow redirects to unsupported protocols.
val url = userResponse.request.url.resolve(location) ?: return null
// If configured, don't follow redirects between SSL and non-SSL.
val sameScheme = url.scheme == userResponse.request.url.scheme
if (!sameScheme && !client.followSslRedirects) return null
// Most redirects don't include a request body.
val requestBuilder = userResponse.request.newBuilder()
if (HttpMethod.permitsRequestBody(method)) {
val responseCode = userResponse.code
val maintainBody = HttpMethod.redirectsWithBody(method) ||
responseCode == HTTP_PERM_REDIRECT ||
responseCode == HTTP_TEMP_REDIRECT
if (HttpMethod.redirectsToGet(method) && responseCode != HTTP_PERM_REDIRECT && responseCode != HTTP_TEMP_REDIRECT) {
requestBuilder.method("GET", null)
} else {
val requestBody = if (maintainBody) userResponse.request.body else null
requestBuilder.method(method, requestBody)
}
if (!maintainBody) {
requestBuilder.removeHeader("Transfer-Encoding")
requestBuilder.removeHeader("Content-Length")
requestBuilder.removeHeader("Content-Type")
}
}
// When redirecting across hosts, drop all authentication headers. This
// is potentially annoying to the application layer since they have no
// way to retain them.
if (!userResponse.request.url.canReuseConnectionFor(url)) {
requestBuilder.removeHeader("Authorization")
}
return requestBuilder.url(url).build()
}
private fun retryAfter(userResponse: Response, defaultDelay: Int): Int {
val header = userResponse.header("Retry-After") ?: return defaultDelay
// https://tools.ietf.org/html/rfc7231#section-7.1.3
// currently ignores a HTTP-date, and assumes any non int 0 is a delay
if (header.matches("\\d+".toRegex())) {
return Integer.valueOf(header)
}
return Integer.MAX_VALUE
}
companion object {
/**
* How many redirects and auth challenges should we attempt? Chrome follows 21 redirects; Firefox,
* curl, and wget follow 20; Safari follows 16; and HTTP/1.0 recommends 5.
*/
private const val MAX_FOLLOW_UPS = 20
}
}
Interceptor和NetworkInterceptors的区别
BridgeInterceptor
CacheInterceptor
ConnectInterceptor
CallServerInterceptor
整体架构
使用方法
private final OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("https://github.com")
.build()
// 同步请求
Response response = client.newCall(request).execute();
// todo handle response
//异步请求
cient.newCall(request).enqueue(new Callback(){
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException{
}
});
OkHttpClient
一些配置参数
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
constructor() : this(Builder())
class Builder constructor() {
//调度器
internal var dispatcher: Dispatcher = Dispatcher()
//连接池
internal var connectionPool: ConnectionPool = ConnectionPool()
//整体流程拦截器
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//网络流程拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
//流程监听器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
//连接失败时是否重连
internal var retryOnConnectionFailure = true
//服务器认证设置
internal var authenticator: Authenticator = Authenticator.NONE
//是否重定向
internal var followRedirects = true
//是否从HTTP重定向到HTTPS
internal var followSslRedirects = true
//cookie设置
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
//缓存设置
internal var cache: Cache? = null
//DNS设置
internal var dns: Dns = Dns.SYSTEM
//代理设置
internal var proxy: Proxy? = null
//代理选择器设置
internal var proxySelector: ProxySelector? = null
//代理服务器认证设置
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
//socket配置
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
//https socket配置
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
//协议
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
//域名校验
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
//请求超时
internal var callTimeout = 0
//连接超时
internal var connectTimeout = 10_000
//读取超时
internal var readTimeout = 10_000
//写入超时
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
···省略代码···
Request
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl,
@get:JvmName("method") val method: String,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: RequestBody?,
internal val tags: Map<Class<*>, Any>
) {
open class Builder {
//请求的URL
internal var url: HttpUrl? = null
//请求方法,如:GET、POST..
internal var method: String
//请求头
internal var headers: Headers.Builder
//请求体
internal var body: RequestBody? = null
···省略代码···
Call
interface Call : Cloneable {
/** 返回发起此调用的原始请求 */
fun request(): Request
/**
* 同步请求,立即执行。
*
* 抛出两种异常:
* 1. 请求失败抛出IOException;
* 2. 如果在执行过一回的前提下再次执行抛出IllegalStateException;*/
@Throws(IOException::class)
fun execute(): Response
/**
* 异步请求,将请求安排在将来的某个时间点执行。
* 如果在执行过一回的前提下再次执行抛出IllegalStateException */
fun enqueue(responseCallback: Callback)
/** 取消请求。已经完成的请求不能被取消 */
fun cancel()
/** 是否已被执行 */
fun isExecuted(): Boolean
/** 是否被取消 */
fun isCanceled(): Boolean
/** 一个完整Call请求流程的超时时间配置,默认选自[OkHttpClient.Builder.callTimeout] */
fun timeout(): Timeout
/** 克隆这个call,创建一个新的相同的Call */
public override fun clone(): Call
/** 利用工厂模式来让 OkHttpClient 来创建 Call对象 */
fun interface Factory {
fun newCall(request: Request): Call
}
}
RealCall
OkHttpClient.kt
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
AsyncCall
inner class AsyncCall(
//用户传入的响应回调方法
private val responseCallback: Callback
) : Runnable {
//同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性
@Volatile var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
···省略代码···
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//调用线程池执行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
//请求失败,调用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
//请求失败,调用调度器finish方法
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//请求成功,获取到服务器返回的response
val response = getResponseWithInterceptorChain()
signalledCallback = true
//调用 Callback.onResponse() 方法,将 response 传递出去
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
//请求失败,调用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
//请求出现异常,调用cancel方法来取消请求
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
//请求失败,调用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
//请求结束,调用调度器finish方法
client.dispatcher.finished(this)
}
}
}
}
Dispatcher
class Dispatcher constructor() {
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
//创建一个缓存线程池,来处理请求调用
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
/** 已准备好的异步请求队列 */
@get:Synchronized
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在运行的异步请求队列, 包含取消但是还未finish的AsyncCall */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在运行的同步请求队列, 包含取消但是还未finish的RealCall */
private val runningSyncCalls = ArrayDeque<RealCall>()
···省略代码···
}
总结
对象 | 作用 |
---|---|
Call | 请求调用接口,表示这个请求已经准备好可以执行,也可以被取消,只能执行一次 |
RealCall | Call接口的具体实现类,是应用于网络层之间的连接桥,包含OkHttp于Requst信息 |
AsyncCall | 异步请求调用,其实就是个Runnable,会被放到线程池中进行处理。 |
Dispatcher | 调度器,用来调度Call对象,同时包含线程池与异步请求队列,用来存放与执行AsyncCall对象。 |
Request | 请求类,包含url、method、headers、body。 |
Response | 网络层返回的响应数据。 |
Callback | 响应回调函数接口,包含onFailure、onResponse 两个方法。 |
同步请求
RealCall.kt
override fun execute(): Response {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//请求超时开始计时
timeout.enter()
//开启请求监听
callStart()
try {
//调用调度器中的 executed() 方法,调度器只是将 call 加入到了runningSyncCalls队列中
client.dispatcher.executed(this)
//调用getResponseWithInterceptorChain 方法拿到 response
return getResponseWithInterceptorChain()
} finally {
//执行完毕,调度器将该 call 从 runningSyncCalls队列中移除
client.dispatcher.finished(this)
}
}
异步请求
RealCall.kt
override fun enqueue(responseCallback: Callback) {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//开启请求监听
callStart()
//新建一个AsyncCall对象,通过调度器enqueue方法加入到readyAsyncCalls队列中
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
Dispatcher.kt
internal fun enqueue(call: AsyncCall) {
//加锁,保证线程安全
synchronized(this) {
//将该请求调用加入到 readyAsyncCalls 队列中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
//通过域名来查找有没有相同域名的请求,有则复用。
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//执行请求
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
//判断是否有请求正在执行
val isRunning: Boolean
//加锁,保证线程安全
synchronized(this) {
//遍历 readyAsyncCalls 队列
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//runningAsyncCalls 的数量不能大于最大并发请求数 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//同域名最大请求数5,同一个域名最多允许5条线程同时执行请求
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
//从 readyAsyncCalls 队列中移除,并加入到 executableCalls 及 runningAsyncCalls 队列中
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
//通过运行队列中的请求数量来判断是否有请求正在执行
isRunning = runningCallsCount() > 0
}
//遍历可执行队列,调用线程池来执行AsyncCall
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
获取Response
internal fun getResponseWithInterceptorChain(): Response {
//拦截器列表
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
//构建拦截器责任链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
//如果call请求完成,那就意味着交互完成了,没有更多的东西来交换了
var calledNoMoreExchanges = false
try {
//执行拦截器责任链来获取 response
val response = chain.proceed(originalRequest)
//如果被取消,关闭响应,抛出异常
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
Interceptor
fun interface Interceptor {
/** 拦截方法 */
@Throws(IOException::class)
fun intercept(chain: Chain): Response
interface Chain {
/** 原始请求数据 */
fun request(): Request
/** 核心方法,处理请求,获取response */
@Throws(IOException::class)
fun proceed(request: Request): Response
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
}
RealInterceptorChain
class RealInterceptorChain(
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
internal val connectTimeoutMillis: Int,
internal val readTimeoutMillis: Int,
internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
···省略代码···
private var calls: Int = 0
override fun call(): Call = call
override fun request(): Request = request
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
//index+1, 复制创建新的责任链,也就意味着调用责任链中的下一个处理者,也就是下一个拦截器
val next = copy(index = index + 1, request = request)
//取出当前拦截器
val interceptor = interceptors[index]
//执行当前拦截器的拦截方法
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
}
拦截器
- client.interceptors:这是由开发者设置的,会在所有的拦截器处理之前进行最早的拦截处理,可用于添加一些公共参数,如自定义header、自定义log等等。
- RetryAndFollowUpInterceptor:这里会对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作。跟他的名字一样,就是做重试工作还有一些连接跟踪工作。
- BridgeInterceptor:是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,以及将网络请求返回回来的响应转换为用户可用的响应。
- CacheInterceptor:这里主要是缓存的相关处理,会根据用户在OkHttpClient里定义的缓存配置,然后结合请求新建一个缓存策略,由它来判断是使用网络还是缓存来构建response。
- ConnectInterceptor:这里主要就是负责建立连接,会建立TCP连接或者TLS连接。
- client.networkInterceptors:这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。
- CallServerInterceptor:这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,将请求头与请求体发送给服务器,以及解析服务器返回的response。
client.interceptors
class HeaderInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request().newBuilder()
.addHeader("device-android", "xxxxxxxxxxx")
.addHeader("country-code", "ZH")
.build();
return chain.proceed(request);
}
}
//然后在 OkHttpClient 中加入
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(15, TimeUnit.SECONDS)
.writeTimeout(15, TimeUnit.SECONDS)
.cookieJar(new MyCookieJar())
.addInterceptor(new HeaderInterceptor())//添加自定义Header拦截器
.build();
RetryAndFollowUpInterceptor
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
//这里会新建一个ExchangeFinder,ConnectInterceptor会使用到
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
//尝试通过路由连接失败。该请求将不会被发送。
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
//尝试与服务器通信失败。该请求可能已发送。
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
// Attach the prior response if it exists. Such responses never have a body.
//尝试关联上一个response,注意:body是为null
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
//会根据 responseCode 来判断,构建一个新的request并返回来重试或者重定向
val followUp = followUpRequest(response, exchange)
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
//如果请求体是一次性的,不需要再次重试
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
//最大重试次数,不同的浏览器是不同的,比如:Chrome为21,Safari则是16
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
/** 判断是否要进行重连,false->不尝试重连;true->尝试重连。*/
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
//客户端禁止重试
if (!client.retryOnConnectionFailure) return false
//不能再次发送该请求体
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
//发生的异常是致命的,无法恢复,如:ProtocolException
if (!isRecoverable(e, requestSendStarted)) return false
//没有更多的路由来尝试重连
if (!call.retryAfterFailure()) return false
// 对于失败恢复,使用带有新连接的相同路由选择器
return true
}
···省略代码···
BridgeInterceptor
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
//获取原始请求数据
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
//重新构建请求头,请求体信息
val body = userRequest.body
val contentType = body.contentType()
requestBuilder.header("Content-Type", contentType.toString())
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.header("Host", userRequest.url.toHostHeader())
requestBuilder.header("Connection", "Keep-Alive")
···省略代码···
//添加cookie
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
//添加user-agent
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
//重新构建一个Request,然后执行下一个拦截器来处理该请求
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
//创建一个新的responseBuilder,目的是将原始请求数据构建到response中
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
//修改response header信息,移除Content-Encoding,Content-Length信息
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
//修改response body信息
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
···省略代码···
CacheInterceptor
class CacheInterceptor(internal val cache: Cache?) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
//通过request从OkHttpClient.cache中获取缓存
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//创建一个缓存策略,用来确定怎么使用缓存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
//为空表示不使用网络,反之,则表示使用网络
val networkRequest = strategy.networkRequest
//为空表示不使用缓存,反之,则表示使用缓存
val cacheResponse = strategy.cacheResponse
//追踪网络与缓存的使用情况
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
//有缓存但不适用,关闭它
if (cacheCandidate != null && cacheResponse == null) {
cacheCandidate.body?.closeQuietly()
}
//如果网络被禁止,但是缓存又是空的,构建一个code为504的response,并返回
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
//如果我们禁用了网络不使用网络,且有缓存,直接根据缓存内容构建并返回response
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
//为缓存添加监听
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
//责任链往下处理,从服务器返回response 赋值给 networkResponse
networkResponse = chain.proceed(networkRequest)
} finally {
//捕获I/O或其他异常,请求失败,networkResponse为空,且有缓存的时候,不暴露缓存内容。
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
//如果有缓存
if (cacheResponse != null) {
//且网络返回response code为304的时候,使用缓存内容新构建一个Response返回。
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
//否则关闭缓存响应体
cacheResponse.body?.closeQuietly()
}
}
//构建网络请求的response
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
//如果cache不为null,即用户在OkHttpClient中配置了缓存,则将上一步新构建的网络请求response存到cache中
if (cache != null) {
//根据response的code,header以及CacheControl.noStore来判断是否可以缓存
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 将该response存入缓存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}
//根据请求方法来判断缓存是否有效,只对Get请求进行缓存,其它方法的请求则移除
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
//缓存无效,将该请求缓存从client缓存配置中移除
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
···省略代码···
ConnectInterceptor
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
//初始化一个exchange对象
val exchange = realChain.call.initExchange(chain)
//根据这个exchange对象来复制创建一个新的连接责任链
val connectedChain = realChain.copy(exchange = exchange)
//执行该连接责任链
return connectedChain.proceed(realChain.request)
}
}
拦截方法也就只有三步:
- 初始化一个exchange对象
- 然后根据这个exchange对象来复制创建一个新的连接责任链
- 执行该连接责任链
那这个exchange对象又是什么呢?
RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...省略代码...
//这里的exchangeFinder就是在RetryAndFollowUpInterceptor中创建的
val exchangeFinder = this.exchangeFinder!!
//返回一个ExchangeCodec(是个编码器,为request编码以及为response解码)
val codec = exchangeFinder.find(client, chain)
//根据exchangeFinder与codec新构建一个Exchange对象,并返回
val result = Exchange(this, eventListener, exchangeFinder, codec)
...省略代码...
return result
}
具体看看ExchangeFinder.find()这一步,
ExchangeFinder.kt
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
//查找合格可用的连接,返回一个 RealConnection 对象
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
//根据连接,创建并返回一个请求响应编码器:Http1ExchangeCodec 或者 Http2ExchangeCodec,分别对应Http1协议与Http2协议
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
继续往下看findHealthyConnection方法
ExchangeFinder.kt
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
//重点:查找连接
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
//检查该连接是否合格可用,合格则直接返回该连接
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
//如果该连接不合格,标记为不可用,从连接池中移除
candidate.noNewExchanges()
...省略代码...
}
}
所以核心方法就是findConnection,我们继续深入看看该方法:
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
//第一次,尝试重连 call 中的 connection,不需要去重新获取连接
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
//如果 call 中的 connection 还没有释放,就重用它。
if (call.connection != null) {
check(toClose == null)
return callConnection
}
//如果 call 中的 connection 已经被释放,关闭Socket.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
//需要一个新的连接,所以重置一些状态
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
//第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
//连接池中是空的,准备下次尝试连接的路由
val routes: List<Route>?
val route: Route
...省略代码...
//第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
//第四次,手动创建一个新连接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
//第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。
//这一步主要是为了校验一下,比如已经有了一条连接了,就可以直接复用,而不用使用手动创建的新连接。
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
//将手动创建的新连接放入连接池
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
在代码中可以看出,一共做了5次尝试得到连接:
- 第一次,尝试重连call中的connection,不需要重新获取连接
- 第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用
- 第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用
- 第四次,手动创建一个新连接。
- 第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。
CallServerInterceptor
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
try {
//写入请求头
exchange.writeRequestHeaders(request)
//如果不是GET请求,并且请求体不为空
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
//当请求头为"Expect: 100-continue"时,在发送请求体之前需要等待服务器返回"HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送请求体。
//POST请求,先发送请求头,在获取到100继续状态后继续发送请求体
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
//刷新请求,即发送请求头
exchange.flushRequest()
//解析响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
//写入请求体
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
//如果请求体是双公体,就先发送请求头,稍后在发送请求体
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
//写入请求体
requestBody.writeTo(bufferedRequestBody)
} else {
//如果获取到了"Expect: 100-continue"响应,写入请求体
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
···省略代码···
//请求结束,发送请求体
exchange.finishRequest()
···省略代码···
try {
if (responseBuilder == null) {
//读取响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
···省略代码···
//构建一个response
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
···省略代码···
return response
···省略代码···