1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
|
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
import (
"internal/runtime/atomic"
"internal/runtime/sys"
"unsafe"
)
// A synctestBubble is a set of goroutines started by synctest.Run.
//
// synctestRun creates one bubble per call. The counters total, running,
// and active, as well as waiting and waiter, are updated with mu held.
type synctestBubble struct {
	mu      mutex
	timers  timers // the bubble's timers; wake times are compared against now
	id      uint64 // unique id, taken from bubbleGen
	now     int64  // current fake time; synctestRun advances it to the next timer wake time
	root    *g     // caller of synctest.Run
	waiter  *g     // caller of synctest.Wait; set by synctestwait_c, cleared by synctestWait
	main    *g     // goroutine started by synctest.Run
	waiting bool   // true if a goroutine is calling synctest.Wait
	done    bool   // true if main has exited; set by changegstatus

	// The bubble is active (not blocked) so long as running > 0 || active > 0.
	//
	// running is the number of goroutines which are not "durably blocked":
	// Goroutines which are either running, runnable, or non-durably blocked
	// (for example, blocked in a syscall).
	//
	// active is used to keep the bubble from becoming blocked,
	// even if all goroutines in the bubble are blocked.
	// For example, park_m can choose to immediately unpark a goroutine after parking it.
	// It increments the active count to keep the bubble active until it has determined
	// that the park operation has completed.
	total   int // total goroutines
	running int // non-blocked goroutines
	active  int // other sources of activity
}
// changegstatus updates the bubble's accounting when the non-lock status
// of gp changes from oldval to newval.
// It is never called with a Gscanstatus.
//
// Only transitions that affect the idleness of the bubble take bubble.mu:
// a goroutine starting, stopping, durably blocking, or waking up after
// durably blocking. Every other transition returns before locking.
//
// For example, stack growth (newstack) will changegstatus from _Grunning
// to _Gcopystack. That is uninteresting to synctest, but stack growth can
// occur while bubble.mu is held, and we must not recursively lock.
func (bubble *synctestBubble) changegstatus(gp *g, oldval, newval uint32) {
	// delta is the net change in the number of goroutines in the bubble.
	delta := 0

	// Classify the old status: a g counts as running unless it was dead
	// or parked for a reason that is idle in synctest.
	ranBefore := true
	if oldval == _Gdead {
		ranBefore = false
		delta++
	} else if oldval == _Gwaiting && gp.waitreason.isIdleInSynctest() {
		ranBefore = false
	}

	// Classify the new status the same way.
	runsAfter := true
	if newval == _Gdead {
		runsAfter = false
		delta--
		if gp == bubble.main {
			bubble.done = true
		}
	} else if newval == _Gwaiting && gp.waitreason.isIdleInSynctest() {
		runsAfter = false
	}

	// The running state can be unchanged while delta != 0;
	// for example, if a new goroutine is created in a non-running state.
	// Only when neither changed is there nothing to record.
	if ranBefore == runsAfter && delta == 0 {
		return
	}

	lock(&bubble.mu)
	bubble.total += delta
	switch {
	case ranBefore == runsAfter:
		// The running count is unaffected.
	case runsAfter:
		bubble.running++
	default:
		bubble.running--
		if raceenabled && newval != _Gdead {
			// Record that this goroutine parking happens before
			// any subsequent Wait.
			racereleasemergeg(gp, bubble.raceaddr())
		}
	}
	if bubble.total < 0 {
		fatal("total < 0")
	}
	if bubble.running < 0 {
		fatal("running < 0")
	}
	wake := bubble.maybeWakeLocked()
	unlock(&bubble.mu)

	// goready must not be called with bubble.mu held,
	// so the wake happens only after unlocking.
	if wake != nil {
		goready(wake, 0)
	}
}
// incActive adds one to the bubble's active-count, holding bubble.mu
// for the update.
// While the active-count is non-zero the bubble is not considered
// durably blocked.
func (bubble *synctestBubble) incActive() {
	mu := &bubble.mu
	lock(mu)
	bubble.active++
	unlock(mu)
}
// decActive subtracts one from the bubble's active-count.
// It throws if the count becomes negative.
//
// If the decrement leaves the bubble durably blocked, the goroutine
// chosen by maybeWakeLocked is readied once bubble.mu has been released.
func (bubble *synctestBubble) decActive() {
	mu := &bubble.mu
	lock(mu)
	if bubble.active--; bubble.active < 0 {
		throw("active < 0")
	}
	gp := bubble.maybeWakeLocked()
	unlock(mu)
	if gp == nil {
		return
	}
	goready(gp, 0)
}
// maybeWakeLocked returns the g that should be woken if the bubble is
// durably blocked, or nil if the bubble is still active.
//
// When it returns a non-nil g it has also incremented bubble.active;
// the woken goroutine is responsible for decrementing it again.
func (bubble *synctestBubble) maybeWakeLocked() *g {
	stillActive := bubble.running > 0 || bubble.active > 0
	if stillActive {
		return nil
	}

	// We have decided to wake something, so count that as activity now.
	// We can't simply call goready and let it bump bubble.running,
	// because goready must not be called with bubble.mu held.
	//
	// The increment only matters if something has gone wrong and a
	// goroutine we considered durably blocked wakes up unexpectedly.
	// Two wakes happening at the same time leads to very confusing
	// failure modes, so we take steps to avoid it happening.
	bubble.active++

	wakeAt := bubble.timers.wakeTime()
	timerDue := wakeAt > 0 && wakeAt <= bubble.now
	if waiter := bubble.waiter; !timerDue && waiter != nil {
		// No timer is ready to fire and a goroutine is blocked in Wait: wake it.
		return waiter
	}

	// Either a timer is scheduled to fire and the root goroutine must handle it,
	// or every goroutine is durably blocked and nothing has called Wait.
	// In both cases the root goroutine is the one to wake.
	return bubble.root
}
// raceaddr returns the address used to record happens-before
// relationships created by the bubble. The address is the bubble itself.
//
// Wait creates a happens-before relationship between itself and
// the blocking operations which caused other goroutines in the bubble to park.
func (bubble *synctestBubble) raceaddr() unsafe.Pointer {
	addr := unsafe.Pointer(bubble)
	return addr
}
// bubbleGen is the bubble ID counter.
// synctestRun assigns each new bubble the value of bubbleGen.Add(1).
var bubbleGen atomic.Uint64
// synctestRun is linknamed as internal/synctest.Run.
//
// It creates a new bubble whose root is the calling goroutine and starts f
// in a new goroutine recorded as bubble.main. It then loops: it runs the
// bubble's timers at the current fake time, calls gopark with
// synctestidle_c, and on resuming advances the fake time to the next timer
// wake time. The loop ends when the bubble has no pending timer or when
// main has exited.
//
// It panics if debug.asynctimerchan is non-zero, if the caller is already
// in a bubble, or (with a synctestDeadlockError) if bubble.total is not 1
// once the loop ends.
//
//go:linkname synctestRun internal/synctest.Run
func synctestRun(f func()) {
	if debug.asynctimerchan.Load() != 0 {
		panic("synctest.Run not supported with asynctimerchan!=0")
	}

	gp := getg()
	if gp.bubble != nil {
		panic("synctest.Run called from within a synctest bubble")
	}
	// NOTE(review): total and running start at 1, presumably to account
	// for the root goroutine itself; confirm against changegstatus callers.
	bubble := &synctestBubble{
		id:      bubbleGen.Add(1),
		total:   1,
		running: 1,
		root:    gp,
	}
	const synctestBaseTime = 946684800000000000 // midnight UTC 2000-01-01
	bubble.now = synctestBaseTime
	lockInit(&bubble.mu, lockRankSynctest)
	lockInit(&bubble.timers.mu, lockRankTimers)

	// The root goroutine belongs to the bubble only for the duration of Run.
	gp.bubble = bubble
	defer func() {
		gp.bubble = nil
	}()

	// This is newproc, but also records the new g in bubble.main.
	pc := sys.GetCallerPC()
	systemstack(func() {
		fv := *(**funcval)(unsafe.Pointer(&f))
		bubble.main = newproc1(fv, gp, pc, false, waitReasonZero)
		pp := getg().m.p.ptr()
		runqput(pp, bubble.main, true)
		wakep()
	})

	lock(&bubble.mu)
	// synctestidle_c decrements this count when it lets the root idle.
	bubble.active++
	for {
		// bubble.mu is held at the top of every iteration and released
		// before running timers and parking.
		unlock(&bubble.mu)
		systemstack(func() {
			// Clear gp.m.curg while running timers,
			// so timer goroutines inherit their child race context from g0.
			curg := gp.m.curg
			gp.m.curg = nil
			gp.bubble.timers.check(bubble.now, bubble)
			gp.m.curg = curg
		})
		gopark(synctestidle_c, nil, waitReasonSynctestRun, traceBlockSynctest, 0)
		lock(&bubble.mu)
		if bubble.active < 0 {
			throw("active < 0")
		}
		next := bubble.timers.wakeTime()
		if next == 0 {
			// No timer is pending, so there is nothing left to advance to.
			break
		}
		if next < bubble.now {
			throw("time went backwards")
		}
		if bubble.done {
			// Time stops once the bubble's main goroutine has exited.
			break
		}
		// Jump the fake clock straight to the next timer's wake time.
		bubble.now = next
	}

	// Snapshot total while still holding the lock; it is examined below
	// after the lock has been released.
	total := bubble.total
	unlock(&bubble.mu)
	if raceenabled {
		// Establish a happens-before relationship between bubbled goroutines exiting
		// and Run returning.
		raceacquireg(gp, gp.bubble.raceaddr())
	}
	// A total other than 1 is reported as a deadlock; the message depends
	// on whether the main goroutine has exited.
	if total != 1 {
		var reason string
		if bubble.done {
			reason = "deadlock: main bubble goroutine has exited but blocked goroutines remain"
		} else {
			reason = "deadlock: all goroutines in bubble are blocked"
		}
		panic(synctestDeadlockError{reason: reason, bubble: bubble})
	}
	if gp.timer != nil && gp.timer.isFake {
		// Verify that we haven't marked this goroutine's sleep timer as fake.
		// This could happen if something in Run were to call timeSleep.
		throw("synctest root goroutine has a fake timer")
	}
}
// synctestDeadlockError is the panic value used by synctestRun when the
// bubble's goroutine total is not 1 after its timer loop ends.
type synctestDeadlockError struct {
	reason string          // human-readable description of the deadlock
	bubble *synctestBubble // the bubble in which the deadlock was detected
}

