• G是goroutine实现的核心结构,它包含了栈,指令指针,以及其他对调度goroutine很重要的信息,例如其阻塞的channel。表示 Goroutine,它是一个待执行的任务

  • M结构是Machine,系统线程,它由操作系统管理,goroutine就是跑在M之上的;M是一个很大的结构,里面维护小对象内存cache(mcache)、当前执行的goroutine、随机数发生器等等非常多的信息。M的PC寄存器会执行G提供的函数。表示操作系统的线程,它由操作系统的调度器调度和管理;

  • P结构是Processor,处理器

    ,它的主要用途就是用来执行goroutine,它维护了一个goroutine队列,即runqueue。Processor的让我们从N:1调度到M:N调度的重要部分。

    表示处理器,它可以被看做运行在线程上的本地调度器;

    • 本地队列:本地的队列是无锁的,没有数据竞争问题,处理速度比较高;
    • 全局队列:是用来平衡不同的P的任务数量,所有的M共享P的全局队列。

Processor的数量是在启动时被设置为环境变量GOMAXPROCS的值,或者通过运行时调度函数GOMAXPROCS()进行设置。Processor数量固定意味着任意时刻只有GOMAXPROCS个线程在运行着go代码

进程,线程和协程的区别

  1. 进程:系统进行资源分配的基本单位,每个进程有独立且隔离的内存空间;
  2. 线程:线程是CPU调度的基本单位,线程依赖于进程的存在,每个线程会共享父进程的资源;
  3. 协程:用户态的轻量级线程,协程的调度完全由用户控制,协程切换只需要保存任务的上下文,没有内核开销

数据结构

G

Goroutine 是 Go 语言调度器中待执行的任务,它在运行时调度器中的地位与线程在操作系统中差不多,但是它占用了更小的内存空间,也降低了上下文切换的开销。

Goroutine 只存在于 Go 语言的运行时,它是 Go 语言在用户态提供的线程,作为一种粒度更细的资源调度单元,如果使用得当能够在高并发的场景下更高效地利用机器的 CPU。

Goroutine 在 Go 语言运行时使用私有结构体 runtime.g 表示。这个私有结构体非常复杂,总共包含 40 多个用于表示各种状态的成员变量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type g struct {
        stack       stack
        stackguard0 uintptr
        preempt       bool // 抢占信号
        preemptStop   bool // 抢占时将状态修改成 `_Gpreempted`
        preemptShrink bool // 在同步安全点收缩栈
        _panic       *_panic // 最内侧的 panic 结构体
        _defer       *_defer // 最内侧的延迟函数结构体
        m              *m  // 当前 Goroutine 占用的线程,可能为空;
        sched          gobuf // 存储 Goroutine 的调度相关的数据;
        atomicstatus   uint32 //Goroutine 的状态;
        goid           int64
}
type gobuf struct { //在调度器保存或者恢复上下文的时候用到,其中的栈指针和程序计数器会用来存储或者恢复寄存器中的值,改变程序即将执行的代码。
        sp   uintptr //栈指针
        pc   uintptr //程序计数器;
        g    guintptr // 持有 runtime.gobuf 的 Goroutine;
        ret  sys.Uintreg // 系统调用的返回值;
}

其中 stack 字段描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi),另一个字段 stackguard0 可以用于调度器抢占式调度。除了 stackguard0 之外,Goroutine 中还包含另外三个与抢占密切相关的字段:preempt、preemptStop、preemptShrink

结构体 runtime.gatomicstatus 字段存储了当前 Goroutine 的状态。除了几个已经不被使用的以及与 GC 相关的状态之外,Goroutine 可能处于以下 9 种状态:

状态 描述
_Gidle 刚刚被分配并且还没有被初始化
_Grunnable 没有执行代码,没有栈的所有权,存储在运行队列中
_Grunning 可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P
_Gsyscall 正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上
_Gwaiting 由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上
_Gdead 没有被使用,没有执行代码,可能有分配的栈
_Gcopystack 栈正在被拷贝,没有执行代码,不在运行队列上
_Gpreempted 由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒
_Gscan GC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在

虽然 Goroutine 在运行时中定义的状态非常多而且复杂,但是我们可以将这些不同的状态聚合成三种:等待中、可运行、运行中,运行期间会在这三种状态来回切换:

  • 等待中:Goroutine 正在等待某些条件满足,例如:系统调用结束等,包括 _Gwaiting_Gsyscall_Gpreempted 几个状态;
  • 可运行:Goroutine 已经准备就绪,可以在线程运行,如果当前程序中有非常多的 Goroutine,每个 Goroutine 就可能会等待更多的时间,即 _Grunnable
  • 运行中:Goroutine 正在某个线程上运行,即 _Grunning

M

