使用
添加依赖
包含okhttp和其使用的okio库
1
implementation "com.squareup.okhttp3:okhttp:4.9.3"
创建OkHttpClient对象
1
2
3
4
5
6
7
8
9
10
11
12
13
val httpClient = OkHttpClient.Builder()
.connectTimeout(15, TimeUnit.SECONDS) // 链接超时
.writeTimeout(15, TimeUnit.SECONDS) // 写入超时
.readTimeout(15, TimeUnit.SECONDS) // 读取超时
.addInterceptor( // 添加拦截器
Interceptor { chain ->
val request = chain.request()
// 继续执行
val response = chain.proceed(request)
response
}
)
.cache(Cache(context.cacheDir, 10 * 1024 * 1024))// 设置储存缓存
创建承载请求的Request对象
空的request对象
1
val request = Request.Builder().build()
设置url
1
2
3
val request = Request.Builder()
.url("http://coding.jiangker.cn/demo?name=jiangker&age=18")
.build()
使用HttpUrl来构建url的params参数
1
2
3
4
5
6
val urlBuilder = "http://coding.jiangker.cn/demo".toHttpUrlOrNull()!!.newBuilder()
urlBuilder.addQueryParameter("name", name)
urlBuilder.addQueryParameter("age", age.toString())
val request = Request.Builder()
.url(urlBuilder.build())
.build()
默认的请求是get请求,若要发送post请求,则需要添加post
json形式发送body对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 设置body传递类型
val jsonBody: MediaType? = "application/json; charset=utf-8".toMediaTypeOrNull()
// 构建传递的body
val builder: RequestBody = """
{
"name": "jiangker32",
"age": 15
}
""".trimIndent()
.toRequestBody(jsonBody)
// 构建post类型的request
val request = Request.Builder()
.url("http://coding.jiangker.cn/demo")
.post(builder)
.build()
等同于postman中配置
Inspectors
body中携带form-data提交表单
1
2
3
4
5
6
7
8
val requestBody = FormBody.Builder()
.add("name", "jiangker")
.add("age", "18")
.build()
val request = Request.Builder()
.url("http://coding.jiangker.cn/demo")
.post(requestBody)
.build()
等同于postman中发送
Inspectors
发送请求
同步请求
1
2
3
4
5
val response: Response = client.newCall(request).execute()
if (response.isSuccessful) {
val result: String? = response.body?.string()
return result ?: "empty error"
}
异步发送请求
1
2
3
4
5
6
7
8
9
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
}
// 子线程的回调
override fun onResponse(call: Call, response: Response) {
val result = response.body?.string()
}
})
其他读取方式
1
2
3
4
// 输入流
val resultStream: InputStream? = response.body()?.byteStream()
// 缓冲流
val resultSource: BufferedSource? = response.body()?.source()
添加header
直接添加到request中
1
2
3
4
5
val request = Request.Builder()
.url("http://coding.jiangker.cn/demo")
.addHeader("token","xxxx-xxxx")
.post(builder)
.build()
通过headers添加
1
2
3
4
5
6
7
val headers = Headers.Builder()
.add("token", "xxxx-xxxx")
.build()
val request = Request.Builder()
.url(urlBuilder.build())
.headers(headers)
.build()
headers中的key可能会重复的,可以使用header来替换对应的value
1
2
3
4
5
6
7
8
9
10
11
12
val headers = Headers.Builder()
// 直接添加
.add("token", "xxxx-xxxx")
// 清除相同的key的,然后添加
.set("token", "xxxx-xxxx")
.build()
val request = Request.Builder()
.url(urlBuilder.build())
.headers(headers)
// 清除相同的key的,然后添加
.header("token", "xxxx-xxxx")
.build()
源码解析
OKHttpClient
OkHttpClient的构建兼顾Java,使用的Builder模式,有众多参数可以设置,保存着OkHttp的主要公共配置,例如前面的拦截器、超时时间、缓存配置等。在创建Call时会将自己传入,提供公共的配置。
1
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
Request
这个没说明好说的,主要保存的是这个请求的一些关键参数
1
2
3
4
5
6
7
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl,
@get:JvmName("method") val method: String,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: RequestBody?,
internal val tags: Map<Class<*>, Any>
)
RealCall
看前面的Call创建,知道OKHttpClient调用newCall时返回的Call实际上是RealCall对象。并且把OkHttpClient和Request都传入了进去。
接下来看看请求部分
同步请求execute
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
// 设置超时器
timeout.enter()
callStart()
try {
// 把请求添加到同步队列中
client.dispatcher.executed(this)
// 获取责任链结果
return getResponseWithInterceptorChain()
} finally {
// 请求结束后去尝试触发空闲回调
client.dispatcher.finished(this)
}
}
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors // 添加自定义的拦截器
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache) // 缓存拦截器
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)// 真正强求的拦截器
// 构建责任链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,//当前拦截器
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
这里可以看出是拿出所有的拦截器添加到一个列表中。然后创建RealInterceptorChain这个真正的责任链来依次执行拦截器,最后调用他的proceed方法依次执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
// 创建下一个拦截器
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
// 执行拦截器真正的拦截方法,然后把下一个拦截器传入。拦截器可以决定是否触发下一个拦截器proceed方法。
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
拦截器执行的流程图大概为
在RealInterceptorChain会构建出真正的下一个RealInterceptorChain元素,然后调用拦截器的intercept方法传入下一个RealInterceptorChain,当拦截器需要继续执行时,就在拦截器中调用proceed方法传入RealInterceptorChain。否则直接返回response结束拦截器流程。
异步请求enqueue
1
2
3
4
5
6
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
异步的请求实际会构建一个继承至Runnable的可执行对象,然后交给调度器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// 添加到异步容器中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
// 设置同时对同一个主机的请求关联同一个变量
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// 开始执行
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
// host count + 1
asyncCall.callsPerHost.incrementAndGet()
// 添加到执行队列
executableCalls.add(asyncCall)
// 添加到真正运行的队列
runningAsyncCalls.add(asyncCall)
}
// 同步或者异步请求是否有正在运行的
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
// 去启动请求
asyncCall.executeOn(executorService)
}
return isRunning
}
大致流程是先把请求添加到待运行队列中,然后再到待执行队列中把元素添加到执行队列,然后触发AsyncCall的executeOn方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 使用线程池启动
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
// 启动超时计时
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
internal fun finished(call: AsyncCall) {
// 对同一个主机的请求减一
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
executeOn方法就直接把AsyncCall放到线程池中去执行了,当开始执行时,就会启动超时计时。最后流程和同步请求一样,都是调用getResponseWithInterceptorChain()来执行真正的拦截器和请求。最后等待结果的返回,然后回调回去,因为这里是在线程池中,并且回调是没有切换线程的,所以是在子线程返回。最后看一下执行任务的线程池是什么样的。
1
2
3
4
5
6
7
8
val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
这里的线程池是调度器传入的,是使用Java传统的线程池来创建的。是一个无核心线程的非核心线程最大的线程池,比较适合完成这种大量耗时比较短的任务。
Dispatcher
1
2
3
4
5
6
7
8
9
10
// 最大异步请求数
var maxRequests = 64
// 对同一个主机的最大同时请求数
var maxRequestsPerHost = 5
// 准备的异步请求
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
// 正在运行的异步请求
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
// 正在运行的同步请求
private val runningSyncCalls = ArrayDeque<RealCall>()
总结
- 公共的配置参数主要放在OkHttpClient中、单个请求的信息主要配置在Request中,newCall返回的是一个RealCall对象。
- OkHttp中有一个默认的调度器,设置了最大的异步请求,对一个主机的最大同时请求,以及准备的异步请求、正在运行的异步请求和同步请求队列。
- 当发起同步请求时,会启动超时检测,然后把添加到同步请求队列中,然后调用责任链开始请求。
- 当发起异步请求时,会先把请求添加到准备队列中,然后去尝试把请求放入正在运行的异步请求队列,若放入成功就会把请求放入线程池开始执行。开始执行后会开始计时以及和同步请求类似的过程,最后回调结果。
- 责任链的执行首先会收集所有拦截器,然后包装成RealInterceptorChain,主要带入了所有拦截器以及和当前index,然后调用他的process方法开始执行。在process方法中会去构建下一个RealInterceptorChain,然后取出当前拦截器触发intercept方法。依次执行这个流程。在最后的真正的请求的拦截器中会去执行真正的请求,不会再执行next.process方法。然后拦截器也都依次放回结果回去。