0
点赞
收藏
分享

微信扫一扫

OkHttpClient源码分析(基于4.9.3)

田妞的读书笔记 2022-04-27 阅读 17

1. 源码分析

1.1 整体执行流程:

1.1.1 同步和异步请求

    /** 同步请求 */
    public void syncCall() {
        //1.创建OkHttpClient
        OkHttpClient okHttp = new OkHttpClient.Builder().build();
        //2.创建request
        Request request = new Request.Builder().url("http://www.baidu.com/").build();
        //3.得到RealCall
        Call call = okHttp.newCall(request);
        try {
            //4.执行同步请求,获取response,会阻塞当前执行线程。
            Response response = call.execute();
            ResponseBody body = response.body();
            if (body != null) {
                String result = body.toString();
                Log.i("OkHttp","syncCall:"+result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
/** 异步请求 */
    public void asyncCall() {
        //1.获取OkhttpClient
        OkHttpClient okHttp = new OkHttpClient.Builder().build();
        //2.构建reqest对象
        Request request = new Request.Builder().url("http://www.baidu.com/").build();
        //3.获取RealCall
        Call call = okHttp.newCall(request);
        //4.执行异步请求
        call.enqueue(new Callback() {
            @Override
            public void onFailure(@NonNull Call call, @NonNull IOException e) {
                //在子线程中执行
            }

            @Override
            public void onResponse(@NonNull Call call, @NonNull Response response) throws IOException {
                //在子线程中执行
            }
        });
    }

同步和异步请求中,1,2,3步都是一样的,最后调用RealCall的execute执行同步请求,调用enqueue执行异步请求

代码中的1和2都是通过Builder模式,构建出对象,没什么好说的。

newCall()方法生成了一个RealCall

  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

1.1.1 同步请求RealCall.execute()方法

  override fun execute(): Response {
   ......
    try {
        //1. 调用dispatcher的executed,将当前call加入同步执行队列
      client.dispatcher.executed(this)
        //2. 调用拦截器链获取Response
      return getResponseWithInterceptorChain()
    } finally {
        //3. 执行结束后的从同步执行队列中移除,并将等待执行的异步队列添加到正在执行到异步队列中执行
      client.dispatcher.finished(this)
    }
  }

上边代码中2是调用的核心方法,放在最后分析。1和3都是Dispatcher中到方法,Dispatcher是调度去,主要维护了同步异步的任务队列。client.dispathcer是中OkHttpClient构造时传入的。
它的execute方法逻辑很简单,就是将当前call添加到同步执行队列中

  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }

finish()代码如下

internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    //1. 从队列中移除
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }
    //2. 任务结束后执行触发异步队列执行
    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

promoteAndExecute主要操作到是异步队列,放在异步请求的分析中。

1.1.2 异步请求 RealCall.enqueue()

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    //先构建了一个AysncCall,再执行dispatcher的enqueue方法
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

AsyncCall是RealCall的内部类,集成自Runnable,其run方法代码如下:

override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          //1. 在线程池中执行拦截器链接,获取response
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          // 2.执行回调
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          ......
        } catch (t: Throwable) {
          ......
        } finally {
        // 3. 执行结束的触发
          client.dispatcher.finished(this)
        }
      }
    }

无论的异步还是同步请求,调用dispatcher的finish()方法后,最终都会调用到promoteAndExecute函数

dispatcher.enqueue()源码如下:

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        //1.先加入异步等待队列
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      // 2.如果有域名相同的请求,则共享之前请求的计数器
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    //3. 将等待队列加入到异步执行队列中执行,直到异步队列已满。
    promoteAndExecute()
  }

promoteAndExecute函数

