0
点赞
收藏
分享

微信扫一扫

Okhttp核心流程概述

芝婵 2022-04-13 阅读 54
android

废话时刻

~又到了跟大家分享知识的时候了,过去在项目中笔者时常会使用到网络请求,okHttp可以算是android 网络请求框架中的老大哥了,当然啦,它的小弟Retrofit 也是非常值得一谈的,后面笔者也是打算粗写一篇相关的文章。现在就先跟着笔者初步了解一下Okhttp的大概工作原理。

注 : 文内源码来自当前最新 Okhttp 4.9.3版本,所以使用的是Kotlin语言,笔者也是在努力学习Kotlin,大家也要加油!

全文代码较多,笔者尽量按照逻辑给大家提供更少的代码,可以跟着笔者的注释粗读一下,有问题自己再去细读源码

Okhttp的简单使用

//这是官方给的GET的标准写法
//Java

//创建client
OkHttpClient client = new OkHttpClient();

String run(String url) throws IOException {
//构建请求体
  Request request = new Request.Builder()
      .url(url)
      .build();
//发送请求
  try (Response response = client.newCall(request).execute()) {
    return response.body().string();
  }
}


//Kotlin
val client = OkHttpClient()

fun run(url : String) : String {
    val request = Request.Builder()
                         .url(url)
                         .build()
     try{
         val response = client.newCall(request).execute()
         return response?.body.string()
     }catch (e : Exception){
         e.printstackTrace()
     }
}

当然了上述代码的运行,在项目中我们是放在了子线程或者协程中运行的。

Okhttp的核心原理

想要了解Okhttp的核心机制,我们就得从client.newCall(request).execute()这行代码开始了解

首先newCall(request)方法实际上是返回的一个按照我们需求构造的Call对象,我们通过调用该对象的execute()方法发送网络请求。

这里需要注意的是 execute方法是一个同步方法,相对应还存在一个异步的方法 : enqueue() 这个方法我们需要传入一个回调,当网络数据返回,或者请求失败时都会调用这个回调对象。但是在文章中,我们就取execute方法来讲解,因为核心原理是一致的。

这里推荐大家在看完本文章后再去看看源码 去github搜 okhttp就行。

那么现在我们就进入到execute方法看下:

#RealCall
override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

在这里我们发现代码走到 client.dispatcher.executed(this) 这一步 出现了一个 dispatcher 。这是个什么东西呢? 翻译过来是一个调度器,并且这个调度器是client拥有的,那么结合上面我们使用client创建了一个Call对象去完成我们的网络请求,就可以知道这个调度器是用来管理调度网络请求的(可能有点牵强 哈哈~)那为什么需要调度器呢?因为我们的网络请求可能是大量的,并且由于存在同步和异步请求,那么我们就需要更好的去管理这些网络请求,比如说异步请求可以存在需要线程池去执行任务,但是线程池也不可能无限大,那么就可能出现异步等待处理的网络请求,这些请求我们就得想办法将它存储起来,等到有空闲的线程时我们还得将它拿出来执行。

总结起来Dispatcher的功能如下:

  • 记录同步任务、异步任务及等待执行的异步任务。
  • 线程池管理异步任务。
  • 发起/取消网络请求API:execute、enqueue、cancel。

Dispatcher

那我们就先来了解了解Dispatcher这个类

点开Dispatcher这个类 我们会发现里面有三个双端队列

  #Dispatcher
  /** Ready async calls in the order they'll be run. */
  //等待的异步请求队列
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  //正在运行的异步请求队列
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  //同步请求队列
  private val runningSyncCalls = ArrayDeque<RealCall>()

由于源码内容可能有点多,我就给出几个比较关键的方法:

