Golang中GMP模型深入探究

GMP 模型是 Go 语言中的一种调度模型,用于管理 Go 程序中的并发执行。GMP 代表 Goroutines、M(OS 线程,Machine)和 P(Processor,逻辑处理器)。这种模型使得 Go 语言可以高效地调度和执行大量的 goroutine

GMP

G

  • goroutine,协程的抽象。
  • g需要绑定到p才能执行。

M

  • machine,是golang中对线程的抽象
  • m不直接执行g,而是先和p绑定,由其实现代理

P

  • processor,golang中的调度器
  • p的数量决定了g最大的并发数量,可以通过GOMAXPROCS进行设置。

G

数据结构

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
type g struct {
	// stack: 表示实际的堆栈内存区间:[stack.lo, stack.hi)。
	stack stack

    // ...
	
	// 指向最里面的 panic 对象。
	_panic *_panic

	// 指向最里面的 defer 对象。
	_defer *_defer

	// 当前关联的 m 结构体。
	m *m

	// 保存 goroutine 调度相关的上下文。
	sched gobuf

	// 如果状态为 Gsyscall,则保存系统调用的堆栈指针以便 GC 使用。
	syscallsp uintptr

	// 如果状态为 Gsyscall,则保存系统调用的程序计数器以便 GC 使用。
	syscallpc uintptr

	// 堆栈顶部预期的堆栈指针,用于 traceback 检查。
	stktopsp uintptr

	// 通用指针参数字段,用于在特定上下文中传递值。
	param unsafe.Pointer

	// goroutine 的原子状态。
	atomicstatus uint32

	// : sigprof/scang 锁,用于同步访问堆栈。
	stackLock uint32

	//  goroutine的唯一标识符。
	goid int64

	// 在调度队列中的链接。
	schedlink guintptr

	// goroutine 阻塞的近似时间。
	waitsince int64

	// 如果状态为 Gwaiting,则表示阻塞原因。
	waitreason waitReason

	// 抢占信号,类似于 stackguard0 = stackpreempt。
	preempt bool

	// 在抢占时转换为 _Gpreempted,否则只是取消调度。
	preemptStop bool

	// 在同步安全点缩小堆栈。
	preemptShrink bool

	// 表示 g 是否在异步安全点停止。
	asyncSafePoint bool

	// 在意外错误地址上触发 panic(而不是崩溃)。
	paniconfault bool

	// g 已扫描堆栈,受状态中的 _Gscan 位保护。
	gcscandone bool

	// 禁止拆分堆栈。
	throwsplit bool

	// 表示是否有未锁定的通道指向此 goroutine 的堆栈。
	activeStackChans bool

	// 表示 goroutine 是否即将停在 chansend 或 chanrecv 上。
	parkingOnChan uint8

	// 忽略竞争检测事件。
	raceignore int8

	// StartTrace 已发出关于此 goroutine 的 EvGoInSyscall 事件。
	sysblocktraced bool

	// 是否跟踪此 goroutine 的调度延迟统计。
	tracking bool

	// 用于决定是否跟踪此 goroutine。
	trackingSeq uint8

	// 此 goroutine 上次变为可运行状态的时间戳,仅在跟踪时使用。
	runnableStamp int64

	// 处于可运行状态的时间,在运行时清除,仅在跟踪时使用。
	runnableTime int64

	// 系统调用返回时的 CPU 时钟周期(用于跟踪)。
	sysexitticks int64

	// 跟踪事件序列器。
	traceseq uint64

	// 最后一个为此 goroutine 发出事件的 P。
	tracelastp puintptr

	// 当前锁定的 m 结构体指针。
	lockedm muintptr

	// 信号值。
	sig uint32

	// 写缓冲区。
	writebuf []byte

	// ...

	// 信号发生时的程序计数器。
	sigpc uintptr

	// 创建此 goroutine 的 go 语句的程序计数器。
	gopc uintptr

	// 创建此 goroutine 的祖先信息(仅在 debug.tracebackancestors 时使用)。
	ancestors *[]ancestorInfo

	// goroutine 函数的起始程序计数器。
	startpc uintptr

	// 竞争检测上下文。
	racectx uintptr

	// g 正在等待的 sudog 结构(具有有效的 elem 指针)。
	waiting *sudog

	// cgo 回溯上下文。
	cgoCtxt []uintptr

	// 分析器标签。
	labels unsafe.Pointer

	// 缓存的计时器,用于 time.Sleep。
	timer *timer

	// 表示是否正在参与 select 操作以及是否有人赢得了竞争。
	selectDone uint32

    // ...
}