Go 语言并发模型中的 M 是操作系统线程。调度器最多可以创建 10000 个线程,但是其中大多数的线程都不会执行用户代码(可能陷入系统调用),最多只会有 GOMAXPROCS 个活跃线程能够正常运行。

在默认情况下,运行时会将 GOMAXPROCS 设置成当前机器的核数,我们也可以在程序中使用 runtime.GOMAXPROCS 来改变最大的活跃线程数。

在大多数情况下,我们都会使用 Go 的默认设置,也就是线程数等于 CPU 数,默认的设置不会频繁触发操作系统的线程调度和上下文切换,所有的调度都会发生在用户态,由 Go 语言调度器触发,能够减少很多额外开销。

1
2
3
4
5
6
7
type m struct {
        g0   *g
        curg *g
        p             puintptr
        nextp         puintptr
        oldp          puintptr
}

其中 g0 是持有调度栈的 Goroutine,curg 是在当前线程上运行的用户 Goroutine,这也是操作系统线程唯一关心的两个 Goroutine。

g0 是一个运行时中比较特殊的 Goroutine,它会深度参与运行时的调度过程,包括 Goroutine 的创建、大内存分配和 CGO 函数的执行。在后面的小节中,我们会经常看到 g0 的身影。

还存在三个与处理器相关的字段,它们分别表示正在运行代码的处理器 p、暂存的处理器 nextp 和执行系统调用之前使用线程的处理器 oldp

P

调度器中的处理器 P 是线程和 Goroutine 的中间层,它能提供线程需要的上下文环境,也会负责调度线程上的等待队列,通过处理器 P 的调度,每一个内核线程都能够执行多个 Goroutine,它能在 Goroutine 进行一些 I/O 操作时及时让出计算资源,提高线程的利用率。

因为调度器在启动时就会创建 GOMAXPROCS 个处理器,所以 Go 语言程序的处理器数量一定会等于 GOMAXPROCS,这些处理器会绑定到不同的内核线程上。

runtime.p 是处理器的运行时表示,作为调度器的内部实现,它包含的字段也非常多,其中包括与性能追踪、垃圾回收和计时器相关的字段,这些字段也非常重要,但是在这里就不展示了,我们主要关注处理器中的线程和运行队列:

1
2
3
4
5
6
7
8
9
type p struct {
        m           muintptr

        runqhead uint32
        runqtail uint32
        runq     [256]guintptr
        runnext guintptr
        ...
}

反向存储的线程维护着线程与处理器之间的关系,而 runqheadrunqtailrunq 三个字段表示处理器持有的运行队列,其中存储着待执行的 Goroutine 列表,runnext 中是线程下一个需要执行的 Goroutine。

runtime.p 结构体中的状态 status 字段会是以下五种中的一种:

状态 描述
_Pidle 处理器没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空
_Prunning 被线程 M 持有,并且正在执行用户代码或者调度器
_Psyscall 没有执行用户代码,当前线程陷入系统调用
_Pgcstop 被线程 M 持有,当前处理器由于垃圾回收被停止
_Pdead 当前处理器已经不被使用

通过分析处理器 P 的状态,我们能够对处理器的工作过程有一些简单理解,例如处理器在执行用户代码时会处于 _Prunning 状态,在当前线程执行 I/O 操作时会陷入 _Psyscall 状态。

原理

单线程调度器

0.x 版本调度器只包含表示 Goroutine 的 G 和表示线程的 M 两种结构,全局也只有一个线程。我们可以在 clean up scheduler 提交中找到单线程调度器的源代码,在这时 Go 语言的调度器还是由 C 语言实现的,调度函数 runtime.scheduler:9682400 也只包含 40 多行代码 :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void scheduler(void) {
        G* gp;
        lock(&sched);

        if(gosave(&m->sched)){
                lock(&sched);
                gp = m->curg;
                switch(gp->status){
                case Grunnable:
                case Grunning:
                        gp->status = Grunnable;
                        gput(gp);
                        break;
                ...
                }
                notewakeup(&gp->stopped);
        }

        gp = nextgandunlock();
        noteclear(&gp->stopped);
        gp->status = Grunning;
        m->curg = gp;
        g = gp;
        gogo(&gp->sched);
}

该函数会遵循如下的过程调度 Goroutine:

  1. 获取调度器的全局锁;
  2. 调用 runtime.gosave:9682400 保存栈寄存器和程序计数器;
  3. 调用 runtime.nextgandunlock:9682400 获取下一个需要运行的 Goroutine 并解锁调度器;
  4. 修改全局线程 m 上要执行的 Goroutine;
  5. 调用 runtime.gogo:9682400 函数运行最新的 Goroutine;

虽然这个单线程调度器的唯一优点就是能运行,但是这次提交已经包含了 G 和 M 两个重要的数据结构,也建立了 Go 语言调度器的框架。