/**
   * Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
   * executor service. Must not be called with synchronization because executing calls can call
   * into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()
    //1.循环遍历异步等待队列,将其中任务加入到正在执行的异步队列中,知道异步执行队列满了。
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }
    //2.将可以执行的请求放到线程池上执行
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      //这里调用AsyncCall的执行方法
      asyncCall.executeOn(executorService)
    }
    
    return isRunning
  }

1.1.3 RealCall的getResponseWithInterceptorChain()

@Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)
    //1.将所有的拦截器串成拦截器链
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
        //2. 调用proceed方法执行请求
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

各类拦截器组成拦截器链,顺序执行各个拦截器,最终获取Response.
RealInterceptorChain.proceed()方法源码

@Throws(IOException::class)
  override fun proceed(request: Request): Response {
    ······
    // Call the next interceptor in the chain.
    //1.构造下一个拦截器链,next的proceed方法是在interceptor.intercept里边调用的。
    val next = copy(index = index + 1, request = request)
    //2. 获取当前要执行的拦截器
    val interceptor = interceptors[index]

    //3. 执行intercept方法获取response,intercept方法里边会构建一个新的RealInterceptorChain,并调用其procced方法,以此来实现责任链
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")
    ······
    return response
  }

1.2 拦截器

1.2.1 RetryAndFollowUpInterceptor

@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    var request = chain.request
    val call = realChain.call
    var followUpCount = 0
    var priorResponse: Response? = null
    var newExchangeFinder = true
    var recoveredFailures = listOf<IOException>()
    while (true) {
      //1. 这行代码初始化一个ExchangeFinder,用来找connection的
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)

      var response: Response
      var closeActiveExchange = true
      try {
        if (call.isCanceled()) {
          throw IOException("Canceled")
        }

        try {
            //2. 调用拦截器链的procced方法
          response = realChain.proceed(request)
          //注意这里newExchangeFinder变量赋值为true了,也就是如果第一次成功的话,后续注释1那里不会再重新new一个ExchangeFinder。
          newExchangeFinder = true
        } catch (e: RouteException) {
            //如果不是可恢复的异常,则直接抛出异常
          // The attempt to connect via a route failed. The request will not have been sent.
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
          newExchangeFinder = false
          continue
        } catch (e: IOException) {
         //如果不是可恢复的异常,则直接抛出异常
          // An attempt to communicate with a server failed. The request may have been sent.
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
            throw e.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e
          }
          newExchangeFinder = false
          continue
        }

        // Attach the prior response if it exists. Such responses never have a body.
        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        val exchange = call.interceptorScopedExchange
        //根据不同的responsecode重新构建request对象
        val followUp = followUpRequest(response, exchange)

        if (followUp == null) {
          if (exchange != null && exchange.isDuplex) {
            call.timeoutEarlyExit()
          }
          closeActiveExchange = false
          return response
        }

        val followUpBody = followUp.body
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }

        response.body?.closeQuietly()
        //超过最新重试次数(20次)则直接抛出异常
        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }
        //进入下一个循环继续执行
        request = followUp
        priorResponse = response
      } finally {
        //call的interceptorScopedExchange置空
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
    }
  }
/**
   * Figures out the HTTP request to make in response to receiving [userResponse]. This will
   * either add authentication headers, follow redirects or handle a client request timeout. If a
   * follow-up is either unnecessary or not applicable, this returns null.
   */
  @Throws(IOException::class)
  private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
    val route = exchange?.connection?.route()
    val responseCode = userResponse.code

    val method = userResponse.request.method
    when (responseCode) {
      HTTP_PROXY_AUTH -> {
        val selectedProxy = route!!.proxy
        if (selectedProxy.type() != Proxy.Type.HTTP) {
          throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
        }
        return client.proxyAuthenticator.authenticate(route, userResponse)
      }

      HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)

      HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
        return buildRedirectRequest(userResponse, method)
      }

      HTTP_CLIENT_TIMEOUT -> {
        // 408's are rare in practice, but some servers like HAProxy use this response code. The
        // spec says that we may repeat the request without modifications. Modern browsers also
        // repeat the request (even non-idempotent ones.)
        if (!client.retryOnConnectionFailure) {
          // The application layer has directed us not to retry the request.
          return null
        }

        val requestBody = userResponse.request.body
        if (requestBody != null && requestBody.isOneShot()) {
          return null
        }
        val priorResponse = userResponse.priorResponse
        if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
          // We attempted to retry and got another timeout. Give up.
          return null
        }

        if (retryAfter(userResponse, 0) > 0) {
          return null
        }

        return userResponse.request
      }

      HTTP_UNAVAILABLE -> {
        val priorResponse = userResponse.priorResponse
        if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
          // We attempted to retry and got another timeout. Give up.
          return null
        }

        if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
          // specifically received an instruction to retry without delay
          return userResponse.request
        }

        return null
      }

      HTTP_MISDIRECTED_REQUEST -> {
        // OkHttp can coalesce HTTP/2 connections even if the domain names are different. See
        // RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then
        // we can retry on a different connection.
        val requestBody = userResponse.request.body
        if (requestBody != null && requestBody.isOneShot()) {
          return null
        }

        if (exchange == null || !exchange.isCoalescedConnection) {
          return null
        }

        exchange.connection.noCoalescedConnections()
        return userResponse.request
      }

      else -> return null
    }
  }