g的生命周期

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const (
	// G status
    
	// 协程开始创建的状态,此时还未初始化完成
	_Gidle = iota // 0
    // 协程再待执行队列中,等待被执行
	_Grunnable // 1
    // 协程正在执行,同一时刻一个p中只能有一个g处于这个状态
	_Grunning // 2
    // 协程正在执行系统调用
	_Gsyscall // 3
    // 协程处于挂起状态,需要等待被唤醒。gc,channel通信,锁操作的时候会进入这个状态。
	_Gwaiting // 4

	_Gmoribund_unused // 5

    // 协程刚初始化完成,或者已经被销毁的时候的状态
	_Gdead // 6

	_Genqueue_unused // 7
	_Gcopystack // 8
	_Gpreempted // 9
	_Gscan          = 0x1000
	_Gscanrunnable  = _Gscan + _Grunnable  // 0x1001
	_Gscanrunning   = _Gscan + _Grunning   // 0x1002
	_Gscansyscall   = _Gscan + _Gsyscall   // 0x1003
	_Gscanwaiting   = _Gscan + _Gwaiting   // 0x1004
	_Gscanpreempted = _Gscan + _Gpreempted // 0x1009
)

M

数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
type m struct {
    g0               *g               // 调度堆栈上的 goroutine。
    
    morebuf          gobuf            // 传递给 morestack 的 gobuf 参数。
    // ...
    procid           uint64           // 用于调试器,但偏移量没有硬编码。
    gsignal          *g               // 处理信号的 goroutine。
    goSigStack       gsignalStack     // Go 分配的信号处理堆栈。
    sigmask          sigset           // 保存的信号掩码。
    
    tls              [tlsSlots]uintptr // 线程本地存储(对于 x86 extern 寄存器)。
    
    mstartfn         func()           // 启动函数。
    curg             *g               // 当前运行的 goroutine。
    caughtsig        guintptr         // 处理致命信号时正在运行的 goroutine。
    
    p                puintptr         // 执行 Go 代码时附加的 P(如果不执行 Go 代码则为 nil)。
    
    nextp            puintptr         // 下一个 P。
    oldp             puintptr         // 执行系统调用前附加的 P。
    id               int64            // 线程 ID。
    mallocing        int32            // 正在进行的内存分配操作数。
    throwing         throwType        // 抛出异常的类型。
    preemptoff       string           // 如果不为空,保持当前 goroutine 在此 m 上运行。
    locks            int32            // 持有的锁的数量。
    dying            int32            // 正在终止的标志。
    profilehz        int32            // 性能分析的频率。
    spinning         bool             // m 是否无工作并积极寻找工作。
    blocked          bool             // m 是否在 note 上阻塞。
    newSigstack      bool             // 在 C 线程上调用 sigaltstack 的 minit 标志。
    printlock        int8             // 打印锁。
    incgo            bool             // m 是否正在执行 cgo 调用。
    freeWait         uint32           // 如果为 0,则安全释放 g0 并删除 m(原子操作)。
    fastrand         uint64           // 快速随机数种子。
    needextram       bool             // 是否需要额外的 m。
    traceback        uint8            // 是否进行回溯。
    ncgocall         uint64           // 总的 cgo 调用次数。
    ncgo             int32            // 当前正在进行的 cgo 调用次数。
    cgoCallersUse    uint32           // 如果非零,则 cgoCallers 临时使用。
    cgoCallers       *cgoCallers      // 如果在 cgo 调用中崩溃,则保存 cgo 回溯。
    park             note             // 用于 goroutine 的 park。
    alllink          *m               // 在 allm 链表上。
    schedlink        muintptr         // 在调度链表上。
    lockedg          guintptr         // 当前锁定的 goroutine。
    createstack      [32]uintptr      // 创建此线程的堆栈。
    lockedExt        uint32           // 跟踪外部 LockOSThread 的状态。
    lockedInt        uint32           // 跟踪内部 lockOSThread 的状态。
    nextwaitm        muintptr         // 等待锁的下一个 m。
    waitunlockf      func(*g, unsafe.Pointer) bool // 等待解锁的函数。
    waitlock         unsafe.Pointer   // 等待解锁的指针。
    waittraceev      byte             // 等待解锁的跟踪事件。
    waittraceskip    int              // 等待解锁的跟踪跳过数。
    startingtrace    bool             // 开始跟踪的标志。
    syscalltick      uint32           // 系统调用的时钟周期。
    freelink         *m               // 在 sched.freem 链表上。
    libcall          libcall          // 用于低级别 NOSPLIT 函数的参数。
    libcallpc        uintptr          // 用于 CPU 性能分析的程序计数器。
    libcallsp        uintptr          // 用于 CPU 性能分析的堆栈指针。
    libcallg         guintptr         // 用于 CPU 性能分析的 goroutine。
    syscall          libcall          // 存储 Windows 上的系统调用参数。
    vdsoSP           uintptr          // 在 VDSO 调用中进行回溯时的堆栈指针(如果不在调用中则为 0)。
    vdsoPC           uintptr          // 在 VDSO 调用中进行回溯时的程序计数器。
    preemptGen       uint32           // 记录完成的抢占信号次数,用于检测请求抢占但失败的情况,原子访问。
    signalPending    uint32           // 是否有待处理的抢占信号,原子访问。
    dlogPerM                         // 用于调试日志的字段。
    mOS                               // 操作系统相关字段。
    locksHeldLen     int              // 此 m 持有的锁的数量(最多 10 个),由锁排序代码维护。
    // ...
}
  • g0是一类特殊的goroutine,不用与执行用户方法,负责执行g之间的切换调度,与m的关系比例为1:1

