您现在的位置是:首页 >技术杂谈 >Kotlin协程学习之路(二):挂起函数网站首页技术杂谈

Kotlin协程学习之路(二):挂起函数

rockyou666 2023-06-13 12:00:03
简介Kotlin协程学习之路(二):挂起函数

前言

挂起函数在协程中相当的重要,正确的理解挂起函数也就显得非常重要了。

一. 挂起函数

协程中最常见的挂起函数就是delay函数了。使用起来也很简单:

runBlocking {
   delay(1000)
   // 一秒之后再执行"test end!"
   println("test end !")
}

我们先来看看delay函数:

// CancellableContinuation是Continuation子接口
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}

可以看到delay函数有一个suspend关键字,英文翻译有"暂停; 中止"的意思,它的作用主要是标识该函数是一个挂起函数。

1.1 suspend函数的原理

举一个例子来说明下:

// 自定义一个带返回值suspend函数
private suspend fun test0(): String{
      return "data"
}

// 自定义一个不带返回值suspend函数
private suspend fun test1(){
      // Empty Code
}

然后对上面的suspend函数编译成java代码:

// 这两个函数的Continuation对象由Kotlin编译器传入

// test0函数的反编译
private final Object test0(Continuation $completion) {
      return "data";
}

// test1函数的反编译
private final Object test1(Continuation $completion) {
      return Unit.INSTANCE;
}

如果给挂起函数里面加上delay函数

private suspend fun test(){
     delay(1_000)
}

编译为Java代码:

private final Object test(Continuation $completion) {
       Object var10000 = DelayKt.delay(1000L, $completion);
       return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}

如果返回值是Intrinsics.COROUTINE_SUSPENDED,那么说明该函数被挂起了,如果只是普通函数,就直接返回Unit.INSTANCE或者返回结果。(类似于第一个例子)

更加复杂的例子:返回值+delay函数:

private suspend fun test(): String{
      delay(1_000)
      return "data"
}

相当复杂的反编译代码,给了部分注释。
编译为Java代码:

private final Object test(Continuation var1) {
         Object $continuation;
         label20: {
         	// 不是第一次进入,传入的Continuation是$continuation匿名内部类类型,就强转下
            if (var1 instanceof <undefinedtype>) {
               $continuation = (<undefinedtype>)var1;
               if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
                  ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
                  break label20;
               }
            }
			
			// 如果是第一次进入就以匿名内部类的形式创建
			// ContinuationImpl是一个抽象类,implement了Continuation接口
            $continuation = new ContinuationImpl(var1) {
               // $FF: synthetic field
               Object result;
               int label; // 初始值为0

               @Nullable
               public final Object invokeSuspend(@NotNull Object $result) {
                  this.result = $result;
                  this.label |= Integer.MIN_VALUE;
                  //开启协程状态机
                  return Companion.this.test(this);
               }
            };
         }
		
		 // 获取result执行结果
         Object $result = ((<undefinedtype>)$continuation).result;
         // 挂起的标志,如果挂起的话,就返回这个flag
         Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         switch (((<undefinedtype>)$continuation).label) {
            case 0:
            	// 检测异常
               ResultKt.throwOnFailure($result);
               //将label的状态改成1,方便待会儿执行delay后面的代码
               ((<undefinedtype>)$continuation).label = 1;
//1. DelayKt.delay是一个挂起函数,正常情况下,它会立马返回一个值:IntrinsicsKt.COROUTINE_SUSPENDED(也就是这里的flag),表示该函数已被挂起,这里就直接return了,该函数被挂起
//2. 恢复执行:在DelayKt.delay内部,到了指定的时间后就会调用$continuation这个Callback的invokeSuspend(也就是上面匿名函数实现的方法)
//3. invokeSuspend中又将执行test函数,同时将之前创建好的$continuation传入其中,开始执行后面的逻辑(label为1的逻辑),该函数继续往后面执行(也就是恢复执行)
               if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
                  return var4;
               }
               break;
            case 1:
            	// 检测异常
            	//label 1这里没有return,而是会走到下面的return "data"语句
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }

         return "data";
      }

