您现在的位置是:首页 >技术杂谈 >Kotlin协程学习之路(二):挂起函数网站首页技术杂谈
Kotlin协程学习之路(二):挂起函数
文章目录
前言
挂起函数在协程中相当的重要,正确的理解挂起函数也就显得非常重要了。
一. 挂起函数
协程中最常见的挂起函数就是delay函数了。使用起来也很简单:
runBlocking {
delay(1000)
// 一秒之后再执行"test end!"
println("test end !")
}
我们先来看看delay
函数:
// CancellableContinuation是Continuation子接口
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
可以看到delay
函数有一个suspend
关键字,英文翻译有"暂停; 中止"的意思,它的作用主要是标识该函数是一个挂起函数。
1.1 suspend函数的原理
举一个例子来说明下:
// 自定义一个带返回值suspend函数
private suspend fun test0(): String{
return "data"
}
// 自定义一个不带返回值suspend函数
private suspend fun test1(){
// Empty Code
}
然后对上面的suspend函数编译成java代码:
// 这两个函数的Continuation对象由Kotlin编译器传入
// test0函数的反编译
private final Object test0(Continuation $completion) {
return "data";
}
// test1函数的反编译
private final Object test1(Continuation $completion) {
return Unit.INSTANCE;
}
如果给挂起函数里面加上delay函数
private suspend fun test(){
delay(1_000)
}
编译为Java代码:
private final Object test(Continuation $completion) {
Object var10000 = DelayKt.delay(1000L, $completion);
return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}
如果返回值是Intrinsics.COROUTINE_SUSPENDED,那么说明该函数被挂起了,如果只是普通函数,就直接返回Unit.INSTANCE
或者返回结果。(类似于第一个例子)
更加复杂的例子:返回值+delay函数:
private suspend fun test(): String{
delay(1_000)
return "data"
}
相当复杂的反编译代码,给了部分注释。
编译为Java代码:
private final Object test(Continuation var1) {
Object $continuation;
label20: {
// 不是第一次进入,传入的Continuation是$continuation匿名内部类类型,就强转下
if (var1 instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)var1;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
// 如果是第一次进入就以匿名内部类的形式创建
// ContinuationImpl是一个抽象类,implement了Continuation接口
$continuation = new ContinuationImpl(var1) {
// $FF: synthetic field
Object result;
int label; // 初始值为0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
//开启协程状态机
return Companion.this.test(this);
}
};
}
// 获取result执行结果
Object $result = ((<undefinedtype>)$continuation).result;
// 挂起的标志,如果挂起的话,就返回这个flag
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (((<undefinedtype>)$continuation).label) {
case 0:
// 检测异常
ResultKt.throwOnFailure($result);
//将label的状态改成1,方便待会儿执行delay后面的代码
((<undefinedtype>)$continuation).label = 1;
//1. DelayKt.delay是一个挂起函数,正常情况下,它会立马返回一个值:IntrinsicsKt.COROUTINE_SUSPENDED(也就是这里的flag),表示该函数已被挂起,这里就直接return了,该函数被挂起
//2. 恢复执行:在DelayKt.delay内部,到了指定的时间后就会调用$continuation这个Callback的invokeSuspend(也就是上面匿名函数实现的方法)
//3. invokeSuspend中又将执行test函数,同时将之前创建好的$continuation传入其中,开始执行后面的逻辑(label为1的逻辑),该函数继续往后面执行(也就是恢复执行)
if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
return var4;
}
break;
case 1:
// 检测异常
//label 1这里没有return,而是会走到下面的return "data"语句
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "data";
}
通过上面的分析可以了解挂起函数的挂起和恢复的逻辑。
- 第一次调用test函数的时候,会在test函数内创建一个ContinuationImpl的匿名内部类,里面包含label(协程状态机当前的状态)和result(保存invokeSuspend回调的返回结果),此时
$continuation
的label为0 - 因为第一次创建的时候
label
状态为0,就会进入case 0:
逻辑,执行delay
函数,同时判断返回值是否和IntrinsicsKt.getCOROUTINE_SUSPENDED()
相同,也就是是否挂起,并且将$continuation匿名内部类对象传入delay
函数,还有就是需要将label 状态被修改为1。 $continuation
匿名内部类对象有一个invokeSuspend
抽象方法,当协程从挂起状态想要恢复时,就得调用这个invokeSuspend
,会调用Companion.this.test(this)
,再次执行test
方法内部的逻辑,由于第一次已经创建$continuation
匿名内部类对象,并且label已经被修改为1,则直接进入case 1:
逻辑,在检测异常后,就直接return "data"
以上就是挂起函数的挂起和恢复的简要的逻辑分析。
1.2 suspend和Continuation
上面的代码都会出现Continuation
对象,那么这个对象到底是个啥?
Continuation
实际上是一个接口
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
这个接口主要由两部分构成:context
和resumeWith
函数。
- context:协程上下文,主要作用是保存信息,它本质上是一个特殊的集合,有一个key对应一个element。由于内部做了运算符重载,可以直接用
+
组合element,具体介绍后面的文章会讲到。 - resumeWith:看注释可以知道,本质是一个传递一个成功或者失败的方法。
通过编译成Java代码得知,被suspend
修饰的函数会被转换成一个带有 Callback
的函数。这里的Callback
就是Continuation
对象,这种转换就是所谓的CPS转换。
我们在使用suspend修饰的挂起函数的时候,需要注意的两点:
- 挂起函数需要在协程中使用
- 挂起函数需要在其他的挂起函数中使用
之所以有以上的两点,其实还是和Continuation
有关,上面通过编译为Java代码可以看到,会有一个Continuation
对象的回调传入进去,而这个Continuation
需要在协程或者其他的挂起函数中获取,所以导致必须在协程中使用或者在其他挂起函数中使用。
二.suspendCoroutine和suspendCancellableCoroutine
suspendCoroutine
和suspendCancellableCoroutine
的作用是kotlin将回调函数转换为suspend函数,直观上,可以将异步调用形式改为同步调用形式。
2.1 suspendCoroutine
举个栗子:
// NetWork类
class NetWork {
// 回调接口
interface CallBack{
// 回调成功
fun onSuccess(data: String)
// 回调失败
fun onError(e: Exception)
}
companion object{
// 经典调用
fun request(callBack: CallBack?){
thread {
// 模拟耗时
Thread.sleep(1_000 * 2)
if (Random.nextBoolean()){
// 模拟获取数据
callBack?.onSuccess("data")
}else{
// 模拟网络异常
callBack?.onError(Exception("网络异常!"))
}
}
}
}
}
fun main() {
// 调用getData函数
NetWork.request(object :NetWork.CallBack{
override fun onSuccess(data: String) {
println("get Data:: $data")
}
override fun onError(e: Exception) {
println("e= ${e.message}")
}
})
}
上面的写法在Java中很常见,现在我们通过suspendCoroutine
将其转换为挂起函数。
// NetWork
class NetWork {
interface CallBack{
fun onSuccess(data: String)
fun onError(e: Exception)
}
companion object{
// 经典调用
private fun request(callBack: CallBack?){
thread {
// 模拟耗时
Thread.sleep(1_000 * 2)
if (Random.nextBoolean()){
// 模拟获取数据
callBack?.onSuccess("data")
}else{
// 模拟网络异常
callBack?.onError(Exception("网络异常!"))
}
}
}
// 形式上直接获取数据
suspend fun requestDefault(): String{
return suspendCoroutine {
request(object :CallBack{
override fun onSuccess(data: String) {
it.resume(data)
}
override fun onError(e: Exception) {
it.resumeWithException(e)
}
})
}
}
}
}
fun main() {
runBlocking {
println(NetWork.requestDefault())
}
}
输出结果:
test
或
Exception in thread "main" java.lang.Exception: 网络异常!
at com.work.kotlin.practice.NetWork$Companion$request$1.invoke(NetWork.kt:30)
at com.work.kotlin.practice.NetWork$Companion$request$1.invoke(NetWork.kt:22)
at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
Process finished with exit code 1
通过输出结果可知,resume
返回成功请求的数据,resumeWithException
返回异常数据。
resume
和resumeWithException
都是调用了resumeWith
方法,并且封装了Result
对象
resume
& resumeWithException
方法源码:
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
2.2 suspendCancellableCoroutine
suspendCancellableCoroutine
的使用和suspendCoroutine
用法基本一致,区别在于suspendCancellableCoroutine
可以通过 cancel()
方法手动取消协程。
class NetWork {
interface CallBack{
fun onSuccess(data: String)
fun onError(e: Exception)
}
companion object{
// 经典调用
private fun request(callBack: CallBack?){
thread {
// 模拟耗时
Thread.sleep(1_000 * 2)
if (Random.nextBoolean()){
// 模拟获取数据
callBack?.onSuccess("data")
}else{
// 模拟网络异常
callBack?.onError(Exception("网络异常!"))
}
}
}
suspend fun requestDefault(): String{
// 这里直接将suspendCoroutine替换为suspendCancellableCoroutine
return suspendCancellableCoroutine {
it.invokeOnCancellation {
// 相当于调用cancel后的回调
println("invokeOnCancellation: cancel the request!")
}
request(object :CallBack{
override fun onSuccess(data: String) {
it.resume(data)
}
override fun onError(e: Exception) {
it.resumeWithException(e)
}
})
}
}
}
fun main() {
runBlocking {
// launch也是一种构建协程的方式,CoroutineScope的扩展函数
val job = launch {
val data = NetWork.requestDefault()
println(data)
}
// 注意这里的delay时间小于上面request的sleep时间
delay(100)
// 协程取消
job.cancel()
}
}
输出结果:
invokeOnCancellation: cancel the request!
或
Exception in thread "main" java.lang.Exception: 网络异常!
at com.work.kotlin.practice.NetWork$Companion$request$1.invoke(NetWork.kt:30)
at com.work.kotlin.practice.NetWork$Companion$request$1.invoke(NetWork.kt:22)
at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
上面的结果可以看到,通过调用cancel
,可以取消协程。如果使用前面的suspendCoroutine
则无法实现该功能,原因在于suspendCancellableCoroutine
的block体中提供一个CancellableContinuation
对象。
suspendCancellableCoroutine
源代码:
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
/*
* For non-atomic cancellation we setup parent-child relationship immediately
* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
* properly supports cancellation.
*/
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
其中的block变量的类型是(CancellableContinuation<T>) -> Unit
,代表其传入参数是CancellableContinuation
类型的变量,该参数可以调用自身的cancel
方法。
总结
挂起函数比普通函数多了suspend关键字,Kotlin编译器会对其特殊处理。将该函数转换成一个带有Callback
的函数(Continuation
接口),如果挂起就会返回Intrinsics.COROUTINE_SUSPENDED
,否则,则会直接返回结果。suspendCoroutine
和suspendCancellableCoroutine
的区别在于suspendCancellableCoroutine
可以cancel
掉。