废话时刻
~又到了跟大家分享知识的时候了,过去在项目中笔者时常会使用到网络请求,okHttp可以算是android 网络请求框架中的老大哥了,当然啦,它的小弟Retrofit 也是非常值得一谈的,后面笔者也是打算粗写一篇相关的文章。现在就先跟着笔者初步了解一下Okhttp的大概工作原理。
注 : 文内源码来自当前最新 Okhttp 4.9.3版本,所以使用的是Kotlin语言,笔者也是在努力学习Kotlin,大家也要加油!
全文代码较多,笔者尽量按照逻辑给大家提供更少的代码,可以跟着笔者的注释粗读一下,有问题自己再去细读源码
Okhttp的简单使用
//这是官方给的GET的标准写法
//Java
//创建client
OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
//构建请求体
Request request = new Request.Builder()
.url(url)
.build();
//发送请求
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}
//Kotlin
val client = OkHttpClient()
fun run(url : String) : String {
val request = Request.Builder()
.url(url)
.build()
try{
val response = client.newCall(request).execute()
return response?.body.string()
}catch (e : Exception){
e.printstackTrace()
}
}
当然了上述代码的运行,在项目中我们是放在了子线程或者协程中运行的。
Okhttp的核心原理
想要了解Okhttp的核心机制,我们就得从client.newCall(request).execute()这行代码开始了解
首先newCall(request)方法实际上是返回的一个按照我们需求构造的Call对象,我们通过调用该对象的execute()方法发送网络请求。
这里需要注意的是 execute方法是一个同步方法,相对应还存在一个异步的方法 : enqueue() 这个方法我们需要传入一个回调,当网络数据返回,或者请求失败时都会调用这个回调对象。但是在文章中,我们就取execute方法来讲解,因为核心原理是一致的。
这里推荐大家在看完本文章后再去看看源码 去github搜 okhttp就行。
那么现在我们就进入到execute方法看下:
#RealCall
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
在这里我们发现代码走到 client.dispatcher.executed(this) 这一步 出现了一个 dispatcher 。这是个什么东西呢? 翻译过来是一个调度器,并且这个调度器是client拥有的,那么结合上面我们使用client创建了一个Call对象去完成我们的网络请求,就可以知道这个调度器是用来管理调度网络请求的(可能有点牵强 哈哈~)那为什么需要调度器呢?因为我们的网络请求可能是大量的,并且由于存在同步和异步请求,那么我们就需要更好的去管理这些网络请求,比如说异步请求可以存在需要线程池去执行任务,但是线程池也不可能无限大,那么就可能出现异步等待处理的网络请求,这些请求我们就得想办法将它存储起来,等到有空闲的线程时我们还得将它拿出来执行。
总结起来Dispatcher的功能如下:
- 记录同步任务、异步任务及等待执行的异步任务。
- 线程池管理异步任务。
- 发起/取消网络请求API:execute、enqueue、cancel。
Dispatcher
那我们就先来了解了解Dispatcher这个类
点开Dispatcher这个类 我们会发现里面有三个双端队列
#Dispatcher
/** Ready async calls in the order they'll be run. */
//等待的异步请求队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
//正在运行的异步请求队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
//同步请求队列
private val runningSyncCalls = ArrayDeque<RealCall>()
由于源码内容可能有点多,我就给出几个比较关键的方法:
#Dispatcher
// enqueue方法 实际上在Call对象中调用的enqueue方法就是对该方法进行了一层封装,实际上是调用了该方法。
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//将对象加入到异步等待队列中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//该方法会将满足条件的处在异步等待队列中的Call对象移到异步运行队列中。
promoteAndExecute()
}
#Dispacher
//executed方法 Call对象中调用execute方法后实际上会调用该方法
/** Used by [Call.execute] to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
//将Call对象加入到同步队列中
runningSyncCalls.add(call)
}
#Dispatcher
//当一次网络请求完成,调用该方法将Call对象移除
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//将Call对象移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//根据在运行的Call数量是否大于0 返回Bool值
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
# Dispatcher
/**
* Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
* executor service. Must not be called with synchronization because executing calls can call
* into user code.
*
* @return true if the dispatcher is currently running calls.
*/
//根据上面的源码我们会发现 这个方法分别在enqueue方法和finished方法中都进行了调用
//其实该方法就是实现将异步等待队列中具备条件可运行的Call移到运行队列中的核心实现。
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
//创建临时缓存
val executableCalls = mutableListOf<AsyncCall>()
//定义返回值,与可运行的Call数量挂钩
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
//遍历ready队列将满足条件的放到running中
while (i.hasNext()) {
val asyncCall = i.next()
//如果running队列满了 直接退出该循环。
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//如果同一host的并发达到最大阈值,跳过该次循环 继续下层。
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
//将Call从ready中移除
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
//为返回值赋值
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//提交线程池任务
asyncCall.executeOn(executorService)
}
return isRunning
}
Dispatcher的相关方法咱们就介绍到这,我们继续跟着代码走:
上次是说走到了client.dispatcher.executed(this) ,我们可以从上面的源代码中看到,dispatcher.executed(this)其实就是将Call加入到了队列中。这一步也就走完了。我们开始看下一步:
getResponseWithInterceptorChain() 这个方法是RealCall.kt中的一个方法,我们来看看它的源码:
#RealCall
/**
* 该方法就是整个okhttp责任链的入口
* 并且网络数据返回后也会通过该方法最终返回为Respose类型
*/
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
//创建拦截器容器
val interceptors = mutableListOf<Interceptor>()
//添加应用拦截器
interceptors += client.interceptors
//添加重试与重定向拦截器
interceptors += RetryAndFollowUpInterceptor(client)
//添加桥接拦截器
interceptors += BridgeInterceptor(client.cookieJar)
//添加缓存拦截器
interceptors += CacheInterceptor(client.cache)
//添加连接拦截器
interceptors += ConnectInterceptor
if (!forWebSocket) {
//添加网络拦截器
interceptors += client.networkInterceptors
}
//添加请求拦截器
interceptors += CallServerInterceptor(forWebSocket)
//创建责任链
//记住这里传入的exchange的初始值为null 后面会用到
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
//调用 责任链的proceed方法
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
//返回响应体
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
各个拦截器的具体源码就不给出了,避免篇幅太长,这里给出一个拦截器的作用对应表(来自网络):
接着跟着代码走:在这个方法中 我们主要看val response = chain.proceed(originalRequest) 这行代码,在这里我们是调用了责任链的proceed方法并且传入了原始请求这么个东西,那我们就进入到这个方法看看到底发生了什么(根据chain的构造方法,我们进入到RealInterceptorChain.kt中查看方法实现):
#RealInterceptorChain
//统计当前拦截器在当前链中执行 proceed 的次数
private var calls : Int = 0
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
//还记得前面我让大家记下这个值的初始值吗?null
//这个值在执行连接拦截器前都是为null的,它是对请求流的一个封装
if (exchange != null) {
//当请求流不为null时,我们需要检查Url或者端口是否被改变,如果被改变直接报错,因为在建立连接后它不允许被改变
check(exchange.finder.routePlanner.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
//连接拦截器及其后续拦截器只能执行一次proceed方法
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
// Call the next interceptor in the chain.
//创建下层新的责任链
val next = copy(index = index + 1, request = request)
//取当前拦截器
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
//执行当前拦截器的拦截方法,传入新的责任链
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
//确保连接拦截器及后面的拦截器(除最后一个拦截器外)必须执行一次
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
return response
}
这里不给出拦截器内部的实现了,但是可以大概说一下拦截器内部做了什么
由于每个拦截器的功能不同,所以我就说一下它们的公共行为,除了最后一个拦截器外,它们内部会在最后调用下一个责任链的proceed方法,以确保整个责任链运行起来。最后在请求拦截器发送请求得到响应后,response会根据回溯一层一层回到最初的地方。
到这里其实数据的获取就已经走完了,但是整个请求并未结束,我们知道在RealCall的execute方法中如果没出现不可抗因素最后一定会执行client.dispatcher.finished(this),该方法我们在前面介绍Dispatcher时就已经给出了,实际上就是将已经完成的Call从队列中移除,然后再调用一次promoteAndExecute()方法。不过根据请求时调用的方法不同 中间调用的方法会有所不同,但最终会回到我们上面给出的方法中。
这里给出调用 enqueue方法和execute方法时调用的不同的finished方法:
/** Used by [AsyncCall.run] to signal completion. */
//异步方法
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/** Used by [Call.execute] to signal completion. */
//同步方法
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
我们发现确实最终是调用的finished方法,只不过传入的队列不同罢了。
好了整个核心流程走完了,但是想要进一步了解okhttp那么我们就必须得去了解它的各个拦截器中的原理,后面也是会出续集。大家敬请期待!
参考
okhttp原理
okhttp-github