通过上面的分析可以了解挂起函数的挂起和恢复的逻辑。

  1. 第一次调用test函数的时候,会在test函数内创建一个ContinuationImpl的匿名内部类,里面包含label(协程状态机当前的状态)和result(保存invokeSuspend回调的返回结果),此时$continuationlabel为0
  2. 因为第一次创建的时候label状态为0,就会进入case 0:逻辑,执行delay函数,同时判断返回值是否和IntrinsicsKt.getCOROUTINE_SUSPENDED()相同,也就是是否挂起,并且将$continuation匿名内部类对象传入delay函数,还有就是需要将label 状态被修改为1
  3. $continuation匿名内部类对象有一个invokeSuspend抽象方法,当协程从挂起状态想要恢复时,就得调用这个invokeSuspend,会调用Companion.this.test(this),再次执行test方法内部的逻辑,由于第一次已经创建$continuation匿名内部类对象,并且label已经被修改为1,则直接进入case 1:逻辑,在检测异常后,就直接return "data"

以上就是挂起函数的挂起和恢复的简要的逻辑分析。

1.2 suspend和Continuation

上面的代码都会出现Continuation对象,那么这个对象到底是个啥?

Continuation实际上是一个接口

@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

这个接口主要由两部分构成:contextresumeWith函数。

  1. context:协程上下文,主要作用是保存信息,它本质上是一个特殊的集合,有一个key对应一个element。由于内部做了运算符重载,可以直接用+组合element,具体介绍后面的文章会讲到。
  2. resumeWith:看注释可以知道,本质是一个传递一个成功或者失败的方法。

通过编译成Java代码得知,被suspend修饰的函数会被转换成一个带有 Callback 的函数。这里的Callback就是Continuation对象,这种转换就是所谓的CPS转换

我们在使用suspend修饰的挂起函数的时候,需要注意的两点:

  1. 挂起函数需要在协程中使用
  2. 挂起函数需要在其他的挂起函数中使用

之所以有以上的两点,其实还是和Continuation有关,上面通过编译为Java代码可以看到,会有一个Continuation对象的回调传入进去,而这个Continuation需要在协程或者其他的挂起函数中获取,所以导致必须在协程中使用或者在其他挂起函数中使用。

二.suspendCoroutine和suspendCancellableCoroutine

suspendCoroutinesuspendCancellableCoroutine的作用是kotlin将回调函数转换为suspend函数,直观上,可以将异步调用形式改为同步调用形式。

2.1 suspendCoroutine

举个栗子:

// NetWork类
class NetWork {

	// 回调接口
    interface CallBack{
		// 回调成功
        fun onSuccess(data: String)

		// 回调失败
        fun onError(e: Exception)
    }

    companion object{
    
    	// 经典调用
        fun request(callBack: CallBack?){
            thread {
            	// 模拟耗时
                Thread.sleep(1_000 * 2)
                if (Random.nextBoolean()){
                    // 模拟获取数据
                    callBack?.onSuccess("data")
                }else{
                    // 模拟网络异常
                    callBack?.onError(Exception("网络异常!"))
                }
            }
        }
    }
}
fun main() {
	
	// 调用getData函数
    NetWork.request(object :NetWork.CallBack{
    
          override fun onSuccess(data: String) {
               println("get Data:: $data")
           }

          override fun onError(e: Exception) {
               println("e= ${e.message}")
          }
    })
}

上面的写法在Java中很常见,现在我们通过suspendCoroutine将其转换为挂起函数。

// NetWork
class NetWork {

    interface CallBack{

        fun onSuccess(data: String)

        fun onError(e: Exception)
    }

