Kotlin协程原理

2022/01/11

前言 :运行模型Loop + Queue[绑定协程context上下文 派发器]

image.png

Kotlin协程不是线程,更像是一个封装框架,实现还是离不开线程,如果你查看编译后的代码,可以发现Kotlin完全是在Java的框架里玩转,没有任何新的技术加入,所以核心就是封装,协程最大的作用是写法上,它只忠于开发人员,协程将耗时任务的回调写法转变成了同步的写法,或者说:简化了阻塞任务写法。 [本文辅助工具jadx]

suspend!=阻塞 = 回调封装

CoroutineSingletons.COROUTINE_SUSPENDED是不同的调用比如delay、await、withcontext等自己实现的返回。

kotlin runBlocking执行原理[BlockingEventLoop]

runBlocking的执行模型是先将Block封装成suspend提交,然后阻塞等待协程执行完毕。runBlocking 的官方解释是:

Runs a new coroutine and blocks the current thread interruptibly until its completion. This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.

大意是:在当前线程启动一个可中断的协程,runBlocking会保证协程中的任务完成后才返回,不过runBlocking一般是用来测试代码的,不应该在正常的编码中使用。runBlocking默认使用 CoroutineDispatcher使用模型是一个Loop,也可以选择其他,先看看简单使用:

fun main() {
    runBlocking {
        delay(500)
        println("Current Thread ++ " + Thread.currentThread().name)
    }
}

runBlocking的实现在Builders.kt中

    public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
        //当前线程
       val currentThread = Thread.currentThread()
        //先看有没有拦截器     companion object Key : CoroutineContext.Key<ContinuationInterceptor>   Key是个单利,它是个单利,每个进程肯定就只有一个对象,ContinuationInterceptor就是个单利对象
       val contextInterceptor = context[ContinuationInterceptor]
       val eventLoop: EventLoop?
       val newContext: CoroutineContext
        //----------①
        if (contextInterceptor == null) {
            //不特别指定的话没有拦截器,使用loop构建Context
            <!--默认是当前线程的,没有loop构建loop-->
            eventLoop = ThreadLocalEventLoop.eventLoop
            <!--这里GlobalScope.newCoroutineContext 在这中场景下,会替换成eventLoop-->
            newContext = GlobalScope.newCoroutineContext(context + eventLoop)
        } else {
            eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
                ?: ThreadLocalEventLoop.currentOrNull()
            newContext = GlobalScope.newCoroutineContext(context)
       }
       //BlockingCoroutine 顾名思义,阻塞的协程
        val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
        //开启
        coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
        //等待协程执行完成----------②
        return coroutine.joinBlocking()
   }

看一下反编译后的生成Java代码,携程block本地会被转化成SuspendLambda Function2 对象,这步依靠kotlin编译插件完成,kotlin最终还是在Java的肩膀上跳舞,没有任何超越java框架的东西:

image.png

而block会被转化成Function2 的实现类,封装了block的执行代码,runBlocking启动的协程代码快,当然也包含核心的suspend状态机,suspendlambda本质就是一个ContinuationImpl对象。

final class RunBlockingTestKt$main$1 extends  implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    int label;

    RunBlockingTestKt$main$1(Continuation<? super RunBlockingTestKt$main$1> continuation) {
        super(2, continuation);
    }

    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    public final Continuation<Unit> create(Object obj, Continuation<?> continuation) {
        return new RunBlockingTestKt$main$1(continuation);
    }

    public final Object invoke(CoroutineScope coroutineScope, Continuation<? super Unit> continuation) {
        return ((RunBlockingTestKt$main$1) create(coroutineScope, continuation)).invokeSuspend(Unit.INSTANCE);
    }

    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    public final Object invokeSuspend(Object obj) {
    <!--****,当然核心的就是状态机-->
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        int i = this.label;
        if (i == 0) {
            ResultKt.throwOnFailure(obj);
            this.label = 1;
            if (DelayKt.delay(500, this) == coroutine_suspended) {
                return coroutine_suspended;
            }
        } else if (i == 1) {
            ResultKt.throwOnFailure(obj);
        } else {
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        System.out.println((Object) Intrinsics.stringPlus("Current Thread ++ ", Thread.currentThread().getName()));
        return Unit.INSTANCE;
    }
}

kotlin库代码也会可被转化为Java方式,main中的调用实现在BuildersKt__BuildersKt中:

public final /* synthetic */ class BuildersKt__BuildersKt {
    public static /* synthetic */ Object runBlocking$default(CoroutineContext coroutineContext, Function2 function2, int i, Object obj) throws InterruptedException {
        if ((i & 1) != 0) {
            coroutineContext = EmptyCoroutineContext.INSTANCE;
        }
        return BuildersKt.runBlocking(coroutineContext, function2);
    }

    public static final <T> T runBlocking(CoroutineContext coroutineContext, Function2<? super CoroutineScope, ? super Continuation<? super T>, ? extends Object> function2) throws InterruptedException {
        CoroutineContext coroutineContext2;
        EventLoop eventLoop;
        <!--找到当前线程-->
        Thread currentThread = Thread.currentThread();
        <!--找当前continuationInterceptor EmptyCoroutineContext就是null -->
        ContinuationInterceptor continuationInterceptor = (ContinuationInterceptor) coroutineContext.get(ContinuationInterceptor.Key);
        <!--查找eventLoop 不存在就构建   new BlockingEventLoop(Thread.currentThread())  -->
        if (continuationInterceptor == null) {
        <!--这里的Loop其实就是BlockingEventLoop-->
            eventLoop = ThreadLocalEventLoop.INSTANCE.getEventLoop$kotlinx_coroutines_core();
            <!--构建coroutineContext2 coroutineContext.plus(eventLoop)这里就是EmptyCoroutineContext.INSTANCE -->
            coroutineContext2 = CoroutineContextKt.newCoroutineContext(GlobalScope.INSTANCE, coroutineContext.plus(eventLoop));
        } else {
            EventLoop eventLoop2 = null;
            EventLoop eventLoop3 = continuationInterceptor instanceof EventLoop ? (EventLoop) continuationInterceptor : null;
            if (eventLoop3 != null && eventLoop3.shouldBeProcessedFromContext()) {
                eventLoop2 = eventLoop3;
            }
            eventLoop = eventLoop2 == null ? ThreadLocalEventLoop.INSTANCE.currentOrNull$kotlinx_coroutines_core() : eventLoop2;
            coroutineContext2 = CoroutineContextKt.newCoroutineContext(GlobalScope.INSTANCE, coroutineContext);
        }
        <!--构建BlockingCoroutine-->
        BlockingCoroutine blockingCoroutine = new BlockingCoroutine(coroutineContext2, currentThread, eventLoop);
        <!--blockingCoroutine准备启动function2,将function2对象加入 function2就是个block的封装,利用CoroutineStart.DEFAULT启动->
        blockingCoroutine.start(CoroutineStart.DEFAULT, blockingCoroutine, function2);
        <!--等啊-->
        return (T) blockingCoroutine.joinBlocking();
    }
}

首先有个eventLoop 的概念,最终会落到BlockingEventLoop,总觉得会跟Looper的机制类似,继续往后看

public final class BlockingEventLoop extends EventLoopImplBase {
    private final Thread thread;

    @Override // kotlinx.coroutines.EventLoopImplPlatform
    protected Thread getThread() {
        return this.thread;
    }

    public BlockingEventLoop(Thread thread) {
        this.thread = thread;
    }
}

它继承了EventLoopImplBase-> EventLoopImplPlatform ->EventLoop->CoroutineDispatcher->AbstractCoroutineContextElement->CoroutineContext->CoroutineContext,所以它也是CoroutineContext,只不过封装了很多信息,了解下CoroutineContext的类结构,看看不同的类型,BlockingEventLoop相对是简单的:

image.png

接着看下coroutineContext2的构建

   public static final CoroutineContext newCoroutineContext(CoroutineScope $this$newCoroutineContext, CoroutineContext context) {
        CoroutineContext combined = foldCopies($this$newCoroutineContext.getCoroutineContext(), context, true);
        CoroutineContext debug = Debug.getDEBUG() ? combined.plus(new CoroutineId(Debug.getCOROUTINE_ID().incrementAndGet())) : combined;
        return (combined == Dispatchers.getDefault() || combined.get(ContinuationInterceptor.Key) != null) ? debug : debug.plus(Dispatchers.getDefault());
    }

之后传递给新构建的BlockingCoroutine,BlockingCoroutine三个参数parentContext【BlockingEventLoop】,blockedThread【检查】,eventLoop[为了joinBlocking]

public BlockingCoroutine(CoroutineContext parentContext, Thread blockedThread, EventLoop eventLoop) {
    super(parentContext, true, true);
    this.blockedThread = blockedThread;
    this.eventLoop = eventLoop;
}

之后启动它blockingCoroutine.start(CoroutineStart.DEFAULT, blockingCoroutine, function2)调用的是CoroutineStart.DEFAULT的invoke函数,最终调用CancellableKt的startCoroutineCancellable进行处理