多线程调度器

Go 语言在 1.0 版本正式发布时就支持了多线程的调度器,与上一个版本几乎不可用的调度器相比,Go 语言团队在这一阶段实现了从不可用到可用的跨越。我们可以在 pkg/runtime/proc.c 文件中找到 1.0.1 版本的调度器,多线程版本的调度函数 runtime.schedule:go1.0.1 包含 70 多行代码,我们在这里保留了该函数的核心逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void schedule(G *gp) {
        schedlock();
        if(gp != nil) {
                gp->m = nil;
                uint32 v = runtime·xadd(&runtime·sched.atomic, -1<<mcpuShift);
                if(atomic_mcpu(v) > maxgomaxprocs)
                        runtime·throw("negative mcpu in scheduler");

                switch(gp->status){
                case Grunning:
                        gp->status = Grunnable;
                        gput(gp);
                        break;
                case ...:
                }
        } else {
                ...
        }
        gp = nextgandunlock();
        gp->status = Grunning;
        m->curg = gp;
        gp->m = m;
        runtime·gogo(&gp->sched, 0);
}

整体的逻辑与单线程调度器没有太多区别,因为我们的程序中可能同时存在多个活跃线程,所以多线程调度器引入了 GOMAXPROCS 变量帮助我们灵活控制程序中的最大处理器数,即活跃线程数。

多线程调度器的主要问题是调度时的锁竞争会严重浪费资源,Scalable Go Scheduler Design Doc 中对调度器做的性能测试发现 14% 的时间都花费在 runtime.futex:go1.0.1 上,该调度器有以下问题需要解决:

  1. 调度器和锁是全局资源,所有的调度状态都是中心化存储的,锁竞争问题严重;
  2. 线程需要经常互相传递可运行的 Goroutine,引入了大量的延迟;
  3. 每个线程都需要处理内存缓存,导致大量的内存占用并影响数据局部性;
  4. 系统调用频繁阻塞和解除阻塞正在运行的线程,增加了额外开销;
  • io 与系统调用不能混淆,一个 io 过程可能包括多次系统调用。
  • (经过一个系统调用发现文件描述符还未可用而)阻塞的 io 首先会导致 G 的挂起,此时 G 与 M 分离,且不在任何 P 的运行队列中,当前的 P 会调度下一个 G,这个阶段不涉及新线程的创建。
  • 被 io 挂起的 G 由网络轮询器维护,直到文件描述符可用。
  • 网络轮询器既会被(在独立线程中的)系统监控 Goroutine 触发,也会被其他各个活跃线程上的 Goroutine 触发。
  • 当文件描述符可用时,G 会重新加入到原来的 P 中等待被调度。
  • 当 G 被重新调度时,会重新发起读/写系统调用。
  • 当 G 进行系统调用的时候,对应的 M 和 P 也阻塞在系统调用,并不会立刻发生抢占,只有当这个阻塞持续时间过长(10 ms)时,才会将 P(及之上的其他 G)抢占并分配到空闲的 M 上,此时如果没有空闲的,才会创建新的线程。

通过以上过程可见,密集的 io 通常并不会产生过多的线程。

任务窃取调度器

2012 年 Google 的工程师 Dmitry Vyukov 在 Scalable Go Scheduler Design Doc 中指出了现有多线程调度器的问题并在多线程调度器上提出了两个改进的手段:

  1. 在当前的 G-M 模型中引入了处理器 P,增加中间层;
  2. 在处理器 P 的基础上实现基于工作窃取的调度器;

基于任务窃取的 Go 语言调度器使用了沿用至今的 G-M-P 模型,我们能在 runtime: improved scheduler 提交中找到任务窃取调度器刚被实现时的源代码,调度器的 runtime.schedule:779c45a 在这个版本的调度器中反而更简单了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
static void schedule(void) {
    G *gp;
 top:
    if(runtime·gcwaiting) {
        gcstopm();
        goto top;
    }

    gp = runqget(m->p);
    if(gp == nil)
        gp = findrunnable();

    ...

    execute(gp);
}
  1. 如果当前运行时在等待垃圾回收,调用 runtime.gcstopm:779c45a 函数;
  2. 调用 runtime.runqget:779c45aruntime.findrunnable:779c45a 从本地或者全局的运行队列中获取待执行的 Goroutine;
  3. 调用 runtime.execute:779c45a 在当前线程 M 上运行 Goroutine;

当前处理器本地的运行队列中不包含 Goroutine 时,调用 runtime.findrunnable:779c45a 会触发工作窃取,从其它的处理器的队列中随机获取一些 Goroutine。

运行时 G-M-P 模型中引入的处理器 P 是线程和 Goroutine 的中间层,我们从它的结构体中就能看到处理器与 M 和 G 的关系:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
struct P {
	Lock;