#Dispatcher
// enqueue方法  实际上在Call对象中调用的enqueue方法就是对该方法进行了一层封装,实际上是调用了该方法。
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      //将对象加入到异步等待队列中
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    //该方法会将满足条件的处在异步等待队列中的Call对象移到异步运行队列中。
    promoteAndExecute()
  }
 #Dispacher
 //executed方法  Call对象中调用execute方法后实际上会调用该方法
 /** Used by [Call.execute] to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    //将Call对象加入到同步队列中
    runningSyncCalls.add(call)
  }
#Dispatcher
//当一次网络请求完成,调用该方法将Call对象移除
private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
    //将Call对象移除
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }
    //根据在运行的Call数量是否大于0 返回Bool值
    val isRunning = promoteAndExecute()
  
    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }
 # Dispatcher
 /**
   * Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
   * executor service. Must not be called with synchronization because executing calls can call
   * into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
   //根据上面的源码我们会发现 这个方法分别在enqueue方法和finished方法中都进行了调用
   //其实该方法就是实现将异步等待队列中具备条件可运行的Call移到运行队列中的核心实现。
  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()
    //创建临时缓存
    val executableCalls = mutableListOf<AsyncCall>()
    //定义返回值,与可运行的Call数量挂钩
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      //遍历ready队列将满足条件的放到running中
      while (i.hasNext()) {
        val asyncCall = i.next()
        //如果running队列满了 直接退出该循环。
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        //如果同一host的并发达到最大阈值,跳过该次循环 继续下层。        
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
        //将Call从ready中移除
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
       //为返回值赋值
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      //提交线程池任务
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

Dispatcher的相关方法咱们就介绍到这,我们继续跟着代码走:

上次是说走到了client.dispatcher.executed(this) ,我们可以从上面的源代码中看到,dispatcher.executed(this)其实就是将Call加入到了队列中。这一步也就走完了。我们开始看下一步:

getResponseWithInterceptorChain() 这个方法是RealCall.kt中的一个方法,我们来看看它的源码:

#RealCall
/**
* 该方法就是整个okhttp责任链的入口
* 并且网络数据返回后也会通过该方法最终返回为Respose类型
*/
internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    //创建拦截器容器
    val interceptors = mutableListOf<Interceptor>()
    //添加应用拦截器
    interceptors += client.interceptors
    //添加重试与重定向拦截器
    interceptors += RetryAndFollowUpInterceptor(client)
    //添加桥接拦截器
    interceptors += BridgeInterceptor(client.cookieJar)
    //添加缓存拦截器
    interceptors += CacheInterceptor(client.cache)
    //添加连接拦截器
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
    //添加网络拦截器
      interceptors += client.networkInterceptors
    }
    //添加请求拦截器
    interceptors += CallServerInterceptor(forWebSocket)
    //创建责任链
    //记住这里传入的exchange的初始值为null 后面会用到
    val chain = RealInterceptorChain(
      call = this,
      interceptors = interceptors,
      index = 0,
      exchange = null,
      request = originalRequest,
      connectTimeoutMillis = client.connectTimeoutMillis,
      readTimeoutMillis = client.readTimeoutMillis,
      writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
    //调用 责任链的proceed方法
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      //返回响应体
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

各个拦截器的具体源码就不给出了,避免篇幅太长,这里给出一个拦截器的作用对应表(来自网络):

请添加图片描述

接着跟着代码走:在这个方法中 我们主要看val response = chain.proceed(originalRequest) 这行代码,在这里我们是调用了责任链的proceed方法并且传入了原始请求这么个东西,那我们就进入到这个方法看看到底发生了什么(根据chain的构造方法,我们进入到RealInterceptorChain.kt中查看方法实现):

#RealInterceptorChain

//统计当前拦截器在当前链中执行 proceed 的次数
private var calls : Int = 0

@Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    //还记得前面我让大家记下这个值的初始值吗?null
    //这个值在执行连接拦截器前都是为null的,它是对请求流的一个封装
    if (exchange != null) {
      //当请求流不为null时,我们需要检查Url或者端口是否被改变,如果被改变直接报错,因为在建立连接后它不允许被改变
      check(exchange.finder.routePlanner.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      //连接拦截器及其后续拦截器只能执行一次proceed方法
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // Call the next interceptor in the chain.
    //创建下层新的责任链
    val next = copy(index = index + 1, request = request)
    //取当前拦截器
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    //执行当前拦截器的拦截方法,传入新的责任链
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")
    //确保连接拦截器及后面的拦截器(除最后一个拦截器外)必须执行一次
    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    return response
  }

这里不给出拦截器内部的实现了,但是可以大概说一下拦截器内部做了什么

由于每个拦截器的功能不同,所以我就说一下它们的公共行为,除了最后一个拦截器外,它们内部会在最后调用下一个责任链的proceed方法,以确保整个责任链运行起来。最后在请求拦截器发送请求得到响应后,response会根据回溯一层一层回到最初的地方。

到这里其实数据的获取就已经走完了,但是整个请求并未结束,我们知道在RealCall的execute方法中如果没出现不可抗因素最后一定会执行client.dispatcher.finished(this),该方法我们在前面介绍Dispatcher时就已经给出了,实际上就是将已经完成的Call从队列中移除,然后再调用一次promoteAndExecute()方法。不过根据请求时调用的方法不同 中间调用的方法会有所不同,但最终会回到我们上面给出的方法中。

这里给出调用 enqueue方法和execute方法时调用的不同的finished方法:

 /** Used by [AsyncCall.run] to signal completion. */
 //异步方法
  internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

  /** Used by [Call.execute] to signal completion. */
  //同步方法
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

我们发现确实最终是调用的finished方法,只不过传入的队列不同罢了。

好了整个核心流程走完了,但是想要进一步了解okhttp那么我们就必须得去了解它的各个拦截器中的原理,后面也是会出续集。大家敬请期待!

参考

okhttp原理
okhttp-github

举报

相关推荐

0 条评论