// Error returns the deadlock reason.
func (e synctestDeadlockError) Error() string {
	return e.reason
}
// synctestidle_c is the unlock function synctestRun passes to gopark.
//
// It reports whether gp may idle. If the bubble has no running goroutines
// and its active-count is exactly 1, it returns false and leaves the count
// untouched. Otherwise it decrements the active-count and returns true.
func synctestidle_c(gp *g, _ unsafe.Pointer) bool {
	bubble := gp.bubble
	lock(&bubble.mu)
	// All goroutines in the bubble have blocked or exited.
	allBlocked := bubble.running == 0 && bubble.active == 1
	if !allBlocked {
		bubble.active--
	}
	unlock(&bubble.mu)
	return !allBlocked
}
// synctestWait is linknamed as internal/synctest.Wait.
//
// It marks a Wait as in progress, parks the calling goroutine via
// synctestwait_c (which registers it as bubble.waiter), and after being
// resumed drops one active-count and clears the waiter state.
//
// It panics if the caller is not in a bubble, or if another Wait is
// already in progress in the same bubble.
//
//go:linkname synctestWait internal/synctest.Wait
func synctestWait() {
	gp := getg()
	bubble := gp.bubble
	if bubble == nil {
		panic("goroutine is not in a bubble")
	}

	// Simultaneous calls to Wait are detected with the bubble.waiting bool
	// rather than by checking whether bubble.waiter is non-nil. This avoids
	// a race between unlocking bubble.mu and setting bubble.waiter while
	// parking.
	lock(&bubble.mu)
	alreadyWaiting := bubble.waiting
	if !alreadyWaiting {
		bubble.waiting = true
	}
	unlock(&bubble.mu)
	if alreadyWaiting {
		panic("wait already in progress")
	}

	gopark(synctestwait_c, nil, waitReasonSynctestWait, traceBlockSynctest, 0)

	lock(&bubble.mu)
	if bubble.active--; bubble.active < 0 {
		throw("active < 0")
	}
	bubble.waiter = nil
	bubble.waiting = false
	unlock(&bubble.mu)

	// Establish a happens-before relationship on the activity of the
	// now-blocked goroutines in the bubble.
	if raceenabled {
		raceacquireg(gp, bubble.raceaddr())
	}
}
// synctestwait_c is the unlock function synctestWait passes to gopark.
// With bubble.mu held it records gp as the bubble's waiter.
// It always returns true.
func synctestwait_c(gp *g, _ unsafe.Pointer) bool {
	bubble := gp.bubble
	lock(&bubble.mu)
	if idle := bubble.running == 0 && bubble.active == 0; idle {
		// This shouldn't be possible, since gopark increments active during unlockf.
		throw("running == 0 && active == 0")
	}
	bubble.waiter = gp
	unlock(&bubble.mu)
	return true
}
// synctest_isInBubble reports whether the calling goroutine belongs to a
// bubble, that is, whether its g has a non-nil bubble.
//
//go:linkname synctest_isInBubble internal/synctest.IsInBubble
func synctest_isInBubble() bool {
	gp := getg()
	return gp.bubble != nil
}
// synctest_acquire increments the active-count of the calling goroutine's
// bubble and returns that bubble.
// It returns nil, without side effects, if the caller is not in a bubble.
//
//go:linkname synctest_acquire internal/synctest.acquire
func synctest_acquire() any {
	bubble := getg().bubble
	if bubble == nil {
		return nil
	}
	bubble.incActive()
	return bubble
}
// synctest_release decrements the active-count of bubble.
// bubble must hold a *synctestBubble; the type assertion panics otherwise.
//
//go:linkname synctest_release internal/synctest.release
func synctest_release(bubble any) {
	b := bubble.(*synctestBubble)
	b.decActive()
}
// synctest_inBubble runs f with the calling goroutine temporarily placed
// in bubble. The goroutine's bubble is reset to nil when f returns or panics.
//
// It panics if the calling goroutine is already in a bubble.
// bubble must hold a *synctestBubble.
//
//go:linkname synctest_inBubble internal/synctest.inBubble
func synctest_inBubble(bubble any, f func()) {
	gp := getg()
	if gp.bubble != nil {
		panic("goroutine is already bubbled")
	}
	target := bubble.(*synctestBubble)
	gp.bubble = target
	defer func() {
		gp.bubble = nil
	}()
	f()
}
// specialBubble is a special used to associate objects with bubbles.
//
// getOrSetBubbleSpecial allocates these from mheap_.specialBubbleAlloc
// and splices them into a span's specials with kind _KindSpecialBubble.
type specialBubble struct {
	_        sys.NotInHeap
	special  special
	bubbleid uint64 // id of the bubble the object is associated with
}
// Bubble association states, as returned by getOrSetBubbleSpecial.
//
// Keep these in sync with internal/synctest.
const (
	bubbleAssocUnbubbled     = iota // not associated with any bubble
	bubbleAssocCurrentBubble        // associated with the current bubble
	bubbleAssocOtherBubble          // associated with a different bubble
)
// getOrSetBubbleSpecial checks the special record for p's bubble membership.
//
// If add is true and p is not associated with any bubble,
// it adds a special record for p associating it with bubbleid.
//
// It returns:
//   - bubbleAssocCurrentBubble if p is associated with bubbleid
//     (including if a new association was added);
//   - bubbleAssocOtherBubble if p is associated with a different bubble;
//   - bubbleAssocUnbubbled if p has no association and none was added,
//     or if p does not lie in a heap span.
func getOrSetBubbleSpecial(p unsafe.Pointer, bubbleid uint64, add bool) (assoc int) {
	span := spanOfHeap(uintptr(p))
	if span == nil {
		// This is probably a package var.
		// We can't attach a special to it, so always consider it unbubbled.
		return bubbleAssocUnbubbled
	}

	// Ensure that the span is swept.
	// Sweeping accesses the specials list w/o locks, so we have
	// to synchronize with it. And it's just much safer.
	mp := acquirem()
	span.ensureSwept()

	// Special records are keyed by the object's offset from the span base.
	offset := uintptr(p) - span.base()

	lock(&span.speciallock)

	// Find splice point, check for existing record.
	iter, exists := span.specialFindSplicePoint(offset, _KindSpecialBubble)
	if exists {
		// p is already associated with a bubble.
		// Report whether it's the same bubble.
		s := (*specialBubble)((unsafe.Pointer)(*iter))
		if s.bubbleid == bubbleid {
			assoc = bubbleAssocCurrentBubble
		} else {
			assoc = bubbleAssocOtherBubble
		}
	} else if add {
		// p is not associated with a bubble,
		// and we've been asked to add an association.
		// The allocator is used with mheap_.speciallock held.
		lock(&mheap_.speciallock)
		s := (*specialBubble)(mheap_.specialBubbleAlloc.alloc())
		unlock(&mheap_.speciallock)
		s.bubbleid = bubbleid
		s.special.kind = _KindSpecialBubble
		s.special.offset = offset
		// Splice the new record in at the point found above.
		s.special.next = *iter
		*iter = (*special)(unsafe.Pointer(s))
		spanHasSpecials(span)
		assoc = bubbleAssocCurrentBubble
	} else {
		// p is not associated with a bubble.
		assoc = bubbleAssocUnbubbled
	}

	unlock(&span.speciallock)
	releasem(mp)
	return assoc
}
// synctest_associate associates p with the current bubble.
// It returns false if p is already associated with a different bubble.
//
//go:linkname synctest_associate internal/synctest.associate
func synctest_associate(p unsafe.Pointer) int {
return getOrSetBubbleSpecial(p, getg().bubble.id, true)
}
// synctest_disassociate disassociates p from its bubble
// by removing p's special record of kind _KindSpecialBubble.
//
//go:linkname synctest_disassociate internal/synctest.disassociate
func synctest_disassociate(p unsafe.Pointer) {
	removespecial(p, _KindSpecialBubble)
}
// synctest_isAssociated reports whether p is associated with the current bubble.
//
//go:linkname synctest_isAssociated internal/synctest.isAssociated
func synctest_isAssociated(p unsafe.Pointer) bool {
return getOrSetBubbleSpecial(p, getg().bubble.id, false) == bubbleAssocCurrentBubble
}
|