public final class CancellableKt {
 
 public static final <R, T> void startCoroutineCancellable(Function2<? super R, ? super Continuation<? super T>, ? extends Object> function2, R r, Continuation<? super T> continuation, Function1<? super Throwable, Unit> function1) {
    try {
        Continuation intercepted = IntrinsicsKt.intercepted(IntrinsicsKt.createCoroutineUnintercepted(function2, r, continuation));
        Result.Companion companion = Result.Companion;
        DispatchedContinuationKt.resumeCancellableWith(intercepted, Result.m4764constructorimpl(Unit.INSTANCE), function1);
    } catch (Throwable e$iv) {
        dispatcherFailure(continuation, e$iv);
    }
}   

而在这里会调用之前Function的create构造 ,因为function1 是一个suspendlambda,所以他也是一个 BaseContinuationImpl

 public static final <T> Continuation<Unit> createCoroutineUnintercepted(Function1<? super Continuation<? super T>, ? extends Object> function1, Continuation<? super T> continuation) {
    C1725xa50de661 intrinsicsKt__IntrinsicsJvmKt$createCoroutineUnintercepted$$inlined$createCoroutineFromSuspendFunction$IntrinsicsKt__IntrinsicsJvmKt$2;
    Intrinsics.checkNotNullParameter(function1, "<this>");
    Intrinsics.checkNotNullParameter(continuation, "completion");
    Continuation probeCompletion = DebugProbes.probeCoroutineCreated(continuation);
    
    <!--runblocking会走入这个分支 -->
    if (function1 instanceof BaseContinuationImpl) {
        return ((BaseContinuationImpl) function1).create(probeCompletion);

而create其实就回到了最初的生成类,这里只构建,不执行,

image.png

后续调用看看是否有拦截器,如果有则返回封装的拦截器,否则封装一个关于intercepted,注意的是上述createCoroutineUnintercepted返回的BaseContinuationImpl,其内部用的context是BlockCorotine的context,而context其实是BlockingEventLoop, BlockingEventLoop存在intercepter

public CoroutineDispatcher() {
    super(ContinuationInterceptor.Key);
}

image.png

之后调用continuationImpl.intercepted继续封装处理

    public static final <T> Continuation<T> intercepted(Continuation<? super T> continuation) {
        Continuation<T> continuation2;
      <!--满足 ContinuationImpl-->
        ContinuationImpl continuationImpl = continuation instanceof ContinuationImpl ? (ContinuationImpl) continuation : null;
        <!--这里走的是continuation2 continuationImpl.intercepted()   -->
        return (continuationImpl == null || (continuation2 = (Continuation<T>) continuationImpl.intercepted()) == null) ? continuation : continuation2;
    }

而ContinuationImpl的intercepted 调用的是continuationInterceptor.interceptContinuation

public final Continuation<Object> intercepted() {
    ContinuationImpl it = this.intercepted;
    if (it == null) {
        ContinuationInterceptor continuationInterceptor = (ContinuationInterceptor) getContext().get(ContinuationInterceptor.Key);
        if (continuationInterceptor == null || (it = continuationInterceptor.interceptContinuation(this)) == null) {
            it = this;
        }
        this.intercepted = it;
    }
    return it;
}

而最终BlockingEventLoop是一个CoroutineDispatcher,它会构造一个DispatchedContinuation,传递给resumeCancellableWith

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)

最后调用DispatchedContinuationKt.resumeCancellableWith 处理,

     public static final <T> void resumeCancellableWith(Continuation<? super T> continuation, Object obj, Function1<? super Throwable, Unit> function1) {
        boolean z;
        UndispatchedCoroutine<?> undispatchedCoroutine;
        <!--因为是DispatchedContinuation,走派发的路径-->
        if (continuation instanceof DispatchedContinuation) {
            DispatchedContinuation dispatchedContinuation = (DispatchedContinuation) continuation;
            Object state = CompletionStateKt.toState(obj, function1);
            if (dispatchedContinuation.dispatcher.isDispatchNeeded(dispatchedContinuation.getContext())) {
                dispatchedContinuation._state = state;
                dispatchedContinuation.resumeMode = 1;
                dispatchedContinuation.dispatcher.dispatch(dispatchedContinuation.getContext(), dispatchedContinuation);
                return;

走派发的话,无非就是任务入队

image.png

同时入队的任务带着回调,其实也可以认为是之前Continue的封装,封装着整个协程block的逻辑,之后利用 coroutine.joinBlocking()会阻塞等待所有的任务完成才会结束,执行runBlocking后面的。 包括自己内部的直接协程,还有子协程,当然delay负责挂起协程,真正负责阻塞的是joinBlocking自身,而不是delay函数自身, delay只是将协程体挂起而已,等待被某个契机唤醒,runblock有自己的唤醒手段。

fun joinBlocking(): T {
    registerTimeLoopThread()
    try {
        eventLoop?.incrementUseCount()
        try {
            while (true) {
                @Suppress("DEPRECATION")
                 val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                // note: process next even may loose unpark flag, so check if completed before parking
                if (isCompleted) break
                parkNanos(this, parkNanos)
            }
        } finally { // paranoia
            eventLoop?.decrementUseCount()
        }
    } finally { // paranoia
        unregisterTimeLoopThread()
    }
    // now return result
    val state = this.state.unboxState()
    (state as? CompletedExceptionally)?.let { throw it.cause }
    return state as T

以上就是runblocking的简单分析,可以看到有队列的影子、有封装的影子、有阻塞的影子,并不是说协程不会阻塞,也会,只是写成可以认为比较方便的处理阻塞,在runblocking这个模型里,runblocking构建了一个Loop模型,将runblocking中的suspend任务挂在这个Loop队列中,然后循环执行完毕后才继续后面的任务,如果中间有子任务挂靠同一个Corontine,同样需要等待子任务,无论子任务是否跟当前写成同一个派发,但是如果不是当前的子协程,就不受限制

    runBlocking {
        println("Current Thread ++ " + Thread.currentThread().name)
        delay(500)
        <!--非子协程 不受限制-->
        CoroutineScope(Dispatchers.IO).launch(Dispatchers.Default) {
            delay(5000)
            println("Current Thread ++ " + Thread.currentThread().name)
        }
        println("Current Thread ++ " + Thread.currentThread().name)
    }


runBlocking {
    println("Current Thread ++ " + Thread.currentThread().name)
    delay(500)
    <!--子协程,受限制-->
    launch(Dispatchers.Default) {
        delay(5000)
        println("Current Thread ++ " + Thread.currentThread().name)
    }
    println("Current Thread ++ " + Thread.currentThread().name)
}

可以看到,runblocking的协程是单线程、单次的,所以不需要太多复杂的东西,队列 + 阻塞循环就可以了,有Loop跟队列的概念。如果其中掺杂await,runblocking不仅挂起当前协程,还可能会挂起当前线程

image.png

这个跟它使用BlockingCoroutine的joinBlocking有关系,未完成,就可以调用parkNanos挂起,这里如果是要用awarit主动等待子协程的完成的话,就会睡眠

                val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                // note: process next even may loose unpark flag, so check if completed before parking
                if (isCompleted) break
                parkNanos(this, parkNanos)

当然,如果制定派发器,那么会有个唤起,afterCompletion ,统一调用,会被其他线程唤醒。

override fun afterCompletion(state: Any?) {
    // wake up blocked thread
    if (Thread.currentThread() != blockedThread)
        unpark(blockedThread)
}

GlobalScope.launch 原理 [Dispatchers.Default]

CoroutineScope明示Corotine的范围与归属,ContextScope内部含有coroutineContext:明示Corotine的执行主体是谁,谁来处理。

fun main() {
    GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L)
        println("World!")
    }
    这里不阻塞,协程无法执行
    println("Hello,") // 主线程中的代码会立即执行
    
    Thread.sleep(2000L)
}

GlobalScope被废弃,容易内存泄漏,简单看下实现

public object GlobalScope : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

GlobalScope是个单例,实现了CoroutineScope接口,CoroutineScope.launch是CoroutineScope的扩展函数,

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

launch首先利用newCoroutineContext构建上下文,这里返回的默认是internal object DefaultScheduler : SchedulerCoroutineDispatcher【派发对象】,然后创建StandaloneCoroutine,不过这里需要注意的是通runblocking不同,StandaloneCoroutine在start之后,并没有joinBlocking这样的操作可以调用,所以可以预料,默认情况下,它只负责将任务提交上去,并不会在当前线程中等待执行完毕。 当然如若你需要等待可以换种写法,主动join。到这里有了StandaloneCoroutine、Context[DefaultScheduler],接着看start,启动器跟之前runblocking类似,没什么新的,产生分歧的地方在构建协程体的时候传递的Context不同,这里传递是Dispatchers.Default

image.png

当然,interceptContinuation获取的仍旧是DispatchedContinuation,后续的resume流程跟runblocking类似,走dispatch

image.png

不过这里没有joinBlocking,谁会负责执行呢?注意这里不是BlockingEventLoop,而是DefaultScheduler,从字面上看就具备执行能力,

image.png

可以看到,走到了与BlockingEventLoop不同的dispatch流程中去,BlockingEventLoop只是入队,而DefaultScheduler除了入队,还有线程池的动作,也从代码上解释了不同的Context背后不同派发的机制。最终任务会被加到一个全局的队列中去,与这个全局队列映射的有个Loop线程

image.png

简单看下DefaultScheduler的机制,它跟Java的线程池就很像了,加入队列后,通知线程池执行

image.png

到这里GlobalScope.launch的执行就结束了,其实还是Java线程池那一套,只不过还是将Block进行了封装,将后续需要执行的通过状态机封装成Function对象,在处理耗时任务的时候,封装了线程阻塞耗时操作。有线程池+Loop+队列的概念

image.png

可以看到 协程的基本框架是:loop+queue+线程 + block封装回调。

671677657685_.pic_副本.jpg

kotlin的delay原理 :delay的目的是挂起协程,当然也可能会伴随线程睡眠,但是睡眠往往不是delay导致的

kotlin的delay是针对协程的,目的是挂起协程,不是线程,delay的挂起时间是相对于协程自身的执行时间,而跟线程没关系,任何一个delay都是协程启程启后才有意义,一个同一个协程中,不会有多个delay并发,同一个线程中,也不会有多个协程同时执行,所以单线程中delay一定顺序执行,

image.png

从输出看出delay不会直接挂起线程,如果是的话,两者的输出时间就不应该一致,应该有先后执行顺序,delay的代码如下:

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
        <!--用到了context.delay-->
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}

所以可以猜想不同的context对于delay的处理应该会不同

image.png

在runblocking中,使用的是BlockingEventLoop的scheduleResumeAfterDelay

image.png

挂起点挂起协程

image.png

如果是在GlobleScope中调用呢?

image.png

context是Dispatch.default不是Delay,所以用默认delay

image.png

协程嵌套中的delay是怎么处理的

public static final Object delay(long j, Continuation<? super Unit> continuation) {
    if (j <= 0) {
        return Unit.INSTANCE;
    }
    CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted(continuation), 1);
    cancellableContinuationImpl.initCancellability();
    CancellableContinuationImpl cancellableContinuationImpl2 = cancellableContinuationImpl;
    if (j < Long.MAX_VALUE) {
        getDelay(cancellableContinuationImpl2.getContext()).scheduleResumeAfterDelay(j, cancellableContinuationImpl2);
    }
    Object result = cancellableContinuationImpl.getResult();
    if (result == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
        DebugProbes.probeCoroutineSuspended(continuation);
    }
    return result == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? result : Unit.INSTANCE;
}

构建CancellableContinuationImpl,调用cancellableContinuationImpl.getResult,这个第一次会返回IntrinsicsKt.getCOROUTINE_SUSPENDED,成为挂起态,

public final Object getResult() {
    Job job;
    boolean isReusable = isReusable();
    if (trySuspend()) {
        if (this.parentHandle == null) {
            installParentHandle();
        }
        if (isReusable) {
            releaseClaimedReusableContinuation();
        }
        return IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }

第一次执行trySuspend,会返回true,

private final boolean trySuspend() {
    do {
        int i = this._decision;
        if (i != 0) {
            if (i == 2) {
                return false;
            }
        }
    } while (!_decision$FU.compareAndSet(this, 0, 1));
    return true;
}

最终流程还是一样会回到EventLoopImplBase创建DelayedResumeTask,EventLoopImplBase 有两个_delayed + _queue

internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only only once
    private val _delayed = atomic<DelayedTaskQueue?>(null)

    private val _isCompleted = atomic(false)

631677635785_.pic_副本.jpg

processNextEvent 先处理延迟delay的, _queue,处理的时候存在不断的更新机制。

image.png

任何时候不可能处理两个DelayedResumeTask,只会处理一个 。EventLoopImplBase正常情况下会不断的执行当前Loop的所有协程,直到碰到delay,碰到delay后才会执行睡眠,如果有些协程在delay之前启动,那就会先执行之前的协程,只要有可执行的协程,就不可能睡眠,除非所有的协程都执行完了,只剩下delay的时候,才会书面这个时候会计算睡眠时间nextTime,统一从delayList中计算。

protected override val nextTime: Long
    get() {
        if (super.nextTime == 0L) return 0L
        val queue = _queue.value
        when {
            queue === null -> {} // empty queue -- proceed
            queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
            queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
            else -> return 0 // non-empty queue
        }
        val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
        return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
    }

睡眠唤醒之后,会继续执行之前delay后的任务,因为之前DelayResumeTask将回调对象塞进去了

image.png

在DelayResumeTask run的时候,会逐步回溯到原来协程对象,最终调用其invokeSuspend完成回调。

image.png

image.png

每个delay必定属于某个协程,只有协程启动,才会启动delay,所以只要_queue有任务,就不会执行delay导致的休眠,一直要等到没有正常任务了才会执行休眠,delay才是kotlin中线程睡眠的不二手段。将协程Task插入队列,执行,每个suspend函数都内含回调意思,在每个节点处理回调,同一个协程的block suspend调用必定是顺序的,suspend的意义是耗时跟挂起,本身没有一定会睡眠的意思,也就是说,在执行中如果发生delay,那一定还是需要回调唤起,如果是耗时操作,则还是会阻塞当前协程执行,如果不想阻塞协程继续执行,可以启动新协程,当然如果一定要等待返回,当前协程还是可能挂起的,甚至相应的线程会睡眠。

如果使用的lifecycleScope,它最终会接着HandlerContext的scheduleResumeAfterDelay调用,其实就是handler.postDelayed

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val block = Runnable {
        with(continuation) { resumeUndispatched(Unit) }
    }
    if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    } else {
        cancelOnRejection(continuation.context, block)
    }
}

suspend与唤起与线程睡眠[协程Task] async、withContext挂起操作

suspend与线程睡眠没有任何关系,协程的挂起并不一定导致线程的休眠,对于当前协程所处的线程,大部分时候其实都是被会被处理成回调,至于睡眠,往往是另一个技术的加持,只要当前协程上下文没有亲自处理耗时阻塞任务,就不会导致对应线程挂起,挂起线程大部分不是协程导致的。任何suspend调用都是一个新的回调入口,后续的操作都会被处理成回调后续的操作。