	uint32	status;
	P*	link;
	uint32	tick;
	M*	m;
	MCache*	mcache;

	G**	runq;
	int32	runqhead;
	int32	runqtail;
	int32	runqsize;

	G*	gfree;
	int32	gfreecnt;
};

处理器持有一个由可运行的 Goroutine 组成的环形的运行队列 runq,还反向持有一个线程。调度器在调度时会从处理器的队列中选择队列头的 Goroutine 放到线程 M 上执行。

基于工作窃取的多线程调度器将每一个线程绑定到了独立的 CPU 上,这些线程会被不同处理器管理,不同的处理器通过工作窃取对任务进行再分配实现任务的平衡,也能提升调度器和 Go 语言程序的整体性能。

抢占式调度

对 Go 语言并发模型的修改提升了调度器的性能,但是 1.1 版本中的调度器仍然不支持抢占式调度,程序只能依靠 Goroutine 主动让出 CPU 资源才能触发调度。Go 语言的调度器在 1.2 版本中引入基于协作的抢占式调度解决下面的问题:

  • 某些 Goroutine 可以长时间占用线程,造成其它 Goroutine 的饥饿;
  • 垃圾回收需要暂停整个程序(Stop-the-world,STW),最长可能需要几分钟的时间,导致整个程序无法工作;

1.2 版本的抢占式调度虽然能够缓解这个问题,但是它实现的抢占式调度是基于协作的,在之后很长的一段时间里 Go 语言的调度器都有一些无法被抢占的边缘情况,例如:for 循环或者垃圾回收长时间占用线程,这些问题中的一部分直到 1.14 才被基于信号的抢占式调度解决。

基于协作的抢占式调度

可以在 pkg/runtime/proc.c 文件中找到引入基于协作的抢占式调度后的调度器。Go 语言会在分段栈的机制上实现抢占调度,利用编译器在分段栈上插入的函数,所有 Goroutine 在函数调用时都有机会进入运行时检查是否需要执行抢占。Go 团队通过以下的多个提交实现该特性:

  • runtime: add stackguard0 to G
    • 为 Goroutine 引入 stackguard0 字段,该字段被设置成 StackPreempt 意味着当前 Goroutine 发出了抢占请求;
  • runtime: introduce preemption function (not used for now)
  • runtime: preempt goroutines for GC
  • runtime: preempt long-running goroutines
  • runtime: more reliable preemption
    • 修复 Goroutine 因为周期性执行非阻塞的 CGO 或者系统调用不会被抢占的问题;

上面的多个提交实现了抢占式调度,但是还缺少最关键的一个环节 — 编译器如何在函数调用前插入函数,我们能在非常古老的提交 runtime: stack growth adjustments, cleanup 中找到编译器插入函数的雏形,最新版本的 Go 语言会通过 cmd/internal/obj/x86.stacksplit 插入 runtime.morestack,该函数可能会调用 runtime.newstack 触发抢占。从上面的多个提交中,我们能归纳出基于协作的抢占式调度的工作原理:

  1. 编译器会在调用函数前插入 runtime.morestack
  2. Go 语言运行时会在垃圾回收暂停程序、系统监控发现 Goroutine 运行超过 10ms 时发出抢占请求 StackPreempt
  3. 当发生函数调用时,可能会执行编译器插入的 runtime.morestack,它调用的 runtime.newstack 会检查 Goroutine 的 stackguard0 字段是否为 StackPreempt
  4. 如果 stackguard0StackPreempt,就会触发抢占让出当前线程;

这种实现方式虽然增加了运行时的复杂度,但是实现相对简单,也没有带来过多的额外开销,总体来看还是比较成功的实现,也在 Go 语言中使用了 10 几个版本。因为这里的抢占是通过编译器插入函数实现的,还是需要函数调用作为入口才能触发抢占,所以这是一种协作式的抢占式调度

基于信号的抢占式调度

基于协作的抢占式调度虽然实现巧妙,但是并不完备,我们能在 runtime: non-cooperative goroutine preemption 中找到一些遗留问题:

Go 语言在 1.14 版本中实现了非协作的抢占式调度,在实现的过程中我们重构已有的逻辑并为 Goroutine 增加新的状态和字段来支持抢占。Go 团队通过下面的一系列提交实现了这一功能,我们可以按时间顺序分析相关提交理解它的工作原理:

  • runtime: add general suspendG/resumeG
    • 挂起 Goroutine 的过程是在垃圾回收的栈扫描时完成的,我们通过 runtime.suspendGruntime.resumeG 两个函数重构栈扫描这一过程;
    • 调用 runtime.suspendG 时会将处于运行状态的 Goroutine 的 preemptStop 标记成 true
    • 调用 runtime.preemptPark 可以挂起当前 Goroutine、将其状态更新成 _Gpreempted 并触发调度器的重新调度,该函数能够交出线程控制权;
  • runtime: asynchronous preemption function for x86
  • runtime: use signals to preempt Gs for suspendG
  • runtime: implement async scheduler preemption