总结:RetryAndFollowUpInterceptor的作用主要是做失败重试,它最大的重试次数是20次,对于特定的异常会做恢复重试,对于响应成功后的一些特定的responseCode会决定是要构建新的Request对象重试,还是之前返回null,结束重试。

1.2.2 BridgeInterceptor

/**
 * Bridges from application code to network code. First it builds a network request from a user
 * request. Then it proceeds to call the network. Finally it builds a user response from the network
 * response.
 */
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    //1.根据用户的请求构建一个新的requestBuilder,用于完善请求头信息
    val requestBuilder = userRequest.newBuilder()

    val body = userRequest.body
    if (body != null) {
    //2.1设置contentType和Content-Length
      val contentType = body.contentType()
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }
    //2.2 设置Host
    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }

    //2.3 设置Keep-Alive 保持连接
    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    //2.4 设置支持gzip
    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }
    
    //2.5 设置cookie相关
    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }
    //2.6 设置User-Agent
    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }
    
    //3. 构建新的request,调用procced方法
    val networkResponse = chain.proceed(requestBuilder.build())

    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)

    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        //4. response的body做gzip解压缩处理
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }

    return responseBuilder.build()
  }

  /** Returns a 'Cookie' HTTP request header with all cookies, like `a=b; c=d`. */
  private fun cookieHeader(cookies: List<Cookie>): String = buildString {
    cookies.forEachIndexed { index, cookie ->
      if (index > 0) append("; ")
      append(cookie.name).append('=').append(cookie.value)
    }
  }
}

总结:BridgeInterceptor的作用,在源码类注释里边写的很清楚了,它是应用代码到网络代码到桥梁,主要处理:

  • 1.根据用户传入到request对象,补充完善一些请求头信息,构建一个新到request;
  • 2.将这个新构建到reuqest对象发出去;
  • 3.最终对返回对response做一些处理,主要是Gzip解压缩,构建一个新对response

1.2.3 CacheInterceptor

TODO

1.2.4 ConnectInterceptor


/**
 * Opens a connection to the target server and proceeds to the next interceptor. The network might
 * be used for the returned response, or to validate a cached response with a conditional GET.
 */
object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //1.这里主要是找到一个可以服用到socket链接,或创建一个新到链接,为下一步真正执行网络请求做准备。
    val exchange = realChain.call.initExchange(chain)
    //InterceptorChain一般都是上一个InterceptorChain创建下一个,这里因为要传入exchange,而InterceptorChain但所有成员都是val不可变但,所以通过复制重新创建了一个。
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}

RealCall.initExchage(chain)源码如下:

#RealCall

  internal fun initExchange(chain: RealInterceptorChain): Exchange {
        ······
    val exchangeFinder = this.exchangeFinder!!
    // 找到一个可用的链接,并使用该连接构造ExchangeCodec对象
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    ......
    return result
  }

ExchangeFinder的find方法源码如下:

#ExchangeFinder

 fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
        //1. 找到一个可用的连接
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
        //2. 构建ExchangeCodec对象
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }
#ExchangeFinder