    lifecycleCoroutineScope.launch {
        var def = async {
            for (i in 1..1000 * 1000 * 100) {
                print("")
            }
        }
        //协程会挂起,不会阻塞 ,kotlin suspend函数,不会造成阻塞 不会Never
        def.await()

         def = CoroutineScope(Dispatchers.Default).async {
            for (i in 1..1000 * 1000 * 100) {
                print("")
            }
        }
        //协程会挂起,但是任务不是当前协程线程执行,当前线程不会阻塞
        def.await()
    }

正如上面的两种await,都会当值当前协程挂起,但是第一个会造成阻塞,第二个不会,即使都是回调处理,但是执行主体不同 。同样的使用还有withContext,看看有没有切换协程上下文,UI线程的耗时操作,必须切换到非UI线程的协程体重去,否则会阻塞UI线程,导致卡顿。

    lifecycleCoroutineScope.launch {

        //协程会挂起,并且任务是当前协程线程执行,当前线程会阻塞
        var reponse = withContext(this.coroutineContext) {
            for (i in 1..1000 * 1000 * 100) {
                print("")
            }
        }
        //协程会挂起,但是任务不是当前协程线程执行,当前线程不会阻塞
        reponse = withContext(Dispatchers.Default) {
            for (i in 1..1000 * 1000 * 100) {
                print("")
            }
        }
    }

其实直观上也好理解,耗时任务在哪个线程执行,会阻塞哪个线程,其他线程也没必要强等待,靠隐性实现的回调就可。在kotlin的协程框架中,await不等于阻塞,只是挂起,等待恢复,看看原理。

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

正常使用DeferredCoroutine,假设用的也是默认的Dispatchers.Default,看看其执行过程,start流程跟之前类似,依旧是加入到队列中,等待执行,如果没有await操作,其实完全就跟当前协程关系不大了,如果有await,会触发suspend类的封装。

image.png

可以看到是需要挂起的,等待回调,回调传入的还有父级协程的this,不过DeferredCoroutine是如何跟当前的节点衔接起来的呢?之前suspend是同一个协程上下文比如delay

public interface Deferred<T> extends Job {
    Object await(Continuation<? super T> continuation);

DeferredCoroutine传入的参数其实是外部协程,它封装成了Continuation对象,awaitSuspend

public final Object awaitSuspend(Continuation<Object> continuation) {
    AwaitContinuation cont = new AwaitContinuation(IntrinsicsKt.intercepted(continuation), this);
    cont.initCancellability();
    <!--这里应该是给相应的Corotine添加了回调-->
    CancellableContinuationKt.disposeOnCancellation(cont, invokeOnCompletion(new ResumeAwaitOnCompletion(cont)));
    Object result = cont.getResult();
    if (result == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
        DebugProbes.probeCoroutineSuspended(continuation);
    }
    return result;
}

invokeOnCompletion类似于添加回调,等待当前任务执行完毕的时候,回调参数里面的任务,参考写法如下,大概技术这个原理,这里其实就是之前DeferredCoroutine完成。