目前的抢占式调度也只会在垃圾回收扫描任务时触发,我们可以梳理一下上述代码实现的抢占式调度过程:

  1. 程序启动时,在 runtime.sighandler 中注册 SIGURG 信号的处理函数 runtime.doSigPreempt
  2. 在触发垃圾回收的栈扫描时会调用runtime.suspendG挂起 Goroutine,该函数会执行下面的逻辑:
    1. _Grunning 状态的 Goroutine 标记成可以被抢占,即将 preemptStop 设置成 true
    2. 调用 runtime.preemptM 触发抢占;
  3. runtime.preemptM 会调用 runtime.signalM 向线程发送信号 SIGURG
  4. 操作系统会中断正在运行的线程并执行预先注册的信号处理函数 runtime.doSigPreempt
  5. runtime.doSigPreempt 函数会处理抢占信号,获取当前的 SP 和 PC 寄存器并调用 runtime.sigctxt.pushCall
  6. runtime.sigctxt.pushCall 会修改寄存器并在程序回到用户态时执行 runtime.asyncPreempt
  7. 汇编指令 runtime.asyncPreempt 会调用运行时函数 runtime.asyncPreempt2
  8. runtime.asyncPreempt2 会调用 runtime.preemptPark
  9. runtime.preemptPark 会修改当前 Goroutine 的状态到 _Gpreempted 并调用 runtime.schedule 让当前函数陷入休眠并让出线程,调度器会选择其它的 Goroutine 继续执行;

上述 9 个步骤展示了基于信号的抢占式调度的执行过程。除了分析抢占的过程之外,我们还需要讨论一下抢占信号的选择,提案根据以下的四个原因选择 SIGURG 作为触发异步抢占的信号;

  1. 该信号需要被调试器透传;
  2. 该信号不会被内部的 libc 库使用并拦截;
  3. 该信号可以随意出现并且不触发任何后果;
  4. 我们需要处理多个平台上的不同信号;

STW 和栈扫描是一个可以抢占的安全点(Safe-points),所以 Go 语言会在这里先加入抢占功能。基于信号的抢占式调度只解决了垃圾回收和栈扫描时存在的问题,它到目前为止没有解决所有问题,但是这种真抢占式调度是调度器走向完备的开始,相信在未来我们会在更多的地方触发抢占。

调度器启动

调度器的启动过程是我们平时比较难以接触的过程,不过作为程序启动前的准备工作,理解调度器的启动过程对我们理解调度器的实现原理很有帮助,运行时通过 runtime.schedinit 初始化调度器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func schedinit() {
	_g_ := getg()
	...

	sched.maxmcount = 10000

	...
	sched.lastpoll = uint64(nanotime())
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
}

在调度器初始函数执行的过程中会将 maxmcount 设置成 10000,这也就是一个 Go 语言程序能够创建的最大线程数,虽然最多可以创建 10000 个线程,但是可以同时运行的线程还是由 GOMAXPROCS 变量控制。

我们从环境变量 GOMAXPROCS 获取了程序能够同时运行的最大处理器数之后就会调用 runtime.procresize 更新程序中处理器的数量,在这时整个程序不会执行任何用户 Goroutine,调度器也会进入锁定状态,runtime.procresize 的执行过程如下:

  1. 如果全局变量 allp 切片中的处理器数量少于期望数量,会对切片进行扩容;
  2. 使用 new 创建新的处理器结构体并调用 runtime.p.init 初始化刚刚扩容的处理器;
  3. 通过指针将线程 m0 和处理器 allp[0] 绑定到一起;
  4. 调用 runtime.p.destroy 释放不再使用的处理器结构;
  5. 通过截断改变全局变量 allp 的长度保证与期望处理器数量相等;
  6. 将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局的空闲队列中;

调用 runtime.procresize 是调度器启动的最后一步,在这一步过后调度器会完成相应数量处理器的启动,等待用户创建运行新的 Goroutine 并为 Goroutine 调度处理器资源。

创建 Goroutine

想要启动一个新的 Goroutine 来执行任务时,我们需要使用 Go 语言的 go 关键字,编译器会通过 cmd/compile/internal/gc.state.stmtcmd/compile/internal/gc.state.call 两个方法将该关键字转换成 runtime.newproc 函数调用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (s *state) call(n *Node, k callKind) *ssa.Value {
	if k == callDeferStack {
		...
	} else {
		switch {
		case k == callGo:
			call = s.newValue1A(ssa.OpStaticCall, types.TypeMem, newproc, s.mem())
		default:
		}
	}
	...
}

