前言 :运行模型Loop + Queue[绑定协程context上下文 派发器]
Kotlin协程不是线程,更像是一个封装框架,实现还是离不开线程,如果你查看编译后的代码,可以发现Kotlin完全是在Java的框架里玩转,没有任何新的技术加入,所以核心就是封装,协程最大的作用是写法上,它只忠于开发人员,协程将耗时任务的回调写法转变成了同步的写法,或者说:简化了阻塞任务写法。 [本文辅助工具jadx]
suspend!=阻塞 = 回调封装
CoroutineSingletons.COROUTINE_SUSPENDED是不同的调用比如delay、await、withcontext等自己实现的返回。
kotlin runBlocking执行原理[BlockingEventLoop]
runBlocking的执行模型是先将Block封装成suspend提交,然后阻塞等待协程执行完毕。runBlocking 的官方解释是:
Runs a new coroutine and blocks the current thread interruptibly until its completion. This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.
大意是:在当前线程启动一个可中断的协程,runBlocking会保证协程中的任务完成后才返回,不过runBlocking一般是用来测试代码的,不应该在正常的编码中使用。runBlocking默认使用 CoroutineDispatcher使用模型是一个Loop,也可以选择其他,先看看简单使用:
fun main() {
runBlocking {
delay(500)
println("Current Thread ++ " + Thread.currentThread().name)
}
}
runBlocking的实现在Builders.kt中
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
//当前线程
val currentThread = Thread.currentThread()
//先看有没有拦截器 companion object Key : CoroutineContext.Key<ContinuationInterceptor> Key是个单利,它是个单利,每个进程肯定就只有一个对象,ContinuationInterceptor就是个单利对象
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
//----------①
if (contextInterceptor == null) {
//不特别指定的话没有拦截器,使用loop构建Context
<!--默认是当前线程的,没有loop构建loop-->
eventLoop = ThreadLocalEventLoop.eventLoop
<!--这里GlobalScope.newCoroutineContext 在这中场景下,会替换成eventLoop-->
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
//BlockingCoroutine 顾名思义,阻塞的协程
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
//开启
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
//等待协程执行完成----------②
return coroutine.joinBlocking()
}
看一下反编译后的生成Java代码,携程block本地会被转化成SuspendLambda Function2 对象,这步依靠kotlin编译插件完成,kotlin最终还是在Java的肩膀上跳舞,没有任何超越java框架的东西:
而block会被转化成Function2 的实现类,封装了block的执行代码,runBlocking启动的协程代码快,当然也包含核心的suspend状态机,suspendlambda本质就是一个ContinuationImpl对象。
final class RunBlockingTestKt$main$1 extends implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
int label;
RunBlockingTestKt$main$1(Continuation<? super RunBlockingTestKt$main$1> continuation) {
super(2, continuation);
}
@Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
public final Continuation<Unit> create(Object obj, Continuation<?> continuation) {
return new RunBlockingTestKt$main$1(continuation);
}
public final Object invoke(CoroutineScope coroutineScope, Continuation<? super Unit> continuation) {
return ((RunBlockingTestKt$main$1) create(coroutineScope, continuation)).invokeSuspend(Unit.INSTANCE);
}
@Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
public final Object invokeSuspend(Object obj) {
<!--****,当然核心的就是状态机-->
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
int i = this.label;
if (i == 0) {
ResultKt.throwOnFailure(obj);
this.label = 1;
if (DelayKt.delay(500, this) == coroutine_suspended) {
return coroutine_suspended;
}
} else if (i == 1) {
ResultKt.throwOnFailure(obj);
} else {
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
System.out.println((Object) Intrinsics.stringPlus("Current Thread ++ ", Thread.currentThread().getName()));
return Unit.INSTANCE;
}
}
kotlin库代码也会可被转化为Java方式,main中的调用实现在BuildersKt__BuildersKt中:
public final /* synthetic */ class BuildersKt__BuildersKt {
public static /* synthetic */ Object runBlocking$default(CoroutineContext coroutineContext, Function2 function2, int i, Object obj) throws InterruptedException {
if ((i & 1) != 0) {
coroutineContext = EmptyCoroutineContext.INSTANCE;
}
return BuildersKt.runBlocking(coroutineContext, function2);
}
public static final <T> T runBlocking(CoroutineContext coroutineContext, Function2<? super CoroutineScope, ? super Continuation<? super T>, ? extends Object> function2) throws InterruptedException {
CoroutineContext coroutineContext2;
EventLoop eventLoop;
<!--找到当前线程-->
Thread currentThread = Thread.currentThread();
<!--找当前continuationInterceptor EmptyCoroutineContext就是null -->
ContinuationInterceptor continuationInterceptor = (ContinuationInterceptor) coroutineContext.get(ContinuationInterceptor.Key);
<!--查找eventLoop 不存在就构建 new BlockingEventLoop(Thread.currentThread()) -->
if (continuationInterceptor == null) {
<!--这里的Loop其实就是BlockingEventLoop-->
eventLoop = ThreadLocalEventLoop.INSTANCE.getEventLoop$kotlinx_coroutines_core();
<!--构建coroutineContext2 coroutineContext.plus(eventLoop)这里就是EmptyCoroutineContext.INSTANCE -->
coroutineContext2 = CoroutineContextKt.newCoroutineContext(GlobalScope.INSTANCE, coroutineContext.plus(eventLoop));
} else {
EventLoop eventLoop2 = null;
EventLoop eventLoop3 = continuationInterceptor instanceof EventLoop ? (EventLoop) continuationInterceptor : null;
if (eventLoop3 != null && eventLoop3.shouldBeProcessedFromContext()) {
eventLoop2 = eventLoop3;
}
eventLoop = eventLoop2 == null ? ThreadLocalEventLoop.INSTANCE.currentOrNull$kotlinx_coroutines_core() : eventLoop2;
coroutineContext2 = CoroutineContextKt.newCoroutineContext(GlobalScope.INSTANCE, coroutineContext);
}
<!--构建BlockingCoroutine-->
BlockingCoroutine blockingCoroutine = new BlockingCoroutine(coroutineContext2, currentThread, eventLoop);
<!--blockingCoroutine准备启动function2,将function2对象加入 function2就是个block的封装,利用CoroutineStart.DEFAULT启动->
blockingCoroutine.start(CoroutineStart.DEFAULT, blockingCoroutine, function2);
<!--等啊-->
return (T) blockingCoroutine.joinBlocking();
}
}
首先有个eventLoop 的概念,最终会落到BlockingEventLoop,总觉得会跟Looper的机制类似,继续往后看
public final class BlockingEventLoop extends EventLoopImplBase {
private final Thread thread;
@Override // kotlinx.coroutines.EventLoopImplPlatform
protected Thread getThread() {
return this.thread;
}
public BlockingEventLoop(Thread thread) {
this.thread = thread;
}
}
它继承了EventLoopImplBase-> EventLoopImplPlatform ->EventLoop->CoroutineDispatcher->AbstractCoroutineContextElement->CoroutineContext->CoroutineContext,所以它也是CoroutineContext,只不过封装了很多信息,了解下CoroutineContext的类结构,看看不同的类型,BlockingEventLoop相对是简单的:
接着看下coroutineContext2的构建
public static final CoroutineContext newCoroutineContext(CoroutineScope $this$newCoroutineContext, CoroutineContext context) {
CoroutineContext combined = foldCopies($this$newCoroutineContext.getCoroutineContext(), context, true);
CoroutineContext debug = Debug.getDEBUG() ? combined.plus(new CoroutineId(Debug.getCOROUTINE_ID().incrementAndGet())) : combined;
return (combined == Dispatchers.getDefault() || combined.get(ContinuationInterceptor.Key) != null) ? debug : debug.plus(Dispatchers.getDefault());
}
之后传递给新构建的BlockingCoroutine,BlockingCoroutine三个参数parentContext【BlockingEventLoop】,blockedThread【检查】,eventLoop[为了joinBlocking]
public BlockingCoroutine(CoroutineContext parentContext, Thread blockedThread, EventLoop eventLoop) {
super(parentContext, true, true);
this.blockedThread = blockedThread;
this.eventLoop = eventLoop;
}
之后启动它blockingCoroutine.start(CoroutineStart.DEFAULT, blockingCoroutine, function2)调用的是CoroutineStart.DEFAULT的invoke函数,最终调用CancellableKt的startCoroutineCancellable进行处理
public final class CancellableKt {
public static final <R, T> void startCoroutineCancellable(Function2<? super R, ? super Continuation<? super T>, ? extends Object> function2, R r, Continuation<? super T> continuation, Function1<? super Throwable, Unit> function1) {
try {
Continuation intercepted = IntrinsicsKt.intercepted(IntrinsicsKt.createCoroutineUnintercepted(function2, r, continuation));
Result.Companion companion = Result.Companion;
DispatchedContinuationKt.resumeCancellableWith(intercepted, Result.m4764constructorimpl(Unit.INSTANCE), function1);
} catch (Throwable e$iv) {
dispatcherFailure(continuation, e$iv);
}
}
而在这里会调用之前Function的create构造 ,因为function1 是一个suspendlambda,所以他也是一个 BaseContinuationImpl
public static final <T> Continuation<Unit> createCoroutineUnintercepted(Function1<? super Continuation<? super T>, ? extends Object> function1, Continuation<? super T> continuation) {
C1725xa50de661 intrinsicsKt__IntrinsicsJvmKt$createCoroutineUnintercepted$$inlined$createCoroutineFromSuspendFunction$IntrinsicsKt__IntrinsicsJvmKt$2;
Intrinsics.checkNotNullParameter(function1, "<this>");
Intrinsics.checkNotNullParameter(continuation, "completion");
Continuation probeCompletion = DebugProbes.probeCoroutineCreated(continuation);
<!--runblocking会走入这个分支 -->
if (function1 instanceof BaseContinuationImpl) {
return ((BaseContinuationImpl) function1).create(probeCompletion);
而create其实就回到了最初的生成类,这里只构建,不执行,
后续调用看看是否有拦截器,如果有则返回封装的拦截器,否则封装一个关于intercepted,注意的是上述createCoroutineUnintercepted返回的BaseContinuationImpl,其内部用的context是BlockCorotine的context,而context其实是BlockingEventLoop, BlockingEventLoop存在intercepter
public CoroutineDispatcher() {
super(ContinuationInterceptor.Key);
}
之后调用continuationImpl.intercepted继续封装处理
public static final <T> Continuation<T> intercepted(Continuation<? super T> continuation) {
Continuation<T> continuation2;
<!--满足 ContinuationImpl-->
ContinuationImpl continuationImpl = continuation instanceof ContinuationImpl ? (ContinuationImpl) continuation : null;
<!--这里走的是continuation2 continuationImpl.intercepted() -->
return (continuationImpl == null || (continuation2 = (Continuation<T>) continuationImpl.intercepted()) == null) ? continuation : continuation2;
}
而ContinuationImpl的intercepted 调用的是continuationInterceptor.interceptContinuation
public final Continuation<Object> intercepted() {
ContinuationImpl it = this.intercepted;
if (it == null) {
ContinuationInterceptor continuationInterceptor = (ContinuationInterceptor) getContext().get(ContinuationInterceptor.Key);
if (continuationInterceptor == null || (it = continuationInterceptor.interceptContinuation(this)) == null) {
it = this;
}
this.intercepted = it;
}
return it;
}
而最终BlockingEventLoop是一个CoroutineDispatcher,它会构造一个DispatchedContinuation,传递给resumeCancellableWith
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
最后调用DispatchedContinuationKt.resumeCancellableWith 处理,
public static final <T> void resumeCancellableWith(Continuation<? super T> continuation, Object obj, Function1<? super Throwable, Unit> function1) {
boolean z;
UndispatchedCoroutine<?> undispatchedCoroutine;
<!--因为是DispatchedContinuation,走派发的路径-->
if (continuation instanceof DispatchedContinuation) {
DispatchedContinuation dispatchedContinuation = (DispatchedContinuation) continuation;
Object state = CompletionStateKt.toState(obj, function1);
if (dispatchedContinuation.dispatcher.isDispatchNeeded(dispatchedContinuation.getContext())) {
dispatchedContinuation._state = state;
dispatchedContinuation.resumeMode = 1;
dispatchedContinuation.dispatcher.dispatch(dispatchedContinuation.getContext(), dispatchedContinuation);
return;
走派发的话,无非就是任务入队
同时入队的任务带着回调,其实也可以认为是之前Continue的封装,封装着整个协程block的逻辑,之后利用 coroutine.joinBlocking()会阻塞等待所有的任务完成才会结束,执行runBlocking后面的。 包括自己内部的直接协程,还有子协程,当然delay负责挂起协程,真正负责阻塞的是joinBlocking自身,而不是delay函数自身, delay只是将协程体挂起而已,等待被某个契机唤醒,runblock有自己的唤醒手段。
fun joinBlocking(): T {
registerTimeLoopThread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
@Suppress("DEPRECATION")
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
parkNanos(this, parkNanos)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
// now return result
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
以上就是runblocking的简单分析,可以看到有队列的影子、有封装的影子、有阻塞的影子,并不是说协程不会阻塞,也会,只是写成可以认为比较方便的处理阻塞,在runblocking这个模型里,runblocking构建了一个Loop模型,将runblocking中的suspend任务挂在这个Loop队列中,然后循环执行完毕后才继续后面的任务,如果中间有子任务挂靠同一个Corontine,同样需要等待子任务,无论子任务是否跟当前写成同一个派发,但是如果不是当前的子协程,就不受限制
runBlocking {
println("Current Thread ++ " + Thread.currentThread().name)
delay(500)
<!--非子协程 不受限制-->
CoroutineScope(Dispatchers.IO).launch(Dispatchers.Default) {
delay(5000)
println("Current Thread ++ " + Thread.currentThread().name)
}
println("Current Thread ++ " + Thread.currentThread().name)
}
runBlocking {
println("Current Thread ++ " + Thread.currentThread().name)
delay(500)
<!--子协程,受限制-->
launch(Dispatchers.Default) {
delay(5000)
println("Current Thread ++ " + Thread.currentThread().name)
}
println("Current Thread ++ " + Thread.currentThread().name)
}
可以看到,runblocking的协程是单线程、单次的,所以不需要太多复杂的东西,队列 + 阻塞循环就可以了,有Loop跟队列的概念。如果其中掺杂await,runblocking不仅挂起当前协程,还可能会挂起当前线程
这个跟它使用BlockingCoroutine的joinBlocking有关系,未完成,就可以调用parkNanos挂起,这里如果是要用awarit主动等待子协程的完成的话,就会睡眠
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
parkNanos(this, parkNanos)
当然,如果制定派发器,那么会有个唤起,afterCompletion ,统一调用,会被其他线程唤醒。
override fun afterCompletion(state: Any?) {
// wake up blocked thread
if (Thread.currentThread() != blockedThread)
unpark(blockedThread)
}
GlobalScope.launch 原理 [Dispatchers.Default]
CoroutineScope明示Corotine的范围与归属,ContextScope内部含有coroutineContext:明示Corotine的执行主体是谁,谁来处理。
fun main() {
GlobalScope.launch { // 在后台启动一个新的协程并继续
delay(1000L)
println("World!")
}
这里不阻塞,协程无法执行
println("Hello,") // 主线程中的代码会立即执行
Thread.sleep(2000L)
}
GlobalScope被废弃,容易内存泄漏,简单看下实现
public object GlobalScope : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
GlobalScope是个单例,实现了CoroutineScope接口,CoroutineScope.launch是CoroutineScope的扩展函数,
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
launch首先利用newCoroutineContext构建上下文,这里返回的默认是internal object DefaultScheduler : SchedulerCoroutineDispatcher【派发对象】,然后创建StandaloneCoroutine,不过这里需要注意的是通runblocking不同,StandaloneCoroutine在start之后,并没有joinBlocking这样的操作可以调用,所以可以预料,默认情况下,它只负责将任务提交上去,并不会在当前线程中等待执行完毕。 当然如若你需要等待可以换种写法,主动join。到这里有了StandaloneCoroutine、Context[DefaultScheduler],接着看start,启动器跟之前runblocking类似,没什么新的,产生分歧的地方在构建协程体的时候传递的Context不同,这里传递是Dispatchers.Default
当然,interceptContinuation获取的仍旧是DispatchedContinuation,后续的resume流程跟runblocking类似,走dispatch
不过这里没有joinBlocking,谁会负责执行呢?注意这里不是BlockingEventLoop,而是DefaultScheduler,从字面上看就具备执行能力,
可以看到,走到了与BlockingEventLoop不同的dispatch流程中去,BlockingEventLoop只是入队,而DefaultScheduler除了入队,还有线程池的动作,也从代码上解释了不同的Context背后不同派发的机制。最终任务会被加到一个全局的队列中去,与这个全局队列映射的有个Loop线程
简单看下DefaultScheduler的机制,它跟Java的线程池就很像了,加入队列后,通知线程池执行
到这里GlobalScope.launch的执行就结束了,其实还是Java线程池那一套,只不过还是将Block进行了封装,将后续需要执行的通过状态机封装成Function对象,在处理耗时任务的时候,封装了线程阻塞耗时操作。有线程池+Loop+队列的概念。
可以看到 协程的基本框架是:loop+queue+线程 + block封装回调。
kotlin的delay原理 :delay的目的是挂起协程,当然也可能会伴随线程睡眠,但是睡眠往往不是delay导致的
kotlin的delay是针对协程的,目的是挂起协程,不是线程,delay的挂起时间是相对于协程自身的执行时间,而跟线程没关系,任何一个delay都是协程启程启后才有意义,一个同一个协程中,不会有多个delay并发,同一个线程中,也不会有多个协程同时执行,所以单线程中delay一定顺序执行,
从输出看出delay不会直接挂起线程,如果是的话,两者的输出时间就不应该一致,应该有先后执行顺序,delay的代码如下:
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
<!--用到了context.delay-->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
所以可以猜想不同的context对于delay的处理应该会不同
在runblocking中,使用的是BlockingEventLoop的scheduleResumeAfterDelay
挂起点挂起协程
如果是在GlobleScope中调用呢?
context是Dispatch.default不是Delay,所以用默认delay
协程嵌套中的delay是怎么处理的
public static final Object delay(long j, Continuation<? super Unit> continuation) {
if (j <= 0) {
return Unit.INSTANCE;
}
CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted(continuation), 1);
cancellableContinuationImpl.initCancellability();
CancellableContinuationImpl cancellableContinuationImpl2 = cancellableContinuationImpl;
if (j < Long.MAX_VALUE) {
getDelay(cancellableContinuationImpl2.getContext()).scheduleResumeAfterDelay(j, cancellableContinuationImpl2);
}
Object result = cancellableContinuationImpl.getResult();
if (result == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbes.probeCoroutineSuspended(continuation);
}
return result == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? result : Unit.INSTANCE;
}
构建CancellableContinuationImpl,调用cancellableContinuationImpl.getResult,这个第一次会返回IntrinsicsKt.getCOROUTINE_SUSPENDED,成为挂起态,
public final Object getResult() {
Job job;
boolean isReusable = isReusable();
if (trySuspend()) {
if (this.parentHandle == null) {
installParentHandle();
}
if (isReusable) {
releaseClaimedReusableContinuation();
}
return IntrinsicsKt.getCOROUTINE_SUSPENDED();
}
第一次执行trySuspend,会返回true,
private final boolean trySuspend() {
do {
int i = this._decision;
if (i != 0) {
if (i == 2) {
return false;
}
}
} while (!_decision$FU.compareAndSet(this, 0, 1));
return true;
}
最终流程还是一样会回到EventLoopImplBase创建DelayedResumeTask,EventLoopImplBase 有两个_delayed + _queue
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// null | CLOSED_EMPTY | task | Queue<Runnable>
private val _queue = atomic<Any?>(null)
// Allocated only only once
private val _delayed = atomic<DelayedTaskQueue?>(null)
private val _isCompleted = atomic(false)
processNextEvent 先处理延迟delay的, _queue,处理的时候存在不断的更新机制。
任何时候不可能处理两个DelayedResumeTask,只会处理一个 。EventLoopImplBase正常情况下会不断的执行当前Loop的所有协程,直到碰到delay,碰到delay后才会执行睡眠,如果有些协程在delay之前启动,那就会先执行之前的协程,只要有可执行的协程,就不可能睡眠,除非所有的协程都执行完了,只剩下delay的时候,才会书面这个时候会计算睡眠时间nextTime,统一从delayList中计算。
protected override val nextTime: Long
get() {
if (super.nextTime == 0L) return 0L
val queue = _queue.value
when {
queue === null -> {} // empty queue -- proceed
queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
else -> return 0 // non-empty queue
}
val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
}
睡眠唤醒之后,会继续执行之前delay后的任务,因为之前DelayResumeTask将回调对象塞进去了
在DelayResumeTask run的时候,会逐步回溯到原来协程对象,最终调用其invokeSuspend完成回调。
每个delay必定属于某个协程,只有协程启动,才会启动delay,所以只要_queue有任务,就不会执行delay导致的休眠,一直要等到没有正常任务了才会执行休眠,delay才是kotlin中线程睡眠的不二手段。将协程Task插入队列,执行,每个suspend函数都内含回调意思,在每个节点处理回调,同一个协程的block suspend调用必定是顺序的,suspend的意义是耗时跟挂起,本身没有一定会睡眠的意思,也就是说,在执行中如果发生delay,那一定还是需要回调唤起,如果是耗时操作,则还是会阻塞当前协程执行,如果不想阻塞协程继续执行,可以启动新协程,当然如果一定要等待返回,当前协程还是可能挂起的,甚至相应的线程会睡眠。
如果使用的lifecycleScope,它最终会接着HandlerContext的scheduleResumeAfterDelay调用,其实就是handler.postDelayed
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
} else {
cancelOnRejection(continuation.context, block)
}
}
suspend与唤起与线程睡眠[协程Task] async、withContext挂起操作
suspend与线程睡眠没有任何关系,协程的挂起并不一定导致线程的休眠,对于当前协程所处的线程,大部分时候其实都是被会被处理成回调,至于睡眠,往往是另一个技术的加持,只要当前协程上下文没有亲自处理耗时阻塞任务,就不会导致对应线程挂起,挂起线程大部分不是协程导致的。任何suspend调用都是一个新的回调入口,后续的操作都会被处理成回调后续的操作。
lifecycleCoroutineScope.launch {
var def = async {
for (i in 1..1000 * 1000 * 100) {
print("")
}
}
//协程会挂起,不会阻塞 ,kotlin suspend函数,不会造成阻塞 不会Never
def.await()
def = CoroutineScope(Dispatchers.Default).async {
for (i in 1..1000 * 1000 * 100) {
print("")
}
}
//协程会挂起,但是任务不是当前协程线程执行,当前线程不会阻塞
def.await()
}
正如上面的两种await,都会当值当前协程挂起,但是第一个会造成阻塞,第二个不会,即使都是回调处理,但是执行主体不同 。同样的使用还有withContext,看看有没有切换协程上下文,UI线程的耗时操作,必须切换到非UI线程的协程体重去,否则会阻塞UI线程,导致卡顿。
lifecycleCoroutineScope.launch {
//协程会挂起,并且任务是当前协程线程执行,当前线程会阻塞
var reponse = withContext(this.coroutineContext) {
for (i in 1..1000 * 1000 * 100) {
print("")
}
}
//协程会挂起,但是任务不是当前协程线程执行,当前线程不会阻塞
reponse = withContext(Dispatchers.Default) {
for (i in 1..1000 * 1000 * 100) {
print("")
}
}
}
其实直观上也好理解,耗时任务在哪个线程执行,会阻塞哪个线程,其他线程也没必要强等待,靠隐性实现的回调就可。在kotlin的协程框架中,await不等于阻塞,只是挂起,等待恢复,看看原理。
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
正常使用DeferredCoroutine,假设用的也是默认的Dispatchers.Default,看看其执行过程,start流程跟之前类似,依旧是加入到队列中,等待执行,如果没有await操作,其实完全就跟当前协程关系不大了,如果有await,会触发suspend类的封装。
可以看到是需要挂起的,等待回调,回调传入的还有父级协程的this,不过DeferredCoroutine是如何跟当前的节点衔接起来的呢?之前suspend是同一个协程上下文比如delay
public interface Deferred<T> extends Job {
Object await(Continuation<? super T> continuation);
DeferredCoroutine传入的参数其实是外部协程,它封装成了Continuation对象,awaitSuspend
public final Object awaitSuspend(Continuation<Object> continuation) {
AwaitContinuation cont = new AwaitContinuation(IntrinsicsKt.intercepted(continuation), this);
cont.initCancellability();
<!--这里应该是给相应的Corotine添加了回调-->
CancellableContinuationKt.disposeOnCancellation(cont, invokeOnCompletion(new ResumeAwaitOnCompletion(cont)));
Object result = cont.getResult();
if (result == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbes.probeCoroutineSuspended(continuation);
}
return result;
}
invokeOnCompletion类似于添加回调,等待当前任务执行完毕的时候,回调参数里面的任务,参考写法如下,大概技术这个原理,这里其实就是之前DeferredCoroutine完成。
launch {
delay(2000)
}.invokeOnCompletion {
println("回调")
}
完后调用ResumeAwaitOnCompletion 的invoke 恢复执行,invokeOnCompletion是个典型的回调,它不会有新的协程出现,而是添加回调对象,
public final override fun invokeOnCompletion(
onCancelling: Boolean,
invokeImmediately: Boolean,
handler: CompletionHandler
): DisposableHandle {
// Create node upfront -- for common cases it just initializes JobNode.job field,
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
val node: JobNode = makeNode(handler, onCancelling)
loopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try move to SINGLE state
if (_state.compareAndSet(state, node)) return node
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
_state.compareAndSet(state, node) 这句话就是用来添加回调对象的 即 JobNode ,在该任务结束之后会处理尾部工作,调用state.invoke(cause)处理回调
// suppressed == true when any exceptions were suppressed while building the final completion cause
private fun completeStateFinalization(state: Incomplete, update: Any?) {
<!--state-->
if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
try {
state.invoke(cause)
} catch (ex: Throwable) {
handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
}
} else {
state.list?.notifyCompletion(cause)
}
}
之前的JobNode算是completion handler ,在这里处理回调,当然如果在await的时候,已经完成了,会直接处理其中的任务
else -> { // is complete
if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
return NonDisposableHandle
}
关于线程的切换,在构建state这个回调对象的时候,会传递父协程体进去,而且生成的JobNode并不相同,依靠这个区分invoke,是否直接唤起,还是重新走派发
再次走到Task的dispatch中,派发到JobNode对应的context上下文,将任务插入相应Queue,其实还是Loop+queue那套
最后回到调用await之后的协程线程
可以看到协程线程的切换还是要依靠协程那套context Loop queue的模型,在一个协程结束的时候,通过回调往另一个Loop Queue中插入消息。
对于withcontext,它是直接在构造协程Block的时候,将外部协程回调传递进去,在结束的时候利用 completion.resumeWith(outcome);
重新换气外部协程体状态机,
public final Object invokeSuspend(Object $result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
if (BuildersKt.withContext(Dispatchers.getDefault(), new
<!--这里设置回调-->
CoroutinesTestModule$getNetInfo$1$reponse$1(null), this) != coroutine_suspended) {
break;
} else {
return coroutine_suspended;
}
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
System.out.println((Object) Intrinsics.stringPlus(LiveLiterals$CoroutinesTestModuleKt.INSTANCE.m3718x9ea3ba64(), Unit.INSTANCE));
return Unit.INSTANCE;
}
}
lifecycleScope.launch 中的suspend
通过上述的分析,我们知道协程的执行还是Loop线程+Queue,那么Android的UI线程怎么做,它本身已经是一个Loop线程了,协程会被抽象成Task或者数说runnable,既然UI线程已经有了Loop跟Queue,那直接提供给协程机制用就可以了,我们知道,核心是context对应的dispatcher,那么lifecycleScope对应的是谁呢?看一下两个扩展属性:
public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
public val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}
可以看到是SupervisorJob() + Dispatchers.Main.immediate,这里有个 +重载,最终落到
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
}
Android平台上
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
return HandlerContext(mainLooper.asHandler(async = true))
}
override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
最终是HandlerContext,可以看到它内含Handler,这两个是其他线程与UI线程切换的关键,因为UI线程的协程都能拿到HandlerContext,也就能被其他协程通过invokeOnCompletion回调的方式发射回到main线程
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
可以看到handler.post发射block。具体不再过多分析。总之还是一个Loop+一个Queue,对应相应的context 【Dispatcher】, withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T)函数也是同样道理它主要做的就是上下文切换,withContext发起的协程在参数context限定的上下文执行,然后切回原来的congtext,withContext是有返回值的,其实跟asyn await类似,只不过做了进一步的封装。
Java自己实现协程类的做法??
kotlin的协程看起来是个新概念,其核心实现还是Java的封装,它好像干掉了回调,但是其实不过是封装成了阻塞,但是Java中如果是阻塞,则必定会牵扯Task及线程,由于Java不是函数式语言,任务也必须封装成对象,所以使用起来有些繁琐,但是如果加以对比,其实发现Java也可以做到:
<!--Java框架中的实现-->
fun javaCoroutines() {
val executor: ExecutorService = Executors.newCachedThreadPool()
<!--类似于启动协程-->
executor.execute {
println(
<!--相比而言多了一层,因为需要获取一个可以阻塞等待的对象-->
executor.submit(Callable {
println("子线程在进行计算");
Thread.sleep(2000)
"结果" + System.currentTimeMillis()
}).get()
)
}
}
如果将协程函数看做是一个Task对象其实就很好跟Java对应起来,
suspend fun task() {
println("子线程在进行计算");
delay(2000)
"结果" + System.currentTimeMillis()
}
<!--kotlin实现-->
fun kotlinCoroutines() {
<!--kotlin的协程框架将上述Java的工作进行了封装,所以写起来简化-->
runBlocking(Dispatchers.IO) {
val result = task()
println(result)
}
}
通过对比其实可以看出:kotlin的协程在用法上,其实就是更好的Task封装,或者说少了一层,Java没法直接传递函数,只能用Runable封装一下。一个更有效的解释就是doAsync,类似一个Java跟kotlin之间的缓冲:
suspend与挂起的关系【suspend只会挂起协程,不会挂起线程】
默认情况自,自定义的函数是没必要添加suspend参数的,除非有suspend的可能比如内部调用的await,withcontext等协程框架中挂起的函数,如果加了会有提示冗余
所以只有挂起能力的才会添加suspend,换句话说,一定会变相调用kotlin库里的某个suspend函数,相应的内部函数一定存在自己的封装,牵扯到回调添加,任务的重新唤起插入等,如await,suspend只会挂起协程,不会挂起线程。
suspendCancellableCoroutine:将异步回调转换为不通调用,同时添加取消,携程被取消的时候,可以添加回调
suspend fun fetchDataCancellable(): String = suspendCancellableCoroutine { cont ->
// 开启异步,处理,并且,在结束后唤醒
try {
getNetWorkData(object : OnCallBack {
override fun onSuccess() {
cont.resumeWith(Result.success("success"))
}
override fun onFail() {
cont.resumeWith(Result.failure(Exception("fail")))
}
})
} catch (e: Exception) {
cont.resumeWithException(e)
}
// 添加取消回调,resumeWith 不会再回掉
cont.invokeOnCancellation {
println("Fetch data operation was cancelled")
}
}
使用 :
xx.launch{
ret = fetchDataCancellable()
if(ret == ""){
}
}
上述代码 将getNetWorkData 从异步改为协程同步,当然,其实协程的本质还是异步。只是借助写法,编译工具提升代码编译的效率。
总结
- launch后的每个协程Block都会被抽象是一个Function+suspendlambda协程Task对象,该协程中的每个suspend调用都是一个label分支,用于调用
- 协程的提交不会挂起,协程的执行可能挂起「如果中间有挂起函数」【好比是任务入队】
- launch是提交任务的过程,suspend调用是添加回调或者说另一个切换提交的过程
- 协程的执行可能会发生挂起或者阻塞线程
- Delay不一定立刻挂起,但是有很大可能会挂起线程【存在睡眠唤醒】
- 耗时操作只要不是在当前协程上下文执行,就不会造成当前协程线程的阻塞【会被转化成回调】
- kotlin协程最大的辅助机制是高阶函数,简化回调传递,运行核心还是将Block或者函数体抽象成对象
- suspend才只会挂起协程,不会阻塞线程 suspend全是回调
- 协程的返回值不是返回值,是回调入口