    launch {
        delay(2000)
    }.invokeOnCompletion { 
        println("回调")
    }

完后调用ResumeAwaitOnCompletion 的invoke 恢复执行,invokeOnCompletion是个典型的回调,它不会有新的协程出现,而是添加回调对象,

public final override fun invokeOnCompletion(
    onCancelling: Boolean,
    invokeImmediately: Boolean,
    handler: CompletionHandler
): DisposableHandle {
    // Create node upfront -- for common cases it just initializes JobNode.job field,
    // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
    val node: JobNode = makeNode(handler, onCancelling)
    loopOnState { state ->
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (state.isActive) {
                    // try move to SINGLE state
                    if (_state.compareAndSet(state, node)) return node
                } else
                    promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
            }

_state.compareAndSet(state, node) 这句话就是用来添加回调对象的 即 JobNode ,在该任务结束之后会处理尾部工作,调用state.invoke(cause)处理回调

image.png

// suppressed == true when any exceptions were suppressed while building the final completion cause
private fun completeStateFinalization(state: Incomplete, update: Any?) {
     
     <!--state-->
    if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
        try {
            state.invoke(cause)
        } catch (ex: Throwable) {
            handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
        }
    } else {
        state.list?.notifyCompletion(cause)
    }
}

之前的JobNode算是completion handler ,在这里处理回调,当然如果在await的时候,已经完成了,会直接处理其中的任务