runtime.newproc 的入参是参数大小和表示函数的指针 funcval,它会获取 Goroutine 以及调用方的程序计数器,然后调用 runtime.newproc1 函数获取新的 Goroutine 结构体、将其加入处理器的运行队列并在满足条件时调用 runtime.wakep 唤醒新的处理执行 Goroutine:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)

		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)

		if mainStarted {
			wakep()
		}
	})
}

runtime.newproc1 会根据传入参数初始化一个 g 结构体,我们可以将该函数分成以下几个部分介绍它的实现:

  1. 获取或者创建新的 Goroutine 结构体;
  2. 将传入的参数移到 Goroutine 的栈上;
  3. 更新 Goroutine 调度相关的属性;

首先是 Goroutine 结构体的创建过程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	_g_ := getg()
	siz := narg
	siz = (siz + 7) &^ 7

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg)
	}
	...

上述代码会先从处理器的 gFree 列表中查找空闲的 Goroutine,如果不存在空闲的 Goroutine,会通过 runtime.malg 创建一个栈大小足够的新结构体。

接下来,我们会调用 runtime.memmovefn 函数的所有参数拷贝到栈上,argpnarg 分别是参数的内存空间和大小,我们在该方法中会将参数对应的内存空间整块拷贝到栈上:

1
2
3
4
5
6
7
8
9
	...
	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
	totalSize += -totalSize & (sys.SpAlign - 1)
	sp := newg.stack.hi - totalSize
	spArg := sp
	if narg > 0 {
		memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
	}
	...

拷贝了栈上的参数之后,runtime.newproc1 会设置新的 Goroutine 结构体的参数,包括栈指针、程序计数器并更新其状态到 _Grunnable 并返回:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
	...
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.startpc = fn.fn
	casgstatus(newg, _Gdead, _Grunnable)
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	return newg
}

初始化结构体

runtime.gfget 通过两种不同的方式获取新的 runtime.g

  1. 从 Goroutine 所在处理器的 gFree 列表或者调度器的 sched.gFree 列表中获取 runtime.g
  2. 调用 runtime.malg 生成一个新的 runtime.g 并将结构体追加到全局的 Goroutine 列表 allgs 中。

runtime.gfget 中包含两部分逻辑,它会根据处理器中 gFree 列表中 Goroutine 的数量做出不同的决策:

  1. 当处理器的 Goroutine 列表为空时,会将调度器持有的空闲 Goroutine 转移到当前处理器上,直到 gFree 列表中的 Goroutine 数量达到 32;
  2. 当处理器的 Goroutine 数量充足时,会从列表头部返回一个新的 Goroutine;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func gfget(_p_ *p) *g {
retry:
	if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
		for _p_.gFree.n < 32 {
			gp := sched.gFree.stack.pop()
			if gp == nil {
				gp = sched.gFree.noStack.pop()
				if gp == nil {
					break
				}
			}
			_p_.gFree.push(gp)
		}
		goto retry
	}
	gp := _p_.gFree.pop()
	if gp == nil {
		return nil
	}
	return gp
}

当调度器的 gFree 和处理器的 gFree 列表都不存在结构体时,运行时会调用 runtime.malg 初始化新的 runtime.g 结构,如果申请的堆栈大小大于 0,这里会通过 runtime.stackalloc 分配 2KB 的栈空间:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func malg(stacksize int32) *g {
	newg := new(g)
	if stacksize >= 0 {
		stacksize = round2(_StackSystem + stacksize)
		newg.stack = stackalloc(uint32(stacksize))
		newg.stackguard0 = newg.stack.lo + _StackGuard
		newg.stackguard1 = ^uintptr(0)
	}
	return newg
}

runtime.malg 返回的 Goroutine 会存储到全局变量 allgs 中。

简单总结一下,runtime.newproc1 会从处理器或者调度器的缓存中获取新的结构体,也可以调用 runtime.malg 函数创建。

运行队列

runtime.runqput 会将 Goroutine 放到运行队列上,这既可能是全局的运行队列,也可能是处理器本地的运行队列:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func runqput(_p_ *p, gp *g, next bool) {
	if next {
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		gp = oldnext.ptr()
	}
retry:
	h := atomic.LoadAcq(&_p_.runqhead)
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1)
		return
	}
	if runqputslow(_p_, gp, h, t) {
		return
	}
	goto retry
}
  1. nexttrue 时,将 Goroutine 设置到处理器的 runnext 作为下一个处理器执行的任务;
  2. nextfalse 并且本地运行队列还有剩余空间时,将 Goroutine 加入处理器持有的本地运行队列;
  3. 当处理器的本地运行队列已经没有剩余空间时就会把本地队列中的一部分 Goroutine 和待加入的 Goroutine 通过 runtime.runqputslow 添加到调度器持有的全局运行队列上;