/**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {  
     //1.找到一个候选连接
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // Confirm that the connection is good.
      //2.确认是否可用,可用则直接返回
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }

      // If it isn't, take it out of the pool.
      //3. 不可用则从连接池中移除
      candidate.noNewExchanges()

      // Make sure we have some routes left to try. One example where we may exhaust all the routes
      // would happen if we made a new connection and it immediately is detected as unhealthy.
      if (nextRouteToTry != null) continue

      val routesLeft = routeSelection?.hasNext() ?: true
      if (routesLeft) continue

      val routesSelectionLeft = routeSelector?.hasNext() ?: true
      if (routesSelectionLeft) continue

      throw IOException("exhausted all routes")
    }
  }
#ExchangeFinder
/**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   *
   * This checks for cancellation before each blocking operation.
   */
  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    // 1.如果当前call已经又connection则判断这个connection是否可用重用,RetryAndFellow过程中,可能会多次调用,connection可能已经设置过
    // Attempt to reuse the connection from the call.
    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        //连接是否不能在用了,或者call里边的connection重定向后新的地址的域名和的端口不一样了,如果是则释放之前的连接
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
      // because we already acquired it.
      if (call.connection != null) {
      //连接可用则直接返回
        check(toClose == null)
        return callConnection
      }

        //关闭连接
      // The call's connection was released.
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    // We need a new connection. Give it fresh stats.
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    //2.从连接池中获取一个新连接
    // Attempt to get a connection from the pool.
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }
    //3.1 连接池中没有找到匹配的连接,则收集路由信息,构建新的connection
    // Nothing in the pool. Figure out what route we'll try next.
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      // Use a route from a preceding coalesced connection.
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      // Use a route from an existing route selection.
      routes = null
      route = routeSelection!!.next()
    } else {
      // Compute a new route selection. This is a blocking operation!
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      //耗时
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")
      //这时再检查下连接池是否有已经连接好的可用的连接,如果有则复用,合并连接
      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. We have a better chance of matching thanks to connection coalescing.
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }
    
    //3.2 创建新的connection,并connect到服务器
    // Connect. Tell the call about the connecting call so async cancels work.
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())
    
    //3.3 再次判断连接池中是否还有可用到连接,如果有则合并连接,将新创建到connection关闭,返回池中到连接
    // If we raced another call connecting to this host, coalesce the connections. This makes for 3
    // different lookups in the connection pool!
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }
    //3.4 走到这里,将新创建的连接,加入连接池中,赋给当前call,并返回。
    synchronized(newConnection) {
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }

RealConnectionPool

RealCall中的ExchangeFinder是在RetryAndFollowUpInterceptor中调用**call.enterNetworkInterceptorExchange(request, newExchangeFinder)**时创建的。
代码如下:

# RealCall

  fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
    ······
    if (newExchangeFinder) {
      this.exchangeFinder = ExchangeFinder(
          connectionPool,
          createAddress(request.url),
          this,
          eventListener
      )
    }
  }

其中传入的connectionPool是RealCall的成员变量

# RealCall 

private val connectionPool: RealConnectionPool = client.connectionPool.delegate
#OkHttpClient
  @get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool
#OkHttpClient.Builder

internal var connectionPool: ConnectionPool = ConnectionPool()
/**
 * Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
 * share the same [Address] may share a [Connection]. This class implements the policy
 * of which connections to keep open for future use.
 *
 * @constructor Create a new connection pool with tuning parameters appropriate for a single-user
 * application. The tuning parameters in this pool are subject to change in future OkHttp releases.
 * Currently this pool holds up to 5 idle connections which will be evicted after 5 minutes of
 * inactivity.
 */
class ConnectionPool internal constructor(
  internal val delegate: RealConnectionPool
) {
  constructor(
    maxIdleConnections: Int,
    keepAliveDuration: Long,
    timeUnit: TimeUnit
  ) : this(RealConnectionPool(
      taskRunner = TaskRunner.INSTANCE,
      maxIdleConnections = maxIdleConnections,
      keepAliveDuration = keepAliveDuration,
      timeUnit = timeUnit
  ))

  constructor() : this(5, 5, TimeUnit.MINUTES)

  /** Returns the number of idle connections in the pool. */
  fun idleConnectionCount(): Int = delegate.idleConnectionCount()

  /** Returns total number of connections in the pool. */
  fun connectionCount(): Int = delegate.connectionCount()

  /** Close and remove all idle connections in the pool. */
  fun evictAll() {
    delegate.evictAll()
  }
}
(存)进入连接池

ConnectPool真正调用的是RealConnectionPool,从上边可用看出,连接池最大的空闲连接是5个,最大空闲时间是5分钟。
上一小节中,新创建的连接会放入连接池中,看下它的put方法源码:

# RealConnectionPool
  fun put(connection: RealConnection) {
    connection.assertThreadHoldsLock()
    //1.放入连接池队列(ConcurrentLinkedQueue)中
    connections.add(connection)
    //2.添加新的清理任务
    cleanupQueue.schedule(cleanupTask)
  }
(删)移出连接池

有三个场景会触发清理连接池操作

场景一

上边的2中会启动一个清理任务,任务最终执行了RealConnectPool的cleanup方法

# RealConnectionPool

 private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
    override fun runOnce() = cleanup(System.nanoTime())
  }
  
  ······
  