  else -> { // is complete
                
                if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                return NonDisposableHandle
            }

关于线程的切换,在构建state这个回调对象的时候,会传递父协程体进去,而且生成的JobNode并不相同,依靠这个区分invoke,是否直接唤起,还是重新走派发

image.png

再次走到Task的dispatch中,派发到JobNode对应的context上下文,将任务插入相应Queue,其实还是Loop+queue那套

image.png

最后回到调用await之后的协程线程

image.png

可以看到协程线程的切换还是要依靠协程那套context Loop queue的模型,在一个协程结束的时候,通过回调往另一个Loop Queue中插入消息。

对于withcontext,它是直接在构造协程Block的时候,将外部协程回调传递进去,在结束的时候利用 completion.resumeWith(outcome);

重新换气外部协程体状态机,

    public final Object invokeSuspend(Object $result) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                this.label = 1;
                if (BuildersKt.withContext(Dispatchers.getDefault(), new 
                <!--这里设置回调-->
                CoroutinesTestModule$getNetInfo$1$reponse$1(null), this) != coroutine_suspended) {
                    break;
                } else {
                    return coroutine_suspended;
                }
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        System.out.println((Object) Intrinsics.stringPlus(LiveLiterals$CoroutinesTestModuleKt.INSTANCE.m3718x9ea3ba64(), Unit.INSTANCE));
        return Unit.INSTANCE;
    }
}