    companion object{
    
    	// 经典调用
        private fun request(callBack: CallBack?){
            thread {
            	// 模拟耗时
                Thread.sleep(1_000 * 2)
                if (Random.nextBoolean()){
                    // 模拟获取数据
                    callBack?.onSuccess("data")
                }else{
                    // 模拟网络异常
                    callBack?.onError(Exception("网络异常!"))
                }
            }
        }

		// 形式上直接获取数据
        suspend fun requestDefault(): String{
            return suspendCoroutine {
                request(object :CallBack{
                    override fun onSuccess(data: String) {
                        it.resume(data)
                    }

                    override fun onError(e: Exception) {
                        it.resumeWithException(e)
                    }
                })
            }
        }
    }
}
fun main() {
    runBlocking {
      println(NetWork.requestDefault())
    }
}

输出结果:

test

Exception in thread "main" java.lang.Exception: 网络异常!
	at com.work.kotlin.practice.NetWork$Companion$request$1.invoke(NetWork.kt:30)
	at com.work.kotlin.practice.NetWork$Companion$request$1.invoke(NetWork.kt:22)
	at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)

Process finished with exit code 1

通过输出结果可知,resume返回成功请求的数据,resumeWithException返回异常数据。
resumeresumeWithException都是调用了resumeWith方法,并且封装了Result对象

resume & resumeWithException方法源码:

@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

2.2 suspendCancellableCoroutine

suspendCancellableCoroutine的使用和suspendCoroutine用法基本一致,区别在于suspendCancellableCoroutine 可以通过 cancel() 方法手动取消协程。

class NetWork {

    interface CallBack{

        fun onSuccess(data: String)

        fun onError(e: Exception)
    }

    companion object{

        // 经典调用
        private fun request(callBack: CallBack?){
            thread {
                // 模拟耗时
                Thread.sleep(1_000 * 2)
                if (Random.nextBoolean()){
                    // 模拟获取数据
                    callBack?.onSuccess("data")
                }else{
                    // 模拟网络异常
                    callBack?.onError(Exception("网络异常!"))
                }
            }
        }

        suspend fun requestDefault(): String{
       		// 这里直接将suspendCoroutine替换为suspendCancellableCoroutine
            return suspendCancellableCoroutine {
            	it.invokeOnCancellation {
                    // 相当于调用cancel后的回调
                    println("invokeOnCancellation: cancel the request!")
                }
                request(object :CallBack{
                    override fun onSuccess(data: String) {
                        it.resume(data)
                    }

                    override fun onError(e: Exception) {
                        it.resumeWithException(e)
                    }
                })
            }
        }
    }
		fun main() {
            runBlocking {
             	// launch也是一种构建协程的方式,CoroutineScope的扩展函数
                val job = launch {
                    val data = NetWork.requestDefault()
                    println(data)
                }
                // 注意这里的delay时间小于上面request的sleep时间
                delay(100)
                // 协程取消
                job.cancel()
            }
        }

输出结果:

invokeOnCancellation: cancel the request!

Exception in thread "main" java.lang.Exception: 网络异常!
	at com.work.kotlin.practice.NetWork$Companion$request$1.invoke(NetWork.kt:30)
	at com.work.kotlin.practice.NetWork$Companion$request$1.invoke(NetWork.kt:22)
	at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)

上面的结果可以看到,通过调用cancel,可以取消协程。如果使用前面的suspendCoroutine则无法实现该功能,原因在于suspendCancellableCoroutine的block体中提供一个CancellableContinuation对象。
suspendCancellableCoroutine源代码:

public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        /*
         * For non-atomic cancellation we setup parent-child relationship immediately
         * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
         * properly supports cancellation.
         */
        cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

其中的block变量的类型是(CancellableContinuation<T>) -> Unit,代表其传入参数是CancellableContinuation类型的变量,该参数可以调用自身的cancel方法。

总结

挂起函数比普通函数多了suspend关键字,Kotlin编译器会对其特殊处理。将该函数转换成一个带有Callback的函数(Continuation接口),如果挂起就会返回Intrinsics.COROUTINE_SUSPENDED,否则,则会直接返回结果。suspendCoroutinesuspendCancellableCoroutine的区别在于suspendCancellableCoroutine可以cancel掉。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。