/**
   * Performs maintenance on this pool, evicting the connection that has been idle the longest if
   * either it has exceeded the keep alive limit or the idle connections limit.
   *
   * Returns the duration in nanoseconds to sleep until the next scheduled call to this method.
   * Returns -1 if no further cleanups are required.
   */
  fun cleanup(now: Long): Long {
    var inUseConnectionCount = 0
    var idleConnectionCount = 0
    var longestIdleConnection: RealConnection? = null
    var longestIdleDurationNs = Long.MIN_VALUE

    //1.找到空闲时间最长的connection
    // Find either a connection to evict, or the time that the next eviction is due.
    for (connection in connections) {
      synchronized(connection) {
        // If the connection is in use, keep searching.
        //重新计算connection上执行call的数量
        if (pruneAndGetAllocationCount(connection, now) > 0) {
            //每个connection都维护了一个在这个连接上执行的Call的弱引用队列,如果队列size> 0,则说明正在执行
          inUseConnectionCount++
        } else {
        //没有正在执行的连接,则空闲连接数+1;
          idleConnectionCount++

          // If the connection is ready to be evicted, we're done.
          val idleDurationNs = now - connection.idleAtNs
          if (idleDurationNs > longestIdleDurationNs) {
            //得到空闲时间最长的连接,及其空闲时间
            longestIdleDurationNs = idleDurationNs
            longestIdleConnection = connection
          } else {
            Unit
          }
        }
      }
    }

    when {
      longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections -> {
          //最长的空闲连接的空闲时长大于设定的值(默认5分钟)或者空闲连接数大于设定的值(默认5个),就移除该连接
        // We've chosen a connection to evict. Confirm it's still okay to be evict, then close it.
        val connection = longestIdleConnection!!
        synchronized(connection) {
          if (connection.calls.isNotEmpty()) return 0L // No longer idle.
          if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest.
          connection.noNewExchanges = true
          connections.remove(longestIdleConnection)
        }

        connection.socket().closeQuietly()
        if (connections.isEmpty()) cleanupQueue.cancelAll()

        // Clean up again immediately.
        return 0L
      }

      idleConnectionCount > 0 -> {
        //如果空闲连接时间和空闲连接数都没有超限,就计算下一次需要执行清理任务的时间间隔
        // A connection will be ready to evict soon.
        return keepAliveDurationNs - longestIdleDurationNs
      }

      inUseConnectionCount > 0 -> {
      //如果所有连接都在使用,则等下一个最长默认空闲时间后,再执行清理任务
        // All connections are in use. It'll be at least the keep alive duration 'til we run
        // again.
        return keepAliveDurationNs
      }

      else -> {
        // 空闲连接和在用连接都为0,说明连接池为空,也没有任务执行,不用再执行清理任务。
        // No connections, idle or in use.
        return -1
      }
    }
  }
# RealConnectionPool

  /**
   * Prunes any leaked calls and then returns the number of remaining live calls on [connection].
   * Calls are leaked if the connection is tracking them but the application code has abandoned
   * them. Leak detection is imprecise and relies on garbage collection.
   */
  private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
    connection.assertThreadHoldsLock()

    val references = connection.calls
    var i = 0
    while (i < references.size) {
      val reference = references[i]
      
      if (reference.get() != null) { //1.不为空,说明Call还则执行中
        i++
        continue
      }

      // We've discovered a leaked call. This is an application bug.
      val callReference = reference as CallReference
      //2. CallReference集成自弱引用
      val message = "A connection to ${connection.route().address.url} was leaked. " +
          "Did you forget to close a response body?"
      Platform.get().logCloseableLeak(message, callReference.callStackTrace)
      //3. 从引用队列中移除
      references.removeAt(i)
      connection.noNewExchanges = true

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        //最后一个call,这样计算idleAtNs,会导致该连接立即执行移除连接池。
        connection.idleAtNs = now - keepAliveDurationNs
        return 0
      }
    }

    return references.size
  }
#RealCall

 internal class CallReference(
    referent: RealCall,
    /**
     * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
     * identifying the origin of connection leaks.
     */
    val callStackTrace: Any?
  ) : WeakReference<RealCall>(referent)

加入连接池后,会立即执行清理连接池的操作,执行规则如下:

  1. 如果连接池中的空闲连接数或连接的最大空闲超限,就将空闲时间最长的那个连接移除
  2. 不满足1,如果空闲连接数大于0,则根据得到的连接的最大空闲时间,计算下一次需要执行清理操作的时间间隔;
  3. 如果所有的连接都中使用,就等默认的最大空闲时间之后再执行清理任务
  4. 如果连接池为空,则返回-1,不再安排执行清理操作