P

数据类型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
type p struct {
	id          int32          // 处理器的唯一标识符
	status      uint32         // 处理器的当前状态,比如 idle、running 等
	link        puintptr       // 链接到下一个处理器,用于维护处理器链表
	schedtick   uint32         // 调度器调用计数器,每次调度调用时递增
	syscalltick uint32         // 系统调用计数器,每次系统调用时递增
	sysmontick  sysmontick     // sysmon(系统监控器)最后一次观察到的tick值
	m           muintptr       // 关联的 OS 线程(m 结构体),如果空闲则为 nil
	mcache      *mcache        // 内存分配缓存
	pcache      pageCache      // 页面缓存
	raceprocctx uintptr        // 用于数据竞争检测的上下文

	deferpool    []*_defer      // 可用的 defer 结构体池
	deferpoolbuf [32]*_defer    // 缓存的 defer 结构体

	goidcache    uint64         // goroutine ID 缓存,减少对全局 ID 生成器的访问
	goidcacheend uint64         // goroutine ID 缓存的结束位置

	runqhead uint32             // 可运行 goroutine 队列的头部索引
	runqtail uint32             // 可运行 goroutine 队列的尾部索引
	runq     [256]guintptr      // 可运行 goroutine 队列
	runnext  guintptr           // 下一个要运行的 goroutine,如果不为空,则在当前 goroutine 运行完后立即运行

	gFree struct {              // 可用 goroutine 列表(状态为 Gdead)
		gList
		n int32
	}

	sudogcache []*sudog         // sudog 结构体缓存
	sudogbuf   [128]*sudog      // 缓存的 sudog 结构体

	mspancache struct {         // mspan 对象缓存
		len int                // 缓存的长度
		buf [128]*mspan        // 缓存的 mspan 对象
	}

	tracebuf traceBufPtr        // 跟踪缓冲区

	traceSweep bool             // 是否跟踪 sweep 事件
	traceSwept, traceReclaimed uintptr // 跟踪当前 sweep 循环中被 sweep 和回收的字节数

	palloc persistentAlloc      // 持久化分配器

	_ uint32 // 对齐字段,确保后面的字段是原子的

	timer0When uint64           // 定时器堆中第一个条目的时间
	timerModifiedEarliest uint64 // 最早的修改过的定时器的时间

	gcAssistTime         int64  // 在 assistAlloc 中花费的纳秒数
	gcFractionalMarkTime int64  // 在 fractional mark worker 中花费的纳秒数

	limiterEvent limiterEvent   // GC CPU 限制器事件

	gcMarkWorkerMode gcMarkWorkerMode // 下一个 mark worker 的模式
	gcMarkWorkerStartTime int64      // 最近一个 mark worker 开始的时间

	gcw gcWork                  // 当前处理器的 GC 工作缓冲区
	wbBuf wbBuf                 // 当前处理器的写屏障缓冲区

	runSafePointFn uint32       // 如果为 1,在下一个安全点运行 sched.safePointFn

	statsSeq uint32             // 统计序列计数器,偶数表示当前没有写入统计,奇数表示正在写入统计

	timersLock mutex            // 定时器锁
	timers []*timer             // 定时器数组,必须持有 timersLock 访问
	numTimers uint32            // 定时器堆中的定时器数量(使用原子操作修改)
	deletedTimers uint32        // 定时器堆中的已删除定时器数量(使用原子操作修改)

	timerRaceCtx uintptr        // 执行定时器函数时使用的竞争上下文

	maxStackScanDelta int64     // 存活 goroutine 持有的堆栈空间的累计量
	scannedStackSize uint64     // 当前 goroutine 的堆栈大小
	scannedStacks    uint64     // 当前 goroutine 的数量

	preempt bool                // 如果设置为 true,表示这个处理器应尽快进入调度器
}