处理器本地的运行队列是一个使用数组构成的环形链表,它最多可以存储 256 个待执行任务。

Go 语言有两个运行队列,其中一个是处理器本地的运行队列,另一个是调度器持有的全局运行队列,只有在本地运行队列没有剩余空间时才会使用全局队列。

调度信息

运行时创建 Goroutine 时会通过下面的代码设置调度相关的信息,前两行代码会分别将程序计数器和 Goroutine 设置成 runtime.goexit 和新创建 Goroutine 运行的函数:

1
2
3
4
5
	...
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	...

上述调度信息 sched 不是初始化后的 Goroutine 的最终结果,它还需要经过 runtime.gostartcallfnruntime.gostartcall 的处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func gostartcallfn(gobuf *gobuf, fv *funcval) {
	gostartcall(gobuf, unsafe.Pointer(fv.fn), unsafe.Pointer(fv))
}

func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
	sp := buf.sp
	if sys.RegSize > sys.PtrSize {
		sp -= sys.PtrSize
		*(*uintptr)(unsafe.Pointer(sp)) = 0
	}
	sp -= sys.PtrSize
	*(*uintptr)(unsafe.Pointer(sp)) = buf.pc
	buf.sp = sp
	buf.pc = uintptr(fn)
	buf.ctxt = ctxt
}

调度信息的 sp 中存储了 runtime.goexit 函数的程序计数器,而 pc 中存储了传入函数的程序计数器。因为 pc 寄存器的作用就是存储程序接下来运行的位置,所以 pc 的使用比较好理解,但是 sp 中存储的 runtime.goexit 会让人感到困惑,我们需要配合下面的调度循环来理解它的作用。

循环调度

调度器启动之后,Go 语言运行时会调用 runtime.mstart 以及 runtime.mstart1,前者会初始化 g0 的 stackguard0stackguard1 字段,后者会初始化线程并调用 runtime.schedule 进入调度循环:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func schedule() {
	_g_ := getg()

top:
	var gp *g
	var inheritTime bool

	if gp == nil {
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr())
	}
	if gp == nil {
		gp, inheritTime = findrunnable()
	}

	execute(gp, inheritTime)
}

runtime.schedule 函数会从下面几个地方查找待执行的 Goroutine:

  1. 为了保证公平,当全局运行队列中有待执行的 Goroutine 时,通过 schedtick 保证有一定几率会从全局的运行队列中查找对应的 Goroutine;
  2. 从处理器本地的运行队列中查找待执行的 Goroutine;
  3. 如果前两种方法都没有找到 Goroutine,会通过 runtime.findrunnable 进行阻塞地查找 Goroutine;

runtime.findrunnable 的实现非常复杂,这个 300 多行的函数通过以下的过程获取可运行的 Goroutine:

  1. 从本地运行队列、全局运行队列中查找;
  2. 从网络轮询器中查找是否有 Goroutine 等待运行;
  3. 通过 runtime.runqsteal 尝试从其他随机的处理器中窃取待运行的 Goroutine,该函数还可能窃取处理器的计时器;

因为函数的实现过于复杂,上述的执行过程是经过简化的,总而言之,当前函数一定会返回一个可执行的 Goroutine,如果当前不存在就会阻塞等待。

接下来由 runtime.execute 执行获取的 Goroutine,做好准备工作后,它会通过 runtime.gogo 将 Goroutine 调度到当前线程上。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func execute(gp *g, inheritTime bool) {
	_g_ := getg()

	_g_.m.curg = gp
	gp.m = _g_.m
	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	if !inheritTime {
		_g_.m.p.ptr().schedtick++
	}

	gogo(&gp.sched)
}

runtime.gogo 在不同处理器架构上的实现都不同,但是也都大同小异,下面是该函数在 386 架构上的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
TEXT runtime·gogo(SB), NOSPLIT, $8-4
	MOVL buf+0(FP), BX     // 获取调度信息
	MOVL gobuf_g(BX), DX
	MOVL 0(DX), CX         // 保证 Goroutine 不为空
	get_tls(CX)
	MOVL DX, g(CX)
	MOVL gobuf_sp(BX), SP  // 将 runtime.goexit 函数的 PC 恢复到 SP 中
	MOVL gobuf_ret(BX), AX
	MOVL gobuf_ctxt(BX), DX
	MOVL $0, gobuf_sp(BX)
	MOVL $0, gobuf_ret(BX)
	MOVL $0, gobuf_ctxt(BX)
	MOVL gobuf_pc(BX), BX  // 获取待执行函数的程序计数器
	JMP  BX                // 开始执行