场景二
# RealConnectionPool

  /**
   * Notify this pool that [connection] has become idle. Returns true if the connection has been
   * removed from the pool and should be closed.
   */
  fun connectionBecameIdle(connection: RealConnection): Boolean {
    connection.assertThreadHoldsLock()

    return if (connection.noNewExchanges || maxIdleConnections == 0) {
      connection.noNewExchanges = true
      connections.remove(connection)
      if (connections.isEmpty()) cleanupQueue.cancelAll()
      true
    } else {
      cleanupQueue.schedule(cleanupTask)
      false
    }
  }

connectionBecameIdle()也会导致执行cleanupTask或将连接移出连接池,该方法被RealCall的releaseConnectionNoEvents()调用。

场景三
# RealConnectionPool 

  fun evictAll() {
    val i = connections.iterator()
    while (i.hasNext()) {
      val connection = i.next()
      val socketToClose = synchronized(connection) {
        if (connection.calls.isEmpty()) {
          i.remove()
          connection.noNewExchanges = true
          return@synchronized connection.socket()
        } else {
          return@synchronized null
        }
      }
      socketToClose?.closeQuietly()
    }

    if (connections.isEmpty()) cleanupQueue.cancelAll()
  }

主动调用 okHttp.connectionPool().evictAll()也会清理连接池中的所有连接。

(取)从连接池中查找对应的连接
# RealConnectionPool 
/**
   * Attempts to acquire a recycled connection to [address] for [call]. Returns true if a connection
   * was acquired.
   *
   * If [routes] is non-null these are the resolved routes (ie. IP addresses) for the connection.
   * This is used to coalesce related domains to the same HTTP/2 connection, such as `square.com`
   * and `square.ca`.
   */
  fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    for (connection in connections) {
      synchronized(connection) {
        if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
        if (!connection.isEligible(address, routes)) return@synchronized
        call.acquireConnectionNoEvents(connection)
        return true
      }
    }
    return false
  }

这个方法,主要是ExchangeFinder的findConnection()方法中执行,里边有三处调用。

总结

ConnectInterceptor的主要工作,是为下一步真正请求网络,找到一个合适的RealConnection,根据这个RealConnect创建一个ExchageCodec,再用这个ExchageCodec和ExchangeFinder最终构建一个新的Exchange对象,供下一个拦截链使用。核心流程是ExchangeFinder中的findConnection方法,它查找soket的流程如下:

  1. 看当前的RealCall里边持有的RealConnect是否可用,可用则返回,不可用进入2.
  2. 从RealConnectionPool中找到一个合适的RealConnect,找到就讲这个连接返回,并赋值给当前的RealCall对象,找不到则进入3
  3. 收集Route信息,构建一个新的RealConnect,并调用它的connect方法,连接成功后,再次从ConnectionPool中查找对应的url有没有已经创建好的连接,如果有则将新创建的RealConnect关闭,复用从连接池中得到的RealConnect(连接合并),并返回,没有则进入4.
  4. 将3中创建的连接,放入RealConnectionPool中,并返回该连接。

1.2.5 CallServerInterceptor

/** This is the last interceptor in the chain. It makes a network call to the server. */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()
    //1.1 写入Request的header信息
    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    //不是HEAD或GET请求
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        //flush请求
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }

    if (requestBody == null || !requestBody.isDuplex()) {
      // 1.2 完成Request
      exchange.finishRequest()
    }
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      // Server sent a 100-continue even though we did not request one. Try again to read the actual
      // response status.
      //2.1 读取响应头
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake()) //握手
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }
    
    //2.2 响应头结束
    exchange.responseHeadersEnd(response)

    // 2.3 生成最终的response
    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
  }
}

总结:CallServerIntercpter主要负责利用ConnectionIntercepter生成好的Exchange来执行发送请求和读取响应的操作。

参考资料

你想要的系列:网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点
OkHttp 4源码(4)— 连接机制分析

2. http请求头压缩

基本思路

  • 在请求头添加
header("Content-Encoding", "gzip")
  • 服务器ng收到请求后,查看是否有Content-Encoding字段,有则启用请求正文解压
举报

相关推荐

0 条评论