Schedt

全局的runq队列

数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
type schedt struct {
	
	goidgen   uint64           // 全局唯一的 goroutine ID 生成器。
	lastpoll  uint64           // 上次网络轮询的时间,如果当前正在轮询则为 0。
	pollUntil uint64           // 当前轮询睡眠到的时间。

	lock      mutex            // 调度器的全局锁。

	midle        muintptr       // 空闲的 M 列表,等待工作。
	nmidle       int32          // 等待工作的空闲 M 的数量。
	nmidlelocked int32          // 等待工作的锁定 M 的数量。
	mnext        int64          // 已创建的 M 的数量和下一个 M ID。
	maxmcount    int32          // 允许的最大 M 数量(或死锁)。
	nmsys        int32          // 不计入死锁检测的系统 M 的数量。
	nmfreed      int64          // 累计释放的 M 的数量。

	ngsys uint32                // 系统 goroutines 的数量,原子更新。

	pidle      puintptr         // 空闲的 P 列表。
	npidle     uint32           // 空闲 P 的数量。
	nmspinning uint32           // 查看 proc.go 中的 "Worker thread parking/unparking" 注释。

	runq     gQueue             // 全局可运行的 G 队列。
	runqsize int32              // 全局可运行队列中的 G 数量。

	disable struct {
		// user disables scheduling of user goroutines.
		user     bool         // 用户禁用调度用户 goroutine。
		runnable gQueue       // 待处理的可运行 G 队列。
		n        int32        // 待处理的可运行 G 数量。
	}
	
	gFree struct {
		lock    mutex          // 锁保护 gFree 访问。
		stack   gList          // 有堆栈的 G 列表。
		noStack gList          // 无堆栈的 G 列表。
		n       int32          // 总数。
	}
	
	sudoglock  mutex          // 锁保护 sudogcache 访问。
	sudogcache *sudog         // 可用 sudog 结构的中央缓存。
	
	deferlock mutex           // 锁保护 deferpool 访问。
	deferpool *_defer         // 可用 defer 结构的中央池。
	
	freem *m                 // 等待被释放的 M 列表。

	gcwaiting  uint32        // GC 等待运行的标志。
	stopwait   int32         // 停止等待计数器。
	stopnote   note          // 停止等待通知。
	sysmonwait uint32        // 系统监视器等待标志。
	sysmonnote note          // 系统监视器等待通知。

	safePointFn   func(*p)    // 在下一个 GC 安全点时在每个 P 上调用的函数。
	safePointWait int32       // 安全点等待计数器。
	safePointNote note        // 安全点等待通知。

	profilehz int32           // CPU 性能分析率。

	procresizetime int64      // 上次 gomaxprocs 变更的时间(纳秒)。
	totaltime      int64      // ∫gomaxprocs dt 到 procresizetime。

	sysmonlock mutex          // 保护 sysmon 操作运行时的锁。

	timeToRun timeHistogram   // 调度延迟的分布,定义为 G 在 _Grunnable 状态中花费的时间总和。
}
  • runq:全局goroutine队列
  • runqsize:全局goroutine队列的容量