lifecycleScope.launch 中的suspend

通过上述的分析,我们知道协程的执行还是Loop线程+Queue,那么Android的UI线程怎么做,它本身已经是一个Loop线程了,协程会被抽象成Task或者数说runnable,既然UI线程已经有了Loop跟Queue,那直接提供给协程机制用就可以了,我们知道,核心是context对应的dispatcher,那么lifecycleScope对应的是谁呢?看一下两个扩展属性:

public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope

 
 public val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }

可以看到是SupervisorJob() + Dispatchers.Main.immediate,这里有个 +重载,最终落到

public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                // We are explicitly using the
                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            }
            @Suppress("ConstantConditionIf")
            factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }
}

Android平台上

internal class AndroidDispatcherFactory : MainDispatcherFactory {

override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
    val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
    return HandlerContext(mainLooper.asHandler(async = true))
}

override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

override val loadPriority: Int
    get() = Int.MAX_VALUE / 2
}

最终是HandlerContext,可以看到它内含Handler,这两个是其他线程与UI线程切换的关键,因为UI线程的协程都能拿到HandlerContext,也就能被其他协程通过invokeOnCompletion回调的方式发射回到main线程

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
    return !invokeImmediately || Looper.myLooper() != handler.looper
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
    if (!handler.post(block)) {
        cancelOnRejection(context, block)
    }
}