它从 runtime.gobuf 中取出了 runtime.goexit 的程序计数器和待执行函数的程序计数器,其中:

  • runtime.goexit 的程序计数器被放到了栈 SP 上;
  • 待执行函数的程序计数器被放到了寄存器 BX 上;

在函数调用一节中,我们曾经介绍过 Go 语言的调用惯例,正常的函数调用都会使用 CALL 指令,该指令会将调用方的返回地址加入栈寄存器 SP 中,然后跳转到目标函数;当目标函数返回后,会从栈中查找调用的地址并跳转回调用方继续执行剩下的代码。

经过一系列复杂的函数调用,我们最终在当前线程的 g0 的栈上调用 runtime.goexit0 函数,该函数会将 Goroutine 转换会 _Gdead 状态、清理其中的字段、移除 Goroutine 和线程的关联并调用 runtime.gfput 重新加入处理器的 Goroutine 空闲列表 gFree

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func goexit0(gp *g) {
	_g_ := getg()

	casgstatus(gp, _Grunning, _Gdead)
	gp.m = nil
	...
	gp.param = nil
	gp.labels = nil
	gp.timer = nil

	dropg()
	gfput(_g_.m.p.ptr(), gp)
	schedule()
}

在最后 runtime.goexit0 会重新调用 runtime.schedule 触发新一轮的 Goroutine 调度,Go 语言中的运行时调度循环会从 runtime.schedule 开始,最终又回到 runtime.schedule,我们可以认为调度循环永远都不会返回。

触发调度

主动挂起

runtime.gopark 是触发调度最常见的方法,该函数会将当前 Goroutine 暂停,被暂停的任务不会放回运行队列,我们来分析该函数的实现原理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	mp := acquirem()
	gp := mp.curg
	mp.waitlock = lock
	mp.waitunlockf = unlockf
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)
	mcall(park_m)
}

上述会通过 runtime.mcall 切换到 g0 的栈上调用 runtime.park_m

1
2
3
4
5
6
7
8
func park_m(gp *g) {
	_g_ := getg()

	casgstatus(gp, _Grunning, _Gwaiting)
	dropg()

	schedule()
}

runtime.park_m 会将当前 Goroutine 的状态从 _Grunning 切换至 _Gwaiting,调用 runtime.dropg 移除线程和 Goroutine 之间的关联,在这之后就可以调用 runtime.schedule 触发新一轮的调度了。

当 Goroutine 等待的特定条件满足后,运行时会调用 runtime.goready 将因为调用 runtime.gopark 而陷入休眠的 Goroutine 唤醒。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func goready(gp *g, traceskip int) {
	systemstack(func() {
		ready(gp, traceskip, true)
	})
}

func ready(gp *g, traceskip int, next bool) {
	_g_ := getg()

	casgstatus(gp, _Gwaiting, _Grunnable)
	runqput(_g_.m.p.ptr(), gp, next)
	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
		wakep()
	}
}

runtime.ready 会将准备就绪的 Goroutine 的状态切换至 _Grunnable 并将其加入处理器的运行队列中,等待调度器的调度。

系统调用

系统调用也会触发运行时调度器的调度,为了处理特殊的系统调用,我们甚至在 Goroutine 中加入了 _Gsyscall 状态,Go 语言通过 syscall.Syscallsyscall.RawSyscall 等使用汇编语言编写的方法封装操作系统提供的所有系统调用,其中 syscall.Syscall 的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#define INVOKE_SYSCALL	INT	$0x80

TEXT ·Syscall(SB),NOSPLIT,$0-28
	CALL	runtime·entersyscall(SB)
	...
	INVOKE_SYSCALL
	...
	CALL	runtime·exitsyscall(SB)
	RET
ok:
	...
	CALL	runtime·exitsyscall(SB)
	RET

在通过汇编指令 INVOKE_SYSCALL 执行系统调用前后,上述函数会调用运行时的 runtime.entersyscallruntime.exitsyscall,正是这一层包装能够让我们在陷入系统调用前触发运行时的准备和清理工作。

协作式调度

协作式调度runtime.Gosched 函数会主动让出处理器,允许其他 Goroutine 运行。该函数无法挂起 Goroutine,调度器可能会将当前 Goroutine 调度到其他线程上:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func Gosched() {
	checkTimeouts()
	mcall(gosched_m)
}

func gosched_m(gp *g) {
	goschedImpl(gp)
}

func goschedImpl(gp *g) {
	casgstatus(gp, _Grunning, _Grunnable)
	dropg()
	lock(&sched.lock)
	globrunqput(gp)
	unlock(&sched.lock)

	schedule()
}

经过连续几次跳转,我们最终在 g0 的栈上调用 runtime.goschedImpl,运行时会更新 Goroutine 的状态到 _Grunnable,让出当前的处理器并将 Goroutine 重新放回全局队列,在最后,该函数会调用 runtime.schedule 触发调度。