调度流程

调度指的是g0按照特定的策略找到下一个可执行的g的过程。

两种g的切换

  • func gogo():由g0切换到g
  • func m_call():由g切换为g0

调度类型

主动调度

被动调度

正常调度

抢占调度

找到可执行的g

findRunnable()

获取可调度的g。

该部分代码主要在runtime/proc.go#findRunnable()的方法中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
	_g_ := getg()

	// 这里和handoffp中的条件必须一致:
	// 如果findrunnable将返回一个要运行的G,handoffp必须启动一个M。

top:
	// ...

	// 偶尔检查全局可运行队列,以确保公平性。
	// 否则,两个goroutine可以通过不断重启彼此来完全占据本地运行队列。
	if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
		lock(&sched.lock)
        
        // 如果已经在本地队列中check了61次了,则这次从全局队列中查找
		gp = globrunqget(_p_, 1)
        
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}

	// 从本地队列中获取一个可运行的g
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime, false
	}

	// 如果本地队列中没有获取到,则从全局runq中获取一个可运行的g
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}

    // 查找并唤醒一些io协程
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		if list := netpoll(0); !list.empty() { // non-blocking
			gp := list.pop()
			injectglist(&list)
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false, false
		}
	}

	// Spinning Ms: steal work from other Ps.
	//
	// Limit the number of spinning Ms to half the number of busy Ps.
	// This is necessary to prevent excessive CPU consumption when
	// GOMAXPROCS>>1 but the program parallelism is low.
	procs := uint32(gomaxprocs)
	if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
		if !_g_.m.spinning {
			_g_.m.spinning = true
			atomic.Xadd(&sched.nmspinning, 1)
		}

        // work-stealing操作,尝试从别的p中的本地队列中窃取别的p的一半的g过来。
		gp, inheritTime, tnow, w, newWork := stealWork(now)
		now = tnow
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime, false
		}
		if newWork {
			// There may be new timer or GC work; restart to
			// discover.
			goto top
		}
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			// Earlier timer to wait for.
			pollUntil = w
		}
	}

	// ...
	
}

stealWork()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
	pp := getg().m.p.ptr()

	ranTimer := false

    // 最多发起4次尝试
	const stealTries = 4
	for i := 0; i < stealTries; i++ {
		stealTimersOrRunNextG := i == stealTries-1

        // fastrand():保证是随机挑选的p
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				return nil, false, now, pollUntil, true
			}
			p2 := allp[enum.position()]
			if pp == p2 {
				continue
			}

			
			if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
				tnow, w, ran := checkTimers(p2, now)
				now = tnow
				if w != 0 && (pollUntil == 0 || w < pollUntil) {
					pollUntil = w
				}
				if ran {
					
					if gp, inheritTime := runqget(pp); gp != nil {
						return gp, inheritTime, now, pollUntil, ranTimer
					}
					ranTimer = true
				}
			}

		
			if !idlepMask.read(enum.position()) {
                // 这里会窃取一半的g
				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
					return gp, false, now, pollUntil, ranTimer
				}
			}
		}
	}

	
	return nil, false, now, pollUntil, ranTimer
}



func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
	for {
		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
		t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
		n := t - h
		n = n - n/2
		if n == 0 {
			if stealRunNextG {
				// Try to steal from _p_.runnext.
				if next := _p_.runnext; next != 0 {
					if _p_.status == _Prunning {
						if GOOS != "windows" && GOOS != "openbsd" && GOOS != "netbsd" {
							usleep(3)
						} else {
							
							osyield()
						}
					}
					if !_p_.runnext.cas(next, 0) {
						continue
					}
					batch[batchHead%uint32(len(batch))] = next
					return 1
				}
			}
			return 0
		}
		if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
			continue
		}
		for i := uint32(0); i < n; i++ {
			g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
			batch[(batchHead+i)%uint32(len(batch))] = g
		}
		if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
			return n
		}
	}
}

work-stealing:当p无法从给自己的本地队列和全局队列中获取可运行的g的时候,就会尝试从别的p窃取一半的g到自己的本地队列。

执行g

execute()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func execute(gp *g, inheritTime bool) {
	_g_ := getg()

	// ...

	_g_.m.curg = gp
	gp.m = _g_.m
    
    // 使用cas将g的状态从runnable改成running
	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	if !inheritTime {
		_g_.m.p.ptr().schedtick++
	}

	// ...


    // 执行了gogo()方法之后,执行权将由g0切换到g
	gogo(&gp.sched)
}

调度中常见的方法

Gosched

该方法执行后,执行全将由g切换回g0

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func Gosched() {
	checkTimeouts()
    
    // call了gosched_m这个方法
	mcall(gosched_m)
}


func gosched_m(gp *g) {
	// ...
	goschedImpl(gp)
}


func goschedImpl(gp *g) {
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
    
    // 将当前运行的g的状态从running改为runnable
	casgstatus(gp, _Grunning, _Grunnable)
    
    // 解绑g和m
	dropg()
    
    // 加锁,并将当前的g放入全局队列中
	lock(&sched.lock)
	globrunqput(gp)
	unlock(&sched.lock)

	schedule()
}

gopark

执行权将从g切换到g0

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	if reason != waitReasonSleep {
		checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
	}
	mp := acquirem()
	gp := mp.curg
	status := readgstatus(gp)
    
    // 检查当前g的运行状态是否正确
	if status != _Grunning && status != _Gscanrunning {
		throw("gopark: bad g status")
	}
    
    // 保存一些状态
	mp.waitlock = lock
	mp.waitunlockf = unlockf
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)

    // 调用park_m
	mcall(park_m)
}



func park_m(gp *g) {
	_g_ := getg()

    // ...

    // 将当前正在执行的g的状态从running改成waiting
	casgstatus(gp, _Grunning, _Gwaiting)
    
    // 将g和m解绑
	dropg()

	if fn := _g_.m.waitunlockf; fn != nil {
		ok := fn(gp, _g_.m.waitlock)
		_g_.m.waitunlockf = nil
		_g_.m.waitlock = nil
		if !ok {
			// ...
            
			casgstatus(gp, _Gwaiting, _Grunnable)
			execute(gp, true) // Schedule it back, never returns.
		}
	}
    
    // 进入新的一轮调度流程
	schedule()
}

从上面的代码可以看出,在g进入了waiting状态的时候,这个g没有进入本地队列,也没有进入全局队列,那么该如何唤醒这个在waiting状态的g呢?


相关内容

0%