可以看到handler.post发射block。具体不再过多分析。总之还是一个Loop+一个Queue,对应相应的context 【Dispatcher】, withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T)函数也是同样道理它主要做的就是上下文切换,withContext发起的协程在参数context限定的上下文执行,然后切回原来的congtext,withContext是有返回值的,其实跟asyn await类似,只不过做了进一步的封装。

image.png

Java自己实现协程类的做法??

kotlin的协程看起来是个新概念,其核心实现还是Java的封装,它好像干掉了回调,但是其实不过是封装成了阻塞,但是Java中如果是阻塞,则必定会牵扯Task及线程,由于Java不是函数式语言,任务也必须封装成对象,所以使用起来有些繁琐,但是如果加以对比,其实发现Java也可以做到:

<!--Java框架中的实现-->
fun javaCoroutines() {
    val executor: ExecutorService = Executors.newCachedThreadPool()
    <!--类似于启动协程-->
    executor.execute {
        println(
        <!--相比而言多了一层,因为需要获取一个可以阻塞等待的对象-->
        executor.submit(Callable {
            println("子线程在进行计算");
            Thread.sleep(2000)
            "结果" + System.currentTimeMillis()
        }).get()
        )
    }
}

如果将协程函数看做是一个Task对象其实就很好跟Java对应起来,

	suspend fun task() {
    println("子线程在进行计算");
    delay(2000)
    "结果" + System.currentTimeMillis()
}
<!--kotlin实现-->
fun kotlinCoroutines() {
<!--kotlin的协程框架将上述Java的工作进行了封装,所以写起来简化-->
    runBlocking(Dispatchers.IO) {
        val result = task()
        println(result)
    }
}  

通过对比其实可以看出:kotlin的协程在用法上,其实就是更好的Task封装,或者说少了一层,Java没法直接传递函数,只能用Runable封装一下。一个更有效的解释就是doAsync,类似一个Java跟kotlin之间的缓冲:

suspend与挂起的关系【suspend只会挂起协程,不会挂起线程】

默认情况自,自定义的函数是没必要添加suspend参数的,除非有suspend的可能比如内部调用的await,withcontext等协程框架中挂起的函数,如果加了会有提示冗余

image.png

所以只有挂起能力的才会添加suspend,换句话说,一定会变相调用kotlin库里的某个suspend函数,相应的内部函数一定存在自己的封装,牵扯到回调添加,任务的重新唤起插入等,如await,suspend只会挂起协程,不会挂起线程。

suspendCancellableCoroutine:将异步回调转换为不通调用,同时添加取消,携程被取消的时候,可以添加回调

suspend fun fetchDataCancellable(): String = suspendCancellableCoroutine { cont ->
    //  开启异步,处理,并且,在结束后唤醒
    try {
        getNetWorkData(object : OnCallBack {
            override fun onSuccess() {
                cont.resumeWith(Result.success("success"))
            }

            override fun onFail() {
                cont.resumeWith(Result.failure(Exception("fail")))
            }
        })
    } catch (e: Exception) {
        cont.resumeWithException(e)
    }

    // 添加取消回调,resumeWith 不会再回掉
    cont.invokeOnCancellation {

        println("Fetch data operation was cancelled")
    }
}

使用 :

   	  xx.launch{
         ret = fetchDataCancellable()
         if(ret == ""){
         
         }
    }

上述代码 将getNetWorkData 从异步改为协程同步,当然,其实协程的本质还是异步。只是借助写法,编译工具提升代码编译的效率。

总结

  • launch后的每个协程Block都会被抽象是一个Function+suspendlambda协程Task对象,该协程中的每个suspend调用都是一个label分支,用于调用
  • 协程的提交不会挂起,协程的执行可能挂起「如果中间有挂起函数」【好比是任务入队】
  • launch是提交任务的过程,suspend调用是添加回调或者说另一个切换提交的过程
  • 协程的执行可能会发生挂起或者阻塞线程
  • Delay不一定立刻挂起,但是有很大可能会挂起线程【存在睡眠唤醒】
  • 耗时操作只要不是在当前协程上下文执行,就不会造成当前协程线程的阻塞【会被转化成回调】
  • kotlin协程最大的辅助机制是高阶函数,简化回调传递,运行核心还是将Block或者函数体抽象成对象
  • suspend才只会挂起协程,不会阻塞线程 suspend全是回调
  • 协程的返回值不是返回值,是回调入口

Search

    Table of Contents