Home OkHttp使用和解析
Post
Cancel

OkHttp使用和解析

使用

添加依赖

包含okhttp和其使用的okio库

1
implementation "com.squareup.okhttp3:okhttp:4.9.3"

创建OkHttpClient对象

1
2
3
4
5
6
7
8
9
10
11
12
13
val httpClient = OkHttpClient.Builder()
    .connectTimeout(15, TimeUnit.SECONDS) // 链接超时
    .writeTimeout(15, TimeUnit.SECONDS) // 写入超时
    .readTimeout(15, TimeUnit.SECONDS) // 读取超时
    .addInterceptor( // 添加拦截器
        Interceptor { chain ->
            val request = chain.request()
            // 继续执行
            val response = chain.proceed(request)
            response
        }
    )
    .cache(Cache(context.cacheDir, 10 * 1024 * 1024))// 设置储存缓存

创建承载请求的Request对象

空的request对象

1
val request = Request.Builder().build()

设置url

1
2
3
val request = Request.Builder()
    .url("http://coding.jiangker.cn/demo?name=jiangker&age=18")
    .build()

使用HttpUrl来构建url的params参数

1
2
3
4
5
6
val urlBuilder = "http://coding.jiangker.cn/demo".toHttpUrlOrNull()!!.newBuilder()
urlBuilder.addQueryParameter("name", name)
urlBuilder.addQueryParameter("age", age.toString())
val request = Request.Builder()
    .url(urlBuilder.build())
    .build()

默认的请求是get请求,若要发送post请求,则需要添加post

json形式发送body对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 设置body传递类型
val jsonBody: MediaType? = "application/json; charset=utf-8".toMediaTypeOrNull()
// 构建传递的body
val builder: RequestBody = """
{
    "name": "jiangker32",
    "age": 15
}
""".trimIndent()
    .toRequestBody(jsonBody)
// 构建post类型的request
val request = Request.Builder()
    .url("http://coding.jiangker.cn/demo")
    .post(builder)
    .build()

等同于postman中配置

Inspectors

body中携带form-data提交表单

1
2
3
4
5
6
7
8
val requestBody = FormBody.Builder()
    .add("name", "jiangker")
    .add("age", "18")
    .build()
val request = Request.Builder()
    .url("http://coding.jiangker.cn/demo")
    .post(requestBody)
    .build()

等同于postman中发送

Inspectors

发送请求

同步请求

1
2
3
4
5
val response: Response = client.newCall(request).execute()
if (response.isSuccessful) {
    val result: String? = response.body?.string() 
    return result ?: "empty error"
}

异步发送请求

1
2
3
4
5
6
7
8
9
client.newCall(request).enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
    }

    // 子线程的回调
    override fun onResponse(call: Call, response: Response) {
        val result = response.body?.string()
    }
})

其他读取方式

1
2
3
4
// 输入流
val resultStream: InputStream? = response.body()?.byteStream()
// 缓冲流
val resultSource: BufferedSource? = response.body()?.source()

添加header

直接添加到request中

1
2
3
4
5
val request = Request.Builder()
    .url("http://coding.jiangker.cn/demo")
    .addHeader("token","xxxx-xxxx")
    .post(builder)
    .build()

通过headers添加

1
2
3
4
5
6
7
val headers = Headers.Builder()
    .add("token", "xxxx-xxxx")
    .build()
val request = Request.Builder()
    .url(urlBuilder.build())
    .headers(headers)
    .build()

headers中的key可能会重复的,可以使用header来替换对应的value

1
2
3
4
5
6
7
8
9
10
11
12
val headers = Headers.Builder()
    // 直接添加
    .add("token", "xxxx-xxxx")
    // 清除相同的key的,然后添加
    .set("token", "xxxx-xxxx")
    .build()
val request = Request.Builder()
    .url(urlBuilder.build())
    .headers(headers)
    // 清除相同的key的,然后添加
    .header("token", "xxxx-xxxx")
    .build()

源码解析

OKHttpClient

OkHttpClient的构建兼顾Java,使用的Builder模式,有众多参数可以设置,保存着OkHttp的主要公共配置,例如前面的拦截器、超时时间、缓存配置等。在创建Call时会将自己传入,提供公共的配置。

1
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

Request

这个没说明好说的,主要保存的是这个请求的一些关键参数

1
2
3
4
5
6
7
class Request internal constructor(
    @get:JvmName("url") val url: HttpUrl,
    @get:JvmName("method") val method: String,
    @get:JvmName("headers") val headers: Headers,
    @get:JvmName("body") val body: RequestBody?,
    internal val tags: Map<Class<*>, Any>
)

RealCall

看前面的Call创建,知道OKHttpClient调用newCall时返回的Call实际上是RealCall对象。并且把OkHttpClient和Request都传入了进去。

接下来看看请求部分

同步请求execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    // 设置超时器
    timeout.enter()
    callStart()
    try {
        // 把请求添加到同步队列中
        client.dispatcher.executed(this)
        // 获取责任链结果
        return getResponseWithInterceptorChain()
    } finally {
        // 请求结束后去尝试触发空闲回调
        client.dispatcher.finished(this)
    }
}

internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors // 添加自定义的拦截器
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache) // 缓存拦截器
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
        interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)// 真正强求的拦截器

    // 构建责任链
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,//当前拦截器
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
        val response = chain.proceed(originalRequest)
        if (isCanceled()) {
            response.closeQuietly()
            throw IOException("Canceled")
        }
        return response
    } catch (e: IOException) {
        calledNoMoreExchanges = true
        throw noMoreExchanges(e) as Throwable
    } finally {
        if (!calledNoMoreExchanges) {
            noMoreExchanges(null)
        }
    }
}

这里可以看出是拿出所有的拦截器添加到一个列表中。然后创建RealInterceptorChain这个真正的责任链来依次执行拦截器,最后调用他的proceed方法依次执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
        check(exchange.finder.sameHostAndPort(request.url)) {
            "network interceptor ${interceptors[index - 1]} must retain the same host and port"
        }
        check(calls == 1) {
            "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
        }
    }

    // 创建下一个拦截器
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    // 执行拦截器真正的拦截方法,然后把下一个拦截器传入。拦截器可以决定是否触发下一个拦截器proceed方法。
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
        check(index + 1 >= interceptors.size || next.calls == 1) {
            "network interceptor $interceptor must call proceed() exactly once"
        }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
}

拦截器执行的流程图大概为

在RealInterceptorChain会构建出真正的下一个RealInterceptorChain元素,然后调用拦截器的intercept方法传入下一个RealInterceptorChain,当拦截器需要继续执行时,就在拦截器中调用proceed方法传入RealInterceptorChain。否则直接返回response结束拦截器流程。

异步请求enqueue

1
2
3
4
5
6
override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
}

异步的请求实际会构建一个继承至Runnable的可执行对象,然后交给调度器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        // 添加到异步容器中
        readyAsyncCalls.add(call)

        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
        // the same host.
        if (!call.call.forWebSocket) {
            // 设置同时对同一个主机的请求关联同一个变量
            val existingCall = findExistingCallWithHost(call.host)
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
        }
    }
    // 开始执行
    promoteAndExecute()
}

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
        val i = readyAsyncCalls.iterator()
        while (i.hasNext()) {
            val asyncCall = i.next()

            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

            i.remove()
            // host count + 1
            asyncCall.callsPerHost.incrementAndGet()
            // 添加到执行队列
            executableCalls.add(asyncCall)
            // 添加到真正运行的队列
            runningAsyncCalls.add(asyncCall)
        }
        // 同步或者异步请求是否有正在运行的
        isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
        val asyncCall = executableCalls[i]
        // 去启动请求
        asyncCall.executeOn(executorService)
    }

    return isRunning
}

大致流程是先把请求添加到待运行队列中,然后再到待执行队列中把元素添加到执行队列,然后触发AsyncCall的executeOn方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()

    var success = false
    try {
        // 使用线程池启动
        executorService.execute(this)
        success = true
    } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
    } finally {
        if (!success) {
            client.dispatcher.finished(this) // This call is no longer running!
        }
    }
}
override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        // 启动超时计时
        timeout.enter()
        try {
            val response = getResponseWithInterceptorChain()
            signalledCallback = true
            responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
            if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
            } else {
                responseCallback.onFailure(this@RealCall, e)
            }
        } catch (t: Throwable) {
            cancel()
            if (!signalledCallback) {
                val canceledException = IOException("canceled due to $t")
                canceledException.addSuppressed(t)
                responseCallback.onFailure(this@RealCall, canceledException)
            }
            throw t
        } finally {
            client.dispatcher.finished(this)
        }
    }
}

internal fun finished(call: AsyncCall) {
    // 对同一个主机的请求减一
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
}

executeOn方法就直接把AsyncCall放到线程池中去执行了,当开始执行时,就会启动超时计时。最后流程和同步请求一样,都是调用getResponseWithInterceptorChain()来执行真正的拦截器和请求。最后等待结果的返回,然后回调回去,因为这里是在线程池中,并且回调是没有切换线程的,所以是在子线程返回。最后看一下执行任务的线程池是什么样的。

1
2
3
4
5
6
7
8
val executorService: ExecutorService
    get() {
        if (executorServiceOrNull == null) {
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
}

这里的线程池是调度器传入的,是使用Java传统的线程池来创建的。是一个无核心线程的非核心线程最大的线程池,比较适合完成这种大量耗时比较短的任务。

Dispatcher

1
2
3
4
5
6
7
8
9
10
// 最大异步请求数
var maxRequests = 64
// 对同一个主机的最大同时请求数
var maxRequestsPerHost = 5
// 准备的异步请求
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
// 正在运行的异步请求
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
// 正在运行的同步请求
private val runningSyncCalls = ArrayDeque<RealCall>()

总结

  • 公共的配置参数主要放在OkHttpClient中、单个请求的信息主要配置在Request中,newCall返回的是一个RealCall对象。
  • OkHttp中有一个默认的调度器,设置了最大的异步请求,对一个主机的最大同时请求,以及准备的异步请求、正在运行的异步请求和同步请求队列。
  • 当发起同步请求时,会启动超时检测,然后把添加到同步请求队列中,然后调用责任链开始请求。
  • 当发起异步请求时,会先把请求添加到准备队列中,然后去尝试把请求放入正在运行的异步请求队列,若放入成功就会把请求放入线程池开始执行。开始执行后会开始计时以及和同步请求类似的过程,最后回调结果。
  • 责任链的执行首先会收集所有拦截器,然后包装成RealInterceptorChain,主要带入了所有拦截器以及和当前index,然后调用他的process方法开始执行。在process方法中会去构建下一个RealInterceptorChain,然后取出当前拦截器触发intercept方法。依次执行这个流程。在最后的真正的请求的拦截器中会去执行真正的请求,不会再执行next.process方法。然后拦截器也都依次放回结果回去。

参考:

Okhttp Recipes

This post is licensed under CC BY 4.0 by the author.