| Field | Value |
|---|---|
| author | Daniil Cherednik <dan.cherednik@gmail.com>, 2022-11-24 13:14:34 +0300 |
| committer | Daniil Cherednik <dan.cherednik@gmail.com>, 2022-11-24 14:46:00 +0300 |
| commit | 87f7fceed34bcafb8aaff351dd493a35c916986f (patch) |
| tree | 26809ec8f550aba8eb019e59adc3d48e51913eb2 /contrib/go/_std_1.18/src/sync |
| parent | 11bc4015b8010ae201bf3eb33db7dba425aca35e (diff) |
| download | ydb-87f7fceed34bcafb8aaff351dd493a35c916986f.tar.gz |
Ydb stable 22-4-4322.4.43
x-stable-origin-commit: 8d49d46cc834835bf3e50870516acd7376a63bcf
Diffstat (limited to 'contrib/go/_std_1.18/src/sync')
| Mode | Path | Lines added |
|---|---|---|
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/atomic/asm.s | 85 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/atomic/doc.go | 144 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/atomic/value.go | 194 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/cond.go | 98 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/map.go | 386 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/mutex.go | 250 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/once.go | 70 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/pool.go | 294 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/poolqueue.go | 309 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/runtime.go | 57 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/runtime2.go | 19 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/rwmutex.go | 223 |
| -rw-r--r-- | contrib/go/_std_1.18/src/sync/waitgroup.go | 147 |

13 files changed, 2276 insertions(+), 0 deletions(-)
diff --git a/contrib/go/_std_1.18/src/sync/atomic/asm.s b/contrib/go/_std_1.18/src/sync/atomic/asm.s new file mode 100644 index 0000000000..2022304665 --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/atomic/asm.s @@ -0,0 +1,85 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !race + +#include "textflag.h" + +TEXT ·SwapInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchg(SB) + +TEXT ·SwapUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchg(SB) + +TEXT ·SwapInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchg64(SB) + +TEXT ·SwapUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchg64(SB) + +TEXT ·SwapUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchguintptr(SB) + +TEXT ·CompareAndSwapInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Cas(SB) + +TEXT ·CompareAndSwapUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Cas(SB) + +TEXT ·CompareAndSwapUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Casuintptr(SB) + +TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Cas64(SB) + +TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Cas64(SB) + +TEXT ·AddInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadd(SB) + +TEXT ·AddUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadd(SB) + +TEXT ·AddUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadduintptr(SB) + +TEXT ·AddInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadd64(SB) + +TEXT ·AddUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadd64(SB) + +TEXT ·LoadInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Load(SB) + +TEXT ·LoadUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Load(SB) + +TEXT ·LoadInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Load64(SB) + +TEXT ·LoadUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Load64(SB) + +TEXT ·LoadUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Loaduintptr(SB) + +TEXT ·LoadPointer(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Loadp(SB) + +TEXT ·StoreInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Store(SB) + +TEXT ·StoreUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Store(SB) + +TEXT ·StoreInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Store64(SB) + +TEXT ·StoreUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Store64(SB) + +TEXT ·StoreUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Storeuintptr(SB) diff --git a/contrib/go/_std_1.18/src/sync/atomic/doc.go b/contrib/go/_std_1.18/src/sync/atomic/doc.go new file mode 100644 index 0000000000..805ef956d5 --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/atomic/doc.go @@ -0,0 +1,144 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package atomic provides low-level atomic memory primitives +// useful for implementing synchronization algorithms. +// +// These functions require great care to be used correctly. +// Except for special, low-level applications, synchronization is better +// done with channels or the facilities of the sync package. +// Share memory by communicating; +// don't communicate by sharing memory. 
+// +// The swap operation, implemented by the SwapT functions, is the atomic +// equivalent of: +// +// old = *addr +// *addr = new +// return old +// +// The compare-and-swap operation, implemented by the CompareAndSwapT +// functions, is the atomic equivalent of: +// +// if *addr == old { +// *addr = new +// return true +// } +// return false +// +// The add operation, implemented by the AddT functions, is the atomic +// equivalent of: +// +// *addr += delta +// return *addr +// +// The load and store operations, implemented by the LoadT and StoreT +// functions, are the atomic equivalents of "return *addr" and +// "*addr = val". +// +package atomic + +import ( + "unsafe" +) + +// BUG(rsc): On 386, the 64-bit functions use instructions unavailable before the Pentium MMX. +// +// On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core. +// +// On ARM, 386, and 32-bit MIPS, it is the caller's responsibility +// to arrange for 64-bit alignment of 64-bit words accessed atomically. +// The first word in a variable or in an allocated struct, array, or slice can +// be relied upon to be 64-bit aligned. + +// SwapInt32 atomically stores new into *addr and returns the previous *addr value. +func SwapInt32(addr *int32, new int32) (old int32) + +// SwapInt64 atomically stores new into *addr and returns the previous *addr value. +func SwapInt64(addr *int64, new int64) (old int64) + +// SwapUint32 atomically stores new into *addr and returns the previous *addr value. +func SwapUint32(addr *uint32, new uint32) (old uint32) + +// SwapUint64 atomically stores new into *addr and returns the previous *addr value. +func SwapUint64(addr *uint64, new uint64) (old uint64) + +// SwapUintptr atomically stores new into *addr and returns the previous *addr value. +func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) + +// SwapPointer atomically stores new into *addr and returns the previous *addr value. +func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) + +// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value. +func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) + +// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value. +func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) + +// CompareAndSwapUint32 executes the compare-and-swap operation for a uint32 value. +func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) + +// CompareAndSwapUint64 executes the compare-and-swap operation for a uint64 value. +func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) + +// CompareAndSwapUintptr executes the compare-and-swap operation for a uintptr value. +func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) + +// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value. +func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) + +// AddInt32 atomically adds delta to *addr and returns the new value. +func AddInt32(addr *int32, delta int32) (new int32) + +// AddUint32 atomically adds delta to *addr and returns the new value. +// To subtract a signed positive constant value c from x, do AddUint32(&x, ^uint32(c-1)). +// In particular, to decrement x, do AddUint32(&x, ^uint32(0)). +func AddUint32(addr *uint32, delta uint32) (new uint32) + +// AddInt64 atomically adds delta to *addr and returns the new value. 
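The doc.go comments above pin down the Swap, CompareAndSwap, and Add semantics in pseudocode. As a usage sketch only (the bounded-counter helper below is illustrative, not part of this commit), the typical CAS retry loop and the unsigned-decrement trick look like this:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tryIncrementCapped raises *ctr by one unless it already equals limit.
// The CompareAndSwapInt32 loop retries when another goroutine changed the
// value between the Load and the CAS, mirroring the pseudocode in doc.go.
func tryIncrementCapped(ctr *int32, limit int32) bool {
	for {
		old := atomic.LoadInt32(ctr)
		if old >= limit {
			return false
		}
		if atomic.CompareAndSwapInt32(ctr, old, old+1) {
			return true
		}
	}
}

func main() {
	var n int32
	tryIncrementCapped(&n, 10)
	fmt.Println(atomic.LoadInt32(&n)) // 1

	// Unsigned counters are decremented with the two's-complement trick
	// described above: AddUint32(&x, ^uint32(0)) subtracts one.
	var u uint32 = 5
	atomic.AddUint32(&u, ^uint32(0))   // subtract 1: u == 4
	atomic.AddUint32(&u, ^uint32(3-1)) // subtract 3: u == 1
	fmt.Println(u)
}
```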
+func AddInt64(addr *int64, delta int64) (new int64) + +// AddUint64 atomically adds delta to *addr and returns the new value. +// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)). +// In particular, to decrement x, do AddUint64(&x, ^uint64(0)). +func AddUint64(addr *uint64, delta uint64) (new uint64) + +// AddUintptr atomically adds delta to *addr and returns the new value. +func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) + +// LoadInt32 atomically loads *addr. +func LoadInt32(addr *int32) (val int32) + +// LoadInt64 atomically loads *addr. +func LoadInt64(addr *int64) (val int64) + +// LoadUint32 atomically loads *addr. +func LoadUint32(addr *uint32) (val uint32) + +// LoadUint64 atomically loads *addr. +func LoadUint64(addr *uint64) (val uint64) + +// LoadUintptr atomically loads *addr. +func LoadUintptr(addr *uintptr) (val uintptr) + +// LoadPointer atomically loads *addr. +func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) + +// StoreInt32 atomically stores val into *addr. +func StoreInt32(addr *int32, val int32) + +// StoreInt64 atomically stores val into *addr. +func StoreInt64(addr *int64, val int64) + +// StoreUint32 atomically stores val into *addr. +func StoreUint32(addr *uint32, val uint32) + +// StoreUint64 atomically stores val into *addr. +func StoreUint64(addr *uint64, val uint64) + +// StoreUintptr atomically stores val into *addr. +func StoreUintptr(addr *uintptr, val uintptr) + +// StorePointer atomically stores val into *addr. +func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) diff --git a/contrib/go/_std_1.18/src/sync/atomic/value.go b/contrib/go/_std_1.18/src/sync/atomic/value.go new file mode 100644 index 0000000000..88315f2d88 --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/atomic/value.go @@ -0,0 +1,194 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package atomic + +import ( + "unsafe" +) + +// A Value provides an atomic load and store of a consistently typed value. +// The zero value for a Value returns nil from Load. +// Once Store has been called, a Value must not be copied. +// +// A Value must not be copied after first use. +type Value struct { + v any +} + +// ifaceWords is interface{} internal representation. +type ifaceWords struct { + typ unsafe.Pointer + data unsafe.Pointer +} + +// Load returns the value set by the most recent Store. +// It returns nil if there has been no call to Store for this Value. +func (v *Value) Load() (val any) { + vp := (*ifaceWords)(unsafe.Pointer(v)) + typ := LoadPointer(&vp.typ) + if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) { + // First store not yet completed. + return nil + } + data := LoadPointer(&vp.data) + vlp := (*ifaceWords)(unsafe.Pointer(&val)) + vlp.typ = typ + vlp.data = data + return +} + +var firstStoreInProgress byte + +// Store sets the value of the Value to x. +// All calls to Store for a given Value must use values of the same concrete type. +// Store of an inconsistent type panics, as does Store(nil). +func (v *Value) Store(val any) { + if val == nil { + panic("sync/atomic: store of nil value into Value") + } + vp := (*ifaceWords)(unsafe.Pointer(v)) + vlp := (*ifaceWords)(unsafe.Pointer(&val)) + for { + typ := LoadPointer(&vp.typ) + if typ == nil { + // Attempt to start first store. + // Disable preemption so that other goroutines can use + // active spin wait to wait for completion. 
+ runtime_procPin() + if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) { + runtime_procUnpin() + continue + } + // Complete first store. + StorePointer(&vp.data, vlp.data) + StorePointer(&vp.typ, vlp.typ) + runtime_procUnpin() + return + } + if typ == unsafe.Pointer(&firstStoreInProgress) { + // First store in progress. Wait. + // Since we disable preemption around the first store, + // we can wait with active spinning. + continue + } + // First store completed. Check type and overwrite data. + if typ != vlp.typ { + panic("sync/atomic: store of inconsistently typed value into Value") + } + StorePointer(&vp.data, vlp.data) + return + } +} + +// Swap stores new into Value and returns the previous value. It returns nil if +// the Value is empty. +// +// All calls to Swap for a given Value must use values of the same concrete +// type. Swap of an inconsistent type panics, as does Swap(nil). +func (v *Value) Swap(new any) (old any) { + if new == nil { + panic("sync/atomic: swap of nil value into Value") + } + vp := (*ifaceWords)(unsafe.Pointer(v)) + np := (*ifaceWords)(unsafe.Pointer(&new)) + for { + typ := LoadPointer(&vp.typ) + if typ == nil { + // Attempt to start first store. + // Disable preemption so that other goroutines can use + // active spin wait to wait for completion; and so that + // GC does not see the fake type accidentally. + runtime_procPin() + if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) { + runtime_procUnpin() + continue + } + // Complete first store. + StorePointer(&vp.data, np.data) + StorePointer(&vp.typ, np.typ) + runtime_procUnpin() + return nil + } + if typ == unsafe.Pointer(&firstStoreInProgress) { + // First store in progress. Wait. + // Since we disable preemption around the first store, + // we can wait with active spinning. + continue + } + // First store completed. Check type and overwrite data. + if typ != np.typ { + panic("sync/atomic: swap of inconsistently typed value into Value") + } + op := (*ifaceWords)(unsafe.Pointer(&old)) + op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data) + return old + } +} + +// CompareAndSwap executes the compare-and-swap operation for the Value. +// +// All calls to CompareAndSwap for a given Value must use values of the same +// concrete type. CompareAndSwap of an inconsistent type panics, as does +// CompareAndSwap(old, nil). +func (v *Value) CompareAndSwap(old, new any) (swapped bool) { + if new == nil { + panic("sync/atomic: compare and swap of nil value into Value") + } + vp := (*ifaceWords)(unsafe.Pointer(v)) + np := (*ifaceWords)(unsafe.Pointer(&new)) + op := (*ifaceWords)(unsafe.Pointer(&old)) + if op.typ != nil && np.typ != op.typ { + panic("sync/atomic: compare and swap of inconsistently typed values") + } + for { + typ := LoadPointer(&vp.typ) + if typ == nil { + if old != nil { + return false + } + // Attempt to start first store. + // Disable preemption so that other goroutines can use + // active spin wait to wait for completion; and so that + // GC does not see the fake type accidentally. + runtime_procPin() + if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) { + runtime_procUnpin() + continue + } + // Complete first store. + StorePointer(&vp.data, np.data) + StorePointer(&vp.typ, np.typ) + runtime_procUnpin() + return true + } + if typ == unsafe.Pointer(&firstStoreInProgress) { + // First store in progress. Wait. + // Since we disable preemption around the first store, + // we can wait with active spinning. 
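From the caller's perspective the firstStoreInProgress machinery above is invisible: atomic.Value is just a type-stable box with Load, Store, and (since Go 1.17) Swap and CompareAndSwap. A hedged usage sketch, with an invented config type:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// config is a hypothetical immutable snapshot that writers swap atomically.
type config struct {
	timeoutMs int
}

var current atomic.Value // holds *config; every store must use the same concrete type

func main() {
	current.Store(&config{timeoutMs: 100})

	// Readers pay one atomic load and no locks.
	c := current.Load().(*config)
	fmt.Println(c.timeoutMs) // 100

	// Writers publish a fresh snapshot; Swap returns the previous one.
	old := current.Swap(&config{timeoutMs: 250}).(*config)
	fmt.Println(old.timeoutMs, current.Load().(*config).timeoutMs) // 100 250
}
```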
+ continue + } + // First store completed. Check type and overwrite data. + if typ != np.typ { + panic("sync/atomic: compare and swap of inconsistently typed value into Value") + } + // Compare old and current via runtime equality check. + // This allows value types to be compared, something + // not offered by the package functions. + // CompareAndSwapPointer below only ensures vp.data + // has not changed since LoadPointer. + data := LoadPointer(&vp.data) + var i any + (*ifaceWords)(unsafe.Pointer(&i)).typ = typ + (*ifaceWords)(unsafe.Pointer(&i)).data = data + if i != old { + return false + } + return CompareAndSwapPointer(&vp.data, data, np.data) + } +} + +// Disable/enable preemption, implemented in runtime. +func runtime_procPin() +func runtime_procUnpin() diff --git a/contrib/go/_std_1.18/src/sync/cond.go b/contrib/go/_std_1.18/src/sync/cond.go new file mode 100644 index 0000000000..b254c9360a --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/cond.go @@ -0,0 +1,98 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// Cond implements a condition variable, a rendezvous point +// for goroutines waiting for or announcing the occurrence +// of an event. +// +// Each Cond has an associated Locker L (often a *Mutex or *RWMutex), +// which must be held when changing the condition and +// when calling the Wait method. +// +// A Cond must not be copied after first use. +type Cond struct { + noCopy noCopy + + // L is held while observing or changing the condition + L Locker + + notify notifyList + checker copyChecker +} + +// NewCond returns a new Cond with Locker l. +func NewCond(l Locker) *Cond { + return &Cond{L: l} +} + +// Wait atomically unlocks c.L and suspends execution +// of the calling goroutine. After later resuming execution, +// Wait locks c.L before returning. Unlike in other systems, +// Wait cannot return unless awoken by Broadcast or Signal. +// +// Because c.L is not locked when Wait first resumes, the caller +// typically cannot assume that the condition is true when +// Wait returns. Instead, the caller should Wait in a loop: +// +// c.L.Lock() +// for !condition() { +// c.Wait() +// } +// ... make use of condition ... +// c.L.Unlock() +// +func (c *Cond) Wait() { + c.checker.check() + t := runtime_notifyListAdd(&c.notify) + c.L.Unlock() + runtime_notifyListWait(&c.notify, t) + c.L.Lock() +} + +// Signal wakes one goroutine waiting on c, if there is any. +// +// It is allowed but not required for the caller to hold c.L +// during the call. +func (c *Cond) Signal() { + c.checker.check() + runtime_notifyListNotifyOne(&c.notify) +} + +// Broadcast wakes all goroutines waiting on c. +// +// It is allowed but not required for the caller to hold c.L +// during the call. +func (c *Cond) Broadcast() { + c.checker.check() + runtime_notifyListNotifyAll(&c.notify) +} + +// copyChecker holds back pointer to itself to detect object copying. +type copyChecker uintptr + +func (c *copyChecker) check() { + if uintptr(*c) != uintptr(unsafe.Pointer(c)) && + !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && + uintptr(*c) != uintptr(unsafe.Pointer(c)) { + panic("sync.Cond is copied") + } +} + +// noCopy may be embedded into structs which must not be copied +// after the first use. +// +// See https://golang.org/issues/8005#issuecomment-190753527 +// for details. 
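The Wait contract spelled out above (unlock, sleep, relock, re-check in a loop) is easiest to see in a tiny blocking queue. This sketch is illustrative only and assumes nothing beyond the Cond API shown in the diff:

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a minimal Cond-based FIFO: Pop blocks until an item is available.
type queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *queue) Push(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal() // wake one waiter, if any
}

func (q *queue) Pop() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	// Always re-check the condition in a loop, as the Wait doc requires.
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := newQueue()
	done := make(chan int)
	go func() { done <- q.Pop() }()
	q.Push(42)
	fmt.Println(<-done) // 42
}
```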
+type noCopy struct{} + +// Lock is a no-op used by -copylocks checker from `go vet`. +func (*noCopy) Lock() {} +func (*noCopy) Unlock() {} diff --git a/contrib/go/_std_1.18/src/sync/map.go b/contrib/go/_std_1.18/src/sync/map.go new file mode 100644 index 0000000000..2fa3253429 --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/map.go @@ -0,0 +1,386 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// Map is like a Go map[interface{}]interface{} but is safe for concurrent use +// by multiple goroutines without additional locking or coordination. +// Loads, stores, and deletes run in amortized constant time. +// +// The Map type is specialized. Most code should use a plain Go map instead, +// with separate locking or coordination, for better type safety and to make it +// easier to maintain other invariants along with the map content. +// +// The Map type is optimized for two common use cases: (1) when the entry for a given +// key is only ever written once but read many times, as in caches that only grow, +// or (2) when multiple goroutines read, write, and overwrite entries for disjoint +// sets of keys. In these two cases, use of a Map may significantly reduce lock +// contention compared to a Go map paired with a separate Mutex or RWMutex. +// +// The zero Map is empty and ready for use. A Map must not be copied after first use. +type Map struct { + mu Mutex + + // read contains the portion of the map's contents that are safe for + // concurrent access (with or without mu held). + // + // The read field itself is always safe to load, but must only be stored with + // mu held. + // + // Entries stored in read may be updated concurrently without mu, but updating + // a previously-expunged entry requires that the entry be copied to the dirty + // map and unexpunged with mu held. + read atomic.Value // readOnly + + // dirty contains the portion of the map's contents that require mu to be + // held. To ensure that the dirty map can be promoted to the read map quickly, + // it also includes all of the non-expunged entries in the read map. + // + // Expunged entries are not stored in the dirty map. An expunged entry in the + // clean map must be unexpunged and added to the dirty map before a new value + // can be stored to it. + // + // If the dirty map is nil, the next write to the map will initialize it by + // making a shallow copy of the clean map, omitting stale entries. + dirty map[any]*entry + + // misses counts the number of loads since the read map was last updated that + // needed to lock mu to determine whether the key was present. + // + // Once enough misses have occurred to cover the cost of copying the dirty + // map, the dirty map will be promoted to the read map (in the unamended + // state) and the next store to the map will make a new dirty copy. + misses int +} + +// readOnly is an immutable struct stored atomically in the Map.read field. +type readOnly struct { + m map[any]*entry + amended bool // true if the dirty map contains some key not in m. +} + +// expunged is an arbitrary pointer that marks entries which have been deleted +// from the dirty map. +var expunged = unsafe.Pointer(new(any)) + +// An entry is a slot in the map corresponding to a particular key. +type entry struct { + // p points to the interface{} value stored for the entry. 
+ // + // If p == nil, the entry has been deleted, and either m.dirty == nil or + // m.dirty[key] is e. + // + // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry + // is missing from m.dirty. + // + // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty + // != nil, in m.dirty[key]. + // + // An entry can be deleted by atomic replacement with nil: when m.dirty is + // next created, it will atomically replace nil with expunged and leave + // m.dirty[key] unset. + // + // An entry's associated value can be updated by atomic replacement, provided + // p != expunged. If p == expunged, an entry's associated value can be updated + // only after first setting m.dirty[key] = e so that lookups using the dirty + // map find the entry. + p unsafe.Pointer // *interface{} +} + +func newEntry(i any) *entry { + return &entry{p: unsafe.Pointer(&i)} +} + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *Map) Load(key any) (value any, ok bool) { + read, _ := m.read.Load().(readOnly) + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + // Avoid reporting a spurious miss if m.dirty got promoted while we were + // blocked on m.mu. (If further loads of the same key will not miss, it's + // not worth copying the dirty map for this key.) + read, _ = m.read.Load().(readOnly) + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + // Regardless of whether the entry was present, record a miss: this key + // will take the slow path until the dirty map is promoted to the read + // map. + m.missLocked() + } + m.mu.Unlock() + } + if !ok { + return nil, false + } + return e.load() +} + +func (e *entry) load() (value any, ok bool) { + p := atomic.LoadPointer(&e.p) + if p == nil || p == expunged { + return nil, false + } + return *(*any)(p), true +} + +// Store sets the value for a key. +func (m *Map) Store(key, value any) { + read, _ := m.read.Load().(readOnly) + if e, ok := read.m[key]; ok && e.tryStore(&value) { + return + } + + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + // The entry was previously expunged, which implies that there is a + // non-nil dirty map and this entry is not in it. + m.dirty[key] = e + } + e.storeLocked(&value) + } else if e, ok := m.dirty[key]; ok { + e.storeLocked(&value) + } else { + if !read.amended { + // We're adding the first new key to the dirty map. + // Make sure it is allocated and mark the read-only map as incomplete. + m.dirtyLocked() + m.read.Store(readOnly{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + } + m.mu.Unlock() +} + +// tryStore stores a value if the entry has not been expunged. +// +// If the entry is expunged, tryStore returns false and leaves the entry +// unchanged. +func (e *entry) tryStore(i *any) bool { + for { + p := atomic.LoadPointer(&e.p) + if p == expunged { + return false + } + if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { + return true + } + } +} + +// unexpungeLocked ensures that the entry is not marked as expunged. +// +// If the entry was previously expunged, it must be added to the dirty map +// before m.mu is unlocked. +func (e *entry) unexpungeLocked() (wasExpunged bool) { + return atomic.CompareAndSwapPointer(&e.p, expunged, nil) +} + +// storeLocked unconditionally stores a value to the entry. +// +// The entry must be known not to be expunged. 
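The read/dirty split documented above never shows through the API; callers see only Load, Store, and LoadOrStore. A minimal sketch of the append-mostly cache use case the type is optimized for (the key and value choices are arbitrary):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var cache sync.Map // effectively map[string]int, safe for concurrent growth

	// LoadOrStore either returns the existing entry or publishes ours.
	v, loaded := cache.LoadOrStore("answer", 42)
	fmt.Println(v, loaded) // 42 false

	v, loaded = cache.LoadOrStore("answer", 7)
	fmt.Println(v, loaded) // 42 true — the first store wins

	// Load hits the lock-free read map on the fast path.
	if v, ok := cache.Load("answer"); ok {
		fmt.Println(v) // 42
	}
}
```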
+func (e *entry) storeLocked(i *any) { + atomic.StorePointer(&e.p, unsafe.Pointer(i)) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) { + // Avoid locking if it's a clean hit. + read, _ := m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + actual, loaded, ok := e.tryLoadOrStore(value) + if ok { + return actual, loaded + } + } + + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + m.dirty[key] = e + } + actual, loaded, _ = e.tryLoadOrStore(value) + } else if e, ok := m.dirty[key]; ok { + actual, loaded, _ = e.tryLoadOrStore(value) + m.missLocked() + } else { + if !read.amended { + // We're adding the first new key to the dirty map. + // Make sure it is allocated and mark the read-only map as incomplete. + m.dirtyLocked() + m.read.Store(readOnly{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + actual, loaded = value, false + } + m.mu.Unlock() + + return actual, loaded +} + +// tryLoadOrStore atomically loads or stores a value if the entry is not +// expunged. +// +// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and +// returns with ok==false. +func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) { + p := atomic.LoadPointer(&e.p) + if p == expunged { + return nil, false, false + } + if p != nil { + return *(*any)(p), true, true + } + + // Copy the interface after the first load to make this method more amenable + // to escape analysis: if we hit the "load" path or the entry is expunged, we + // shouldn't bother heap-allocating. + ic := i + for { + if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { + return i, false, true + } + p = atomic.LoadPointer(&e.p) + if p == expunged { + return nil, false, false + } + if p != nil { + return *(*any)(p), true, true + } + } +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *Map) LoadAndDelete(key any) (value any, loaded bool) { + read, _ := m.read.Load().(readOnly) + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + delete(m.dirty, key) + // Regardless of whether the entry was present, record a miss: this key + // will take the slow path until the dirty map is promoted to the read + // map. + m.missLocked() + } + m.mu.Unlock() + } + if ok { + return e.delete() + } + return nil, false +} + +// Delete deletes the value for a key. +func (m *Map) Delete(key any) { + m.LoadAndDelete(key) +} + +func (e *entry) delete() (value any, ok bool) { + for { + p := atomic.LoadPointer(&e.p) + if p == nil || p == expunged { + return nil, false + } + if atomic.CompareAndSwapPointer(&e.p, p, nil) { + return *(*any)(p), true + } + } +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently (including by f), Range may reflect any +// mapping for that key from any point during the Range call. 
Range does not +// block other methods on the receiver; even f itself may call any method on m. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *Map) Range(f func(key, value any) bool) { + // We need to be able to iterate over all of the keys that were already + // present at the start of the call to Range. + // If read.amended is false, then read.m satisfies that property without + // requiring us to hold m.mu for a long time. + read, _ := m.read.Load().(readOnly) + if read.amended { + // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) + // (assuming the caller does not break out early), so a call to Range + // amortizes an entire copy of the map: we can promote the dirty copy + // immediately! + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if read.amended { + read = readOnly{m: m.dirty} + m.read.Store(read) + m.dirty = nil + m.misses = 0 + } + m.mu.Unlock() + } + + for k, e := range read.m { + v, ok := e.load() + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +func (m *Map) missLocked() { + m.misses++ + if m.misses < len(m.dirty) { + return + } + m.read.Store(readOnly{m: m.dirty}) + m.dirty = nil + m.misses = 0 +} + +func (m *Map) dirtyLocked() { + if m.dirty != nil { + return + } + + read, _ := m.read.Load().(readOnly) + m.dirty = make(map[any]*entry, len(read.m)) + for k, e := range read.m { + if !e.tryExpungeLocked() { + m.dirty[k] = e + } + } +} + +func (e *entry) tryExpungeLocked() (isExpunged bool) { + p := atomic.LoadPointer(&e.p) + for p == nil { + if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { + return true + } + p = atomic.LoadPointer(&e.p) + } + return p == expunged +} diff --git a/contrib/go/_std_1.18/src/sync/mutex.go b/contrib/go/_std_1.18/src/sync/mutex.go new file mode 100644 index 0000000000..18b2cedba7 --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/mutex.go @@ -0,0 +1,250 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package sync provides basic synchronization primitives such as mutual +// exclusion locks. Other than the Once and WaitGroup types, most are intended +// for use by low-level library routines. Higher-level synchronization is +// better done via channels and communication. +// +// Values containing the types defined in this package should not be copied. +package sync + +import ( + "internal/race" + "sync/atomic" + "unsafe" +) + +func throw(string) // provided by runtime + +// A Mutex is a mutual exclusion lock. +// The zero value for a Mutex is an unlocked mutex. +// +// A Mutex must not be copied after first use. +type Mutex struct { + state int32 + sema uint32 +} + +// A Locker represents an object that can be locked and unlocked. +type Locker interface { + Lock() + Unlock() +} + +const ( + mutexLocked = 1 << iota // mutex is locked + mutexWoken + mutexStarving + mutexWaiterShift = iota + + // Mutex fairness. + // + // Mutex can be in 2 modes of operations: normal and starvation. + // In normal mode waiters are queued in FIFO order, but a woken up waiter + // does not own the mutex and competes with new arriving goroutines over + // the ownership. New arriving goroutines have an advantage -- they are + // already running on CPU and there can be lots of them, so a woken up + // waiter has good chances of losing. In such case it is queued at front + // of the wait queue. 
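Range above promises only that each key is visited at most once, not a consistent snapshot. Combined with LoadAndDelete that is enough for a drain pattern; the helper below is an illustrative sketch, not part of the commit:

```go
package main

import (
	"fmt"
	"sync"
)

// drain removes and returns every entry currently visible in m.
// Entries stored concurrently during the drain may or may not be included,
// which matches Range's documented (non-)snapshot semantics.
func drain(m *sync.Map) map[any]any {
	out := make(map[any]any)
	m.Range(func(key, _ any) bool {
		if v, loaded := m.LoadAndDelete(key); loaded {
			out[key] = v
		}
		return true // keep iterating
	})
	return out
}

func main() {
	var m sync.Map
	m.Store("a", 1)
	m.Store("b", 2)
	fmt.Println(len(drain(&m))) // 2
	_, ok := m.Load("a")
	fmt.Println(ok) // false
}
```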
If a waiter fails to acquire the mutex for more than 1ms, + // it switches mutex to the starvation mode. + // + // In starvation mode ownership of the mutex is directly handed off from + // the unlocking goroutine to the waiter at the front of the queue. + // New arriving goroutines don't try to acquire the mutex even if it appears + // to be unlocked, and don't try to spin. Instead they queue themselves at + // the tail of the wait queue. + // + // If a waiter receives ownership of the mutex and sees that either + // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, + // it switches mutex back to normal operation mode. + // + // Normal mode has considerably better performance as a goroutine can acquire + // a mutex several times in a row even if there are blocked waiters. + // Starvation mode is important to prevent pathological cases of tail latency. + starvationThresholdNs = 1e6 +) + +// Lock locks m. +// If the lock is already in use, the calling goroutine +// blocks until the mutex is available. +func (m *Mutex) Lock() { + // Fast path: grab unlocked mutex. + if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { + if race.Enabled { + race.Acquire(unsafe.Pointer(m)) + } + return + } + // Slow path (outlined so that the fast path can be inlined) + m.lockSlow() +} + +// TryLock tries to lock m and reports whether it succeeded. +// +// Note that while correct uses of TryLock do exist, they are rare, +// and use of TryLock is often a sign of a deeper problem +// in a particular use of mutexes. +func (m *Mutex) TryLock() bool { + old := m.state + if old&(mutexLocked|mutexStarving) != 0 { + return false + } + + // There may be a goroutine waiting for the mutex, but we are + // running now and can try to grab the mutex before that + // goroutine wakes up. + if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) { + return false + } + + if race.Enabled { + race.Acquire(unsafe.Pointer(m)) + } + return true +} + +func (m *Mutex) lockSlow() { + var waitStartTime int64 + starving := false + awoke := false + iter := 0 + old := m.state + for { + // Don't spin in starvation mode, ownership is handed off to waiters + // so we won't be able to acquire the mutex anyway. + if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { + // Active spinning makes sense. + // Try to set mutexWoken flag to inform Unlock + // to not wake other blocked goroutines. + if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && + atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { + awoke = true + } + runtime_doSpin() + iter++ + old = m.state + continue + } + new := old + // Don't try to acquire starving mutex, new arriving goroutines must queue. + if old&mutexStarving == 0 { + new |= mutexLocked + } + if old&(mutexLocked|mutexStarving) != 0 { + new += 1 << mutexWaiterShift + } + // The current goroutine switches mutex to starvation mode. + // But if the mutex is currently unlocked, don't do the switch. + // Unlock expects that starving mutex has waiters, which will not + // be true in this case. + if starving && old&mutexLocked != 0 { + new |= mutexStarving + } + if awoke { + // The goroutine has been woken from sleep, + // so we need to reset the flag in either case. 
+ if new&mutexWoken == 0 { + throw("sync: inconsistent mutex state") + } + new &^= mutexWoken + } + if atomic.CompareAndSwapInt32(&m.state, old, new) { + if old&(mutexLocked|mutexStarving) == 0 { + break // locked the mutex with CAS + } + // If we were already waiting before, queue at the front of the queue. + queueLifo := waitStartTime != 0 + if waitStartTime == 0 { + waitStartTime = runtime_nanotime() + } + runtime_SemacquireMutex(&m.sema, queueLifo, 1) + starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs + old = m.state + if old&mutexStarving != 0 { + // If this goroutine was woken and mutex is in starvation mode, + // ownership was handed off to us but mutex is in somewhat + // inconsistent state: mutexLocked is not set and we are still + // accounted as waiter. Fix that. + if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { + throw("sync: inconsistent mutex state") + } + delta := int32(mutexLocked - 1<<mutexWaiterShift) + if !starving || old>>mutexWaiterShift == 1 { + // Exit starvation mode. + // Critical to do it here and consider wait time. + // Starvation mode is so inefficient, that two goroutines + // can go lock-step infinitely once they switch mutex + // to starvation mode. + delta -= mutexStarving + } + atomic.AddInt32(&m.state, delta) + break + } + awoke = true + iter = 0 + } else { + old = m.state + } + } + + if race.Enabled { + race.Acquire(unsafe.Pointer(m)) + } +} + +// Unlock unlocks m. +// It is a run-time error if m is not locked on entry to Unlock. +// +// A locked Mutex is not associated with a particular goroutine. +// It is allowed for one goroutine to lock a Mutex and then +// arrange for another goroutine to unlock it. +func (m *Mutex) Unlock() { + if race.Enabled { + _ = m.state + race.Release(unsafe.Pointer(m)) + } + + // Fast path: drop lock bit. + new := atomic.AddInt32(&m.state, -mutexLocked) + if new != 0 { + // Outlined slow path to allow inlining the fast path. + // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. + m.unlockSlow(new) + } +} + +func (m *Mutex) unlockSlow(new int32) { + if (new+mutexLocked)&mutexLocked == 0 { + throw("sync: unlock of unlocked mutex") + } + if new&mutexStarving == 0 { + old := new + for { + // If there are no waiters or a goroutine has already + // been woken or grabbed the lock, no need to wake anyone. + // In starvation mode ownership is directly handed off from unlocking + // goroutine to the next waiter. We are not part of this chain, + // since we did not observe mutexStarving when we unlocked the mutex above. + // So get off the way. + if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { + return + } + // Grab the right to wake someone. + new = (old - 1<<mutexWaiterShift) | mutexWoken + if atomic.CompareAndSwapInt32(&m.state, old, new) { + runtime_Semrelease(&m.sema, false, 1) + return + } + old = m.state + } + } else { + // Starving mode: handoff mutex ownership to the next waiter, and yield + // our time slice so that the next waiter can start to run immediately. + // Note: mutexLocked is not set, the waiter will set it after wakeup. + // But mutex is still considered locked if mutexStarving is set, + // so new coming goroutines won't acquire it. 
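Lock, Unlock, and the new TryLock sit on top of the state machine above, but the external API is just a two-word struct. A hedged sketch with an invented counter type (TryLock shown mainly because Go 1.18 introduces it, with the caveat from its doc comment):

```go
package main

import (
	"fmt"
	"sync"
)

// counter guards a plain int with a Mutex; the zero value is ready to use.
type counter struct {
	mu sync.Mutex
	n  int
}

func (c *counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// TryInc increments only if the lock is immediately available.
// As the TryLock doc warns, correct uses of this pattern are rare.
func (c *counter) TryInc() bool {
	if !c.mu.TryLock() {
		return false
	}
	defer c.mu.Unlock()
	c.n++
	return true
}

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); c.Inc() }()
	}
	wg.Wait()
	fmt.Println(c.n) // 100
}
```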
+ runtime_Semrelease(&m.sema, true, 1) + } +} diff --git a/contrib/go/_std_1.18/src/sync/once.go b/contrib/go/_std_1.18/src/sync/once.go new file mode 100644 index 0000000000..8844314e7e --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/once.go @@ -0,0 +1,70 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" +) + +// Once is an object that will perform exactly one action. +// +// A Once must not be copied after first use. +type Once struct { + // done indicates whether the action has been performed. + // It is first in the struct because it is used in the hot path. + // The hot path is inlined at every call site. + // Placing done first allows more compact instructions on some architectures (amd64/386), + // and fewer instructions (to calculate offset) on other architectures. + done uint32 + m Mutex +} + +// Do calls the function f if and only if Do is being called for the +// first time for this instance of Once. In other words, given +// var once Once +// if once.Do(f) is called multiple times, only the first call will invoke f, +// even if f has a different value in each invocation. A new instance of +// Once is required for each function to execute. +// +// Do is intended for initialization that must be run exactly once. Since f +// is niladic, it may be necessary to use a function literal to capture the +// arguments to a function to be invoked by Do: +// config.once.Do(func() { config.init(filename) }) +// +// Because no call to Do returns until the one call to f returns, if f causes +// Do to be called, it will deadlock. +// +// If f panics, Do considers it to have returned; future calls of Do return +// without calling f. +// +func (o *Once) Do(f func()) { + // Note: Here is an incorrect implementation of Do: + // + // if atomic.CompareAndSwapUint32(&o.done, 0, 1) { + // f() + // } + // + // Do guarantees that when it returns, f has finished. + // This implementation would not implement that guarantee: + // given two simultaneous calls, the winner of the cas would + // call f, and the second would return immediately, without + // waiting for the first's call to f to complete. + // This is why the slow path falls back to a mutex, and why + // the atomic.StoreUint32 must be delayed until after f returns. + + if atomic.LoadUint32(&o.done) == 0 { + // Outlined slow-path to allow inlining of the fast-path. + o.doSlow(f) + } +} + +func (o *Once) doSlow(f func()) { + o.m.Lock() + defer o.m.Unlock() + if o.done == 0 { + defer atomic.StoreUint32(&o.done, 1) + f() + } +} diff --git a/contrib/go/_std_1.18/src/sync/pool.go b/contrib/go/_std_1.18/src/sync/pool.go new file mode 100644 index 0000000000..d1abb6a8b7 --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/pool.go @@ -0,0 +1,294 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "internal/race" + "runtime" + "sync/atomic" + "unsafe" +) + +// A Pool is a set of temporary objects that may be individually saved and +// retrieved. +// +// Any item stored in the Pool may be removed automatically at any time without +// notification. If the Pool holds the only reference when this happens, the +// item might be deallocated. +// +// A Pool is safe for use by multiple goroutines simultaneously. 
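once.Do above guarantees that every caller returns only after the single winning call to f has finished, which is exactly what lazy initialization wants. A small sketch with a hypothetical loadConfig helper:

```go
package main

import (
	"fmt"
	"sync"
)

var (
	once sync.Once
	cfg  map[string]string // hypothetical lazily-built configuration
)

// loadConfig runs at most once, no matter how many goroutines race here.
func loadConfig() {
	cfg = map[string]string{"mode": "fast"}
	fmt.Println("initialized")
}

func getConfig() map[string]string {
	once.Do(loadConfig) // every caller returns only after loadConfig finished
	return cfg
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); _ = getConfig() }()
	}
	wg.Wait()
	fmt.Println(getConfig()["mode"]) // "initialized" printed exactly once, then "fast"
}
```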
+// +// Pool's purpose is to cache allocated but unused items for later reuse, +// relieving pressure on the garbage collector. That is, it makes it easy to +// build efficient, thread-safe free lists. However, it is not suitable for all +// free lists. +// +// An appropriate use of a Pool is to manage a group of temporary items +// silently shared among and potentially reused by concurrent independent +// clients of a package. Pool provides a way to amortize allocation overhead +// across many clients. +// +// An example of good use of a Pool is in the fmt package, which maintains a +// dynamically-sized store of temporary output buffers. The store scales under +// load (when many goroutines are actively printing) and shrinks when +// quiescent. +// +// On the other hand, a free list maintained as part of a short-lived object is +// not a suitable use for a Pool, since the overhead does not amortize well in +// that scenario. It is more efficient to have such objects implement their own +// free list. +// +// A Pool must not be copied after first use. +type Pool struct { + noCopy noCopy + + local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal + localSize uintptr // size of the local array + + victim unsafe.Pointer // local from previous cycle + victimSize uintptr // size of victims array + + // New optionally specifies a function to generate + // a value when Get would otherwise return nil. + // It may not be changed concurrently with calls to Get. + New func() any +} + +// Local per-P Pool appendix. +type poolLocalInternal struct { + private any // Can be used only by the respective P. + shared poolChain // Local P can pushHead/popHead; any P can popTail. +} + +type poolLocal struct { + poolLocalInternal + + // Prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . + pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte +} + +// from runtime +func fastrandn(n uint32) uint32 + +var poolRaceHash [128]uint64 + +// poolRaceAddr returns an address to use as the synchronization point +// for race detector logic. We don't use the actual pointer stored in x +// directly, for fear of conflicting with other synchronization on that address. +// Instead, we hash the pointer to get an index into poolRaceHash. +// See discussion on golang.org/cl/31589. +func poolRaceAddr(x any) unsafe.Pointer { + ptr := uintptr((*[2]unsafe.Pointer)(unsafe.Pointer(&x))[1]) + h := uint32((uint64(uint32(ptr)) * 0x85ebca6b) >> 16) + return unsafe.Pointer(&poolRaceHash[h%uint32(len(poolRaceHash))]) +} + +// Put adds x to the pool. +func (p *Pool) Put(x any) { + if x == nil { + return + } + if race.Enabled { + if fastrandn(4) == 0 { + // Randomly drop x on floor. + return + } + race.ReleaseMerge(poolRaceAddr(x)) + race.Disable() + } + l, _ := p.pin() + if l.private == nil { + l.private = x + x = nil + } + if x != nil { + l.shared.pushHead(x) + } + runtime_procUnpin() + if race.Enabled { + race.Enable() + } +} + +// Get selects an arbitrary item from the Pool, removes it from the +// Pool, and returns it to the caller. +// Get may choose to ignore the pool and treat it as empty. +// Callers should not assume any relation between values passed to Put and +// the values returned by Get. +// +// If Get would otherwise return nil and p.New is non-nil, Get returns +// the result of calling p.New. 
+func (p *Pool) Get() any { + if race.Enabled { + race.Disable() + } + l, pid := p.pin() + x := l.private + l.private = nil + if x == nil { + // Try to pop the head of the local shard. We prefer + // the head over the tail for temporal locality of + // reuse. + x, _ = l.shared.popHead() + if x == nil { + x = p.getSlow(pid) + } + } + runtime_procUnpin() + if race.Enabled { + race.Enable() + if x != nil { + race.Acquire(poolRaceAddr(x)) + } + } + if x == nil && p.New != nil { + x = p.New() + } + return x +} + +func (p *Pool) getSlow(pid int) any { + // See the comment in pin regarding ordering of the loads. + size := runtime_LoadAcquintptr(&p.localSize) // load-acquire + locals := p.local // load-consume + // Try to steal one element from other procs. + for i := 0; i < int(size); i++ { + l := indexLocal(locals, (pid+i+1)%int(size)) + if x, _ := l.shared.popTail(); x != nil { + return x + } + } + + // Try the victim cache. We do this after attempting to steal + // from all primary caches because we want objects in the + // victim cache to age out if at all possible. + size = atomic.LoadUintptr(&p.victimSize) + if uintptr(pid) >= size { + return nil + } + locals = p.victim + l := indexLocal(locals, pid) + if x := l.private; x != nil { + l.private = nil + return x + } + for i := 0; i < int(size); i++ { + l := indexLocal(locals, (pid+i)%int(size)) + if x, _ := l.shared.popTail(); x != nil { + return x + } + } + + // Mark the victim cache as empty for future gets don't bother + // with it. + atomic.StoreUintptr(&p.victimSize, 0) + + return nil +} + +// pin pins the current goroutine to P, disables preemption and +// returns poolLocal pool for the P and the P's id. +// Caller must call runtime_procUnpin() when done with the pool. +func (p *Pool) pin() (*poolLocal, int) { + pid := runtime_procPin() + // In pinSlow we store to local and then to localSize, here we load in opposite order. + // Since we've disabled preemption, GC cannot happen in between. + // Thus here we must observe local at least as large localSize. + // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). + s := runtime_LoadAcquintptr(&p.localSize) // load-acquire + l := p.local // load-consume + if uintptr(pid) < s { + return indexLocal(l, pid), pid + } + return p.pinSlow() +} + +func (p *Pool) pinSlow() (*poolLocal, int) { + // Retry under the mutex. + // Can not lock the mutex while pinned. + runtime_procUnpin() + allPoolsMu.Lock() + defer allPoolsMu.Unlock() + pid := runtime_procPin() + // poolCleanup won't be called while we are pinned. + s := p.localSize + l := p.local + if uintptr(pid) < s { + return indexLocal(l, pid), pid + } + if p.local == nil { + allPools = append(allPools, p) + } + // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. + size := runtime.GOMAXPROCS(0) + local := make([]poolLocal, size) + atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release + runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release + return &local[pid], pid +} + +func poolCleanup() { + // This function is called with the world stopped, at the beginning of a garbage collection. + // It must not allocate and probably should not call any runtime functions. + + // Because the world is stopped, no pool user can be in a + // pinned section (in effect, this has all Ps pinned). + + // Drop victim caches from all pools. + for _, p := range oldPools { + p.victim = nil + p.victimSize = 0 + } + + // Move primary cache to victim cache. 
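Put and Get above hide the per-P shards, the victim cache, and the steal path; user code only supplies New and pairs Get with Put. A buffer-reuse sketch (the render helper is illustrative):

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out *bytes.Buffer values, creating one when the pool is empty.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

func render(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	defer func() {
		buf.Reset()      // always reset before returning to the pool
		bufPool.Put(buf) // the pool may drop it at any GC, which is fine
	}()
	fmt.Fprintf(buf, "hello, %s", name)
	return buf.String()
}

func main() {
	fmt.Println(render("pool")) // hello, pool
}
```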
+ for _, p := range allPools { + p.victim = p.local + p.victimSize = p.localSize + p.local = nil + p.localSize = 0 + } + + // The pools with non-empty primary caches now have non-empty + // victim caches and no pools have primary caches. + oldPools, allPools = allPools, nil +} + +var ( + allPoolsMu Mutex + + // allPools is the set of pools that have non-empty primary + // caches. Protected by either 1) allPoolsMu and pinning or 2) + // STW. + allPools []*Pool + + // oldPools is the set of pools that may have non-empty victim + // caches. Protected by STW. + oldPools []*Pool +) + +func init() { + runtime_registerPoolCleanup(poolCleanup) +} + +func indexLocal(l unsafe.Pointer, i int) *poolLocal { + lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{})) + return (*poolLocal)(lp) +} + +// Implemented in runtime. +func runtime_registerPoolCleanup(cleanup func()) +func runtime_procPin() int +func runtime_procUnpin() + +// The below are implemented in runtime/internal/atomic and the +// compiler also knows to intrinsify the symbol we linkname into this +// package. + +//go:linkname runtime_LoadAcquintptr runtime/internal/atomic.LoadAcquintptr +func runtime_LoadAcquintptr(ptr *uintptr) uintptr + +//go:linkname runtime_StoreReluintptr runtime/internal/atomic.StoreReluintptr +func runtime_StoreReluintptr(ptr *uintptr, val uintptr) uintptr diff --git a/contrib/go/_std_1.18/src/sync/poolqueue.go b/contrib/go/_std_1.18/src/sync/poolqueue.go new file mode 100644 index 0000000000..631f2c15fd --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/poolqueue.go @@ -0,0 +1,309 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// poolDequeue is a lock-free fixed-size single-producer, +// multi-consumer queue. The single producer can both push and pop +// from the head, and consumers can pop from the tail. +// +// It has the added feature that it nils out unused slots to avoid +// unnecessary retention of objects. This is important for sync.Pool, +// but not typically a property considered in the literature. +type poolDequeue struct { + // headTail packs together a 32-bit head index and a 32-bit + // tail index. Both are indexes into vals modulo len(vals)-1. + // + // tail = index of oldest data in queue + // head = index of next slot to fill + // + // Slots in the range [tail, head) are owned by consumers. + // A consumer continues to own a slot outside this range until + // it nils the slot, at which point ownership passes to the + // producer. + // + // The head index is stored in the most-significant bits so + // that we can atomically add to it and the overflow is + // harmless. + headTail uint64 + + // vals is a ring buffer of interface{} values stored in this + // dequeue. The size of this must be a power of 2. + // + // vals[i].typ is nil if the slot is empty and non-nil + // otherwise. A slot is still in use until *both* the tail + // index has moved beyond it and typ has been set to nil. This + // is set to nil atomically by the consumer and read + // atomically by the producer. + vals []eface +} + +type eface struct { + typ, val unsafe.Pointer +} + +const dequeueBits = 32 + +// dequeueLimit is the maximum size of a poolDequeue. +// +// This must be at most (1<<dequeueBits)/2 because detecting fullness +// depends on wrapping around the ring buffer without wrapping around +// the index. 
We divide by 4 so this fits in an int on 32-bit. +const dequeueLimit = (1 << dequeueBits) / 4 + +// dequeueNil is used in poolDequeue to represent interface{}(nil). +// Since we use nil to represent empty slots, we need a sentinel value +// to represent nil. +type dequeueNil *struct{} + +func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { + const mask = 1<<dequeueBits - 1 + head = uint32((ptrs >> dequeueBits) & mask) + tail = uint32(ptrs & mask) + return +} + +func (d *poolDequeue) pack(head, tail uint32) uint64 { + const mask = 1<<dequeueBits - 1 + return (uint64(head) << dequeueBits) | + uint64(tail&mask) +} + +// pushHead adds val at the head of the queue. It returns false if the +// queue is full. It must only be called by a single producer. +func (d *poolDequeue) pushHead(val any) bool { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { + // Queue is full. + return false + } + slot := &d.vals[head&uint32(len(d.vals)-1)] + + // Check if the head slot has been released by popTail. + typ := atomic.LoadPointer(&slot.typ) + if typ != nil { + // Another goroutine is still cleaning up the tail, so + // the queue is actually still full. + return false + } + + // The head slot is free, so we own it. + if val == nil { + val = dequeueNil(nil) + } + *(*any)(unsafe.Pointer(slot)) = val + + // Increment head. This passes ownership of slot to popTail + // and acts as a store barrier for writing the slot. + atomic.AddUint64(&d.headTail, 1<<dequeueBits) + return true +} + +// popHead removes and returns the element at the head of the queue. +// It returns false if the queue is empty. It must only be called by a +// single producer. +func (d *poolDequeue) popHead() (any, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm tail and decrement head. We do this before + // reading the value to take back ownership of this + // slot. + head-- + ptrs2 := d.pack(head, tail) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // We successfully took back slot. + slot = &d.vals[head&uint32(len(d.vals)-1)] + break + } + } + + val := *(*any)(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + // Zero the slot. Unlike popTail, this isn't racing with + // pushHead, so we don't need to be careful here. + *slot = eface{} + return val, true +} + +// popTail removes and returns the element at the tail of the queue. +// It returns false if the queue is empty. It may be called by any +// number of consumers. +func (d *poolDequeue) popTail() (any, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm head and tail (for our speculative check + // above) and increment tail. If this succeeds, then + // we own the slot at tail. + ptrs2 := d.pack(head, tail+1) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // Success. + slot = &d.vals[tail&uint32(len(d.vals)-1)] + break + } + } + + // We now own slot. + val := *(*any)(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + + // Tell pushHead that we're done with this slot. Zeroing the + // slot is also important so we don't leave behind references + // that could keep this object live longer than necessary. 
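pack and unpack above keep the head index in the high 32 bits of headTail so a single AddUint64 advances it without touching the tail, and wraparound stays harmless. This standalone sketch re-implements just that arithmetic to show the encoding; it is not the vendored code itself:

```go
package main

import "fmt"

const dequeueBits = 32

// pack and unpack mirror the poolDequeue headTail encoding: head in the
// high 32 bits, tail in the low 32 bits.
func pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return uint64(head)<<dequeueBits | uint64(tail&mask)
}

func unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

func main() {
	ht := pack(5, 3)
	h, t := unpack(ht)
	fmt.Println(h, t) // 5 3

	// Pushing increments only the head half; adding 1<<dequeueBits cannot
	// disturb the tail bits, and a wrap of the 32-bit head stays consistent
	// because slot indexes are always taken modulo the ring size.
	ht += 1 << dequeueBits
	h, t = unpack(ht)
	fmt.Println(h, t) // 6 3
}
```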
+ // + // We write to val first and then publish that we're done with + // this slot by atomically writing to typ. + slot.val = nil + atomic.StorePointer(&slot.typ, nil) + // At this point pushHead owns the slot. + + return val, true +} + +// poolChain is a dynamically-sized version of poolDequeue. +// +// This is implemented as a doubly-linked list queue of poolDequeues +// where each dequeue is double the size of the previous one. Once a +// dequeue fills up, this allocates a new one and only ever pushes to +// the latest dequeue. Pops happen from the other end of the list and +// once a dequeue is exhausted, it gets removed from the list. +type poolChain struct { + // head is the poolDequeue to push to. This is only accessed + // by the producer, so doesn't need to be synchronized. + head *poolChainElt + + // tail is the poolDequeue to popTail from. This is accessed + // by consumers, so reads and writes must be atomic. + tail *poolChainElt +} + +type poolChainElt struct { + poolDequeue + + // next and prev link to the adjacent poolChainElts in this + // poolChain. + // + // next is written atomically by the producer and read + // atomically by the consumer. It only transitions from nil to + // non-nil. + // + // prev is written atomically by the consumer and read + // atomically by the producer. It only transitions from + // non-nil to nil. + next, prev *poolChainElt +} + +func storePoolChainElt(pp **poolChainElt, v *poolChainElt) { + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v)) +} + +func loadPoolChainElt(pp **poolChainElt) *poolChainElt { + return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp)))) +} + +func (c *poolChain) pushHead(val any) { + d := c.head + if d == nil { + // Initialize the chain. + const initSize = 8 // Must be a power of 2 + d = new(poolChainElt) + d.vals = make([]eface, initSize) + c.head = d + storePoolChainElt(&c.tail, d) + } + + if d.pushHead(val) { + return + } + + // The current dequeue is full. Allocate a new one of twice + // the size. + newSize := len(d.vals) * 2 + if newSize >= dequeueLimit { + // Can't make it any bigger. + newSize = dequeueLimit + } + + d2 := &poolChainElt{prev: d} + d2.vals = make([]eface, newSize) + c.head = d2 + storePoolChainElt(&d.next, d2) + d2.pushHead(val) +} + +func (c *poolChain) popHead() (any, bool) { + d := c.head + for d != nil { + if val, ok := d.popHead(); ok { + return val, ok + } + // There may still be unconsumed elements in the + // previous dequeue, so try backing up. + d = loadPoolChainElt(&d.prev) + } + return nil, false +} + +func (c *poolChain) popTail() (any, bool) { + d := loadPoolChainElt(&c.tail) + if d == nil { + return nil, false + } + + for { + // It's important that we load the next pointer + // *before* popping the tail. In general, d may be + // transiently empty, but if next is non-nil before + // the pop and the pop fails, then d is permanently + // empty, which is the only condition under which it's + // safe to drop d from the chain. + d2 := loadPoolChainElt(&d.next) + + if val, ok := d.popTail(); ok { + return val, ok + } + + if d2 == nil { + // This is the only dequeue. It's empty right + // now, but could be pushed to in the future. + return nil, false + } + + // The tail of the chain has been drained, so move on + // to the next dequeue. Try to drop it from the chain + // so the next pop doesn't have to look at the empty + // dequeue again. 
+ if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { + // We won the race. Clear the prev pointer so + // the garbage collector can collect the empty + // dequeue and so popHead doesn't back up + // further than necessary. + storePoolChainElt(&d2.prev, nil) + } + d = d2 + } +} diff --git a/contrib/go/_std_1.18/src/sync/runtime.go b/contrib/go/_std_1.18/src/sync/runtime.go new file mode 100644 index 0000000000..de2b0a3ccd --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/runtime.go @@ -0,0 +1,57 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import "unsafe" + +// defined in package runtime + +// Semacquire waits until *s > 0 and then atomically decrements it. +// It is intended as a simple sleep primitive for use by the synchronization +// library and should not be used directly. +func runtime_Semacquire(s *uint32) + +// SemacquireMutex is like Semacquire, but for profiling contended Mutexes. +// If lifo is true, queue waiter at the head of wait queue. +// skipframes is the number of frames to omit during tracing, counting from +// runtime_SemacquireMutex's caller. +func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) + +// Semrelease atomically increments *s and notifies a waiting goroutine +// if one is blocked in Semacquire. +// It is intended as a simple wakeup primitive for use by the synchronization +// library and should not be used directly. +// If handoff is true, pass count directly to the first waiter. +// skipframes is the number of frames to omit during tracing, counting from +// runtime_Semrelease's caller. +func runtime_Semrelease(s *uint32, handoff bool, skipframes int) + +// See runtime/sema.go for documentation. +func runtime_notifyListAdd(l *notifyList) uint32 + +// See runtime/sema.go for documentation. +func runtime_notifyListWait(l *notifyList, t uint32) + +// See runtime/sema.go for documentation. +func runtime_notifyListNotifyAll(l *notifyList) + +// See runtime/sema.go for documentation. +func runtime_notifyListNotifyOne(l *notifyList) + +// Ensure that sync and runtime agree on size of notifyList. +func runtime_notifyListCheck(size uintptr) +func init() { + var n notifyList + runtime_notifyListCheck(unsafe.Sizeof(n)) +} + +// Active spinning runtime support. +// runtime_canSpin reports whether spinning makes sense at the moment. +func runtime_canSpin(i int) bool + +// runtime_doSpin does active spinning. +func runtime_doSpin() + +func runtime_nanotime() int64 diff --git a/contrib/go/_std_1.18/src/sync/runtime2.go b/contrib/go/_std_1.18/src/sync/runtime2.go new file mode 100644 index 0000000000..9b7e9922fb --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/runtime2.go @@ -0,0 +1,19 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !goexperiment.staticlockranking + +package sync + +import "unsafe" + +// Approximation of notifyList in runtime/sema.go. Size and alignment must +// agree. 
+type notifyList struct { + wait uint32 + notify uint32 + lock uintptr // key field of the mutex + head unsafe.Pointer + tail unsafe.Pointer +} diff --git a/contrib/go/_std_1.18/src/sync/rwmutex.go b/contrib/go/_std_1.18/src/sync/rwmutex.go new file mode 100644 index 0000000000..f0d4c9771a --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/rwmutex.go @@ -0,0 +1,223 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "internal/race" + "sync/atomic" + "unsafe" +) + +// There is a modified copy of this file in runtime/rwmutex.go. +// If you make any changes here, see if you should make them there. + +// A RWMutex is a reader/writer mutual exclusion lock. +// The lock can be held by an arbitrary number of readers or a single writer. +// The zero value for a RWMutex is an unlocked mutex. +// +// A RWMutex must not be copied after first use. +// +// If a goroutine holds a RWMutex for reading and another goroutine might +// call Lock, no goroutine should expect to be able to acquire a read lock +// until the initial read lock is released. In particular, this prohibits +// recursive read locking. This is to ensure that the lock eventually becomes +// available; a blocked Lock call excludes new readers from acquiring the +// lock. +type RWMutex struct { + w Mutex // held if there are pending writers + writerSem uint32 // semaphore for writers to wait for completing readers + readerSem uint32 // semaphore for readers to wait for completing writers + readerCount int32 // number of pending readers + readerWait int32 // number of departing readers +} + +const rwmutexMaxReaders = 1 << 30 + +// Happens-before relationships are indicated to the race detector via: +// - Unlock -> Lock: readerSem +// - Unlock -> RLock: readerSem +// - RUnlock -> Lock: writerSem +// +// The methods below temporarily disable handling of race synchronization +// events in order to provide the more precise model above to the race +// detector. +// +// For example, atomic.AddInt32 in RLock should not appear to provide +// acquire-release semantics, which would incorrectly synchronize racing +// readers, thus potentially missing races. + +// RLock locks rw for reading. +// +// It should not be used for recursive read locking; a blocked Lock +// call excludes new readers from acquiring the lock. See the +// documentation on the RWMutex type. +func (rw *RWMutex) RLock() { + if race.Enabled { + _ = rw.w.state + race.Disable() + } + if atomic.AddInt32(&rw.readerCount, 1) < 0 { + // A writer is pending, wait for it. + runtime_SemacquireMutex(&rw.readerSem, false, 0) + } + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(&rw.readerSem)) + } +} + +// TryRLock tries to lock rw for reading and reports whether it succeeded. +// +// Note that while correct uses of TryRLock do exist, they are rare, +// and use of TryRLock is often a sign of a deeper problem +// in a particular use of mutexes. +func (rw *RWMutex) TryRLock() bool { + if race.Enabled { + _ = rw.w.state + race.Disable() + } + for { + c := atomic.LoadInt32(&rw.readerCount) + if c < 0 { + if race.Enabled { + race.Enable() + } + return false + } + if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) { + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(&rw.readerSem)) + } + return true + } + } +} + +// RUnlock undoes a single RLock call; +// it does not affect other simultaneous readers. 
+// It is a run-time error if rw is not locked for reading +// on entry to RUnlock. +func (rw *RWMutex) RUnlock() { + if race.Enabled { + _ = rw.w.state + race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) + race.Disable() + } + if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { + // Outlined slow-path to allow the fast-path to be inlined + rw.rUnlockSlow(r) + } + if race.Enabled { + race.Enable() + } +} + +func (rw *RWMutex) rUnlockSlow(r int32) { + if r+1 == 0 || r+1 == -rwmutexMaxReaders { + race.Enable() + throw("sync: RUnlock of unlocked RWMutex") + } + // A writer is pending. + if atomic.AddInt32(&rw.readerWait, -1) == 0 { + // The last reader unblocks the writer. + runtime_Semrelease(&rw.writerSem, false, 1) + } +} + +// Lock locks rw for writing. +// If the lock is already locked for reading or writing, +// Lock blocks until the lock is available. +func (rw *RWMutex) Lock() { + if race.Enabled { + _ = rw.w.state + race.Disable() + } + // First, resolve competition with other writers. + rw.w.Lock() + // Announce to readers there is a pending writer. + r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders + // Wait for active readers. + if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { + runtime_SemacquireMutex(&rw.writerSem, false, 0) + } + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(&rw.readerSem)) + race.Acquire(unsafe.Pointer(&rw.writerSem)) + } +} + +// TryLock tries to lock rw for writing and reports whether it succeeded. +// +// Note that while correct uses of TryLock do exist, they are rare, +// and use of TryLock is often a sign of a deeper problem +// in a particular use of mutexes. +func (rw *RWMutex) TryLock() bool { + if race.Enabled { + _ = rw.w.state + race.Disable() + } + if !rw.w.TryLock() { + if race.Enabled { + race.Enable() + } + return false + } + if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) { + rw.w.Unlock() + if race.Enabled { + race.Enable() + } + return false + } + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(&rw.readerSem)) + race.Acquire(unsafe.Pointer(&rw.writerSem)) + } + return true +} + +// Unlock unlocks rw for writing. It is a run-time error if rw is +// not locked for writing on entry to Unlock. +// +// As with Mutexes, a locked RWMutex is not associated with a particular +// goroutine. One goroutine may RLock (Lock) a RWMutex and then +// arrange for another goroutine to RUnlock (Unlock) it. +func (rw *RWMutex) Unlock() { + if race.Enabled { + _ = rw.w.state + race.Release(unsafe.Pointer(&rw.readerSem)) + race.Disable() + } + + // Announce to readers there is no active writer. + r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) + if r >= rwmutexMaxReaders { + race.Enable() + throw("sync: Unlock of unlocked RWMutex") + } + // Unblock blocked readers, if any. + for i := 0; i < int(r); i++ { + runtime_Semrelease(&rw.readerSem, false, 0) + } + // Allow other writers to proceed. + rw.w.Unlock() + if race.Enabled { + race.Enable() + } +} + +// RLocker returns a Locker interface that implements +// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock. 
+func (rw *RWMutex) RLocker() Locker { + return (*rlocker)(rw) +} + +type rlocker RWMutex + +func (r *rlocker) Lock() { (*RWMutex)(r).RLock() } +func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() } diff --git a/contrib/go/_std_1.18/src/sync/waitgroup.go b/contrib/go/_std_1.18/src/sync/waitgroup.go new file mode 100644 index 0000000000..9c6662d04b --- /dev/null +++ b/contrib/go/_std_1.18/src/sync/waitgroup.go @@ -0,0 +1,147 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "internal/race" + "sync/atomic" + "unsafe" +) + +// A WaitGroup waits for a collection of goroutines to finish. +// The main goroutine calls Add to set the number of +// goroutines to wait for. Then each of the goroutines +// runs and calls Done when finished. At the same time, +// Wait can be used to block until all goroutines have finished. +// +// A WaitGroup must not be copied after first use. +type WaitGroup struct { + noCopy noCopy + + // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. + // 64-bit atomic operations require 64-bit alignment, but 32-bit + // compilers only guarantee that 64-bit fields are 32-bit aligned. + // For this reason on 32 bit architectures we need to check in state() + // if state1 is aligned or not, and dynamically "swap" the field order if + // needed. + state1 uint64 + state2 uint32 +} + +// state returns pointers to the state and sema fields stored within wg.state*. +func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { + if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { + // state1 is 64-bit aligned: nothing to do. + return &wg.state1, &wg.state2 + } else { + // state1 is 32-bit aligned but not 64-bit aligned: this means that + // (&state1)+4 is 64-bit aligned. + state := (*[3]uint32)(unsafe.Pointer(&wg.state1)) + return (*uint64)(unsafe.Pointer(&state[1])), &state[0] + } +} + +// Add adds delta, which may be negative, to the WaitGroup counter. +// If the counter becomes zero, all goroutines blocked on Wait are released. +// If the counter goes negative, Add panics. +// +// Note that calls with a positive delta that occur when the counter is zero +// must happen before a Wait. Calls with a negative delta, or calls with a +// positive delta that start when the counter is greater than zero, may happen +// at any time. +// Typically this means the calls to Add should execute before the statement +// creating the goroutine or other event to be waited for. +// If a WaitGroup is reused to wait for several independent sets of events, +// new Add calls must happen after all previous Wait calls have returned. +// See the WaitGroup example. +func (wg *WaitGroup) Add(delta int) { + statep, semap := wg.state() + if race.Enabled { + _ = *statep // trigger nil deref early + if delta < 0 { + // Synchronize decrements with Wait. + race.ReleaseMerge(unsafe.Pointer(wg)) + } + race.Disable() + defer race.Enable() + } + state := atomic.AddUint64(statep, uint64(delta)<<32) + v := int32(state >> 32) + w := uint32(state) + if race.Enabled && delta > 0 && v == int32(delta) { + // The first increment must be synchronized with Wait. + // Need to model this as a read, because there can be + // several concurrent wg.counter transitions from 0. 
+ race.Read(unsafe.Pointer(semap)) + } + if v < 0 { + panic("sync: negative WaitGroup counter") + } + if w != 0 && delta > 0 && v == int32(delta) { + panic("sync: WaitGroup misuse: Add called concurrently with Wait") + } + if v > 0 || w == 0 { + return + } + // This goroutine has set counter to 0 when waiters > 0. + // Now there can't be concurrent mutations of state: + // - Adds must not happen concurrently with Wait, + // - Wait does not increment waiters if it sees counter == 0. + // Still do a cheap sanity check to detect WaitGroup misuse. + if *statep != state { + panic("sync: WaitGroup misuse: Add called concurrently with Wait") + } + // Reset waiters count to 0. + *statep = 0 + for ; w != 0; w-- { + runtime_Semrelease(semap, false, 0) + } +} + +// Done decrements the WaitGroup counter by one. +func (wg *WaitGroup) Done() { + wg.Add(-1) +} + +// Wait blocks until the WaitGroup counter is zero. +func (wg *WaitGroup) Wait() { + statep, semap := wg.state() + if race.Enabled { + _ = *statep // trigger nil deref early + race.Disable() + } + for { + state := atomic.LoadUint64(statep) + v := int32(state >> 32) + w := uint32(state) + if v == 0 { + // Counter is 0, no need to wait. + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(wg)) + } + return + } + // Increment waiters count. + if atomic.CompareAndSwapUint64(statep, state, state+1) { + if race.Enabled && w == 0 { + // Wait must be synchronized with the first Add. + // Need to model this is as a write to race with the read in Add. + // As a consequence, can do the write only for the first waiter, + // otherwise concurrent Waits will race with each other. + race.Write(unsafe.Pointer(semap)) + } + runtime_Semacquire(semap) + if *statep != 0 { + panic("sync: WaitGroup is reused before previous Wait has returned") + } + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(wg)) + } + return + } + } +} |
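Illustrative note (not part of the vendored files above): both poolDequeue.headTail and the WaitGroup state word pack two 32-bit fields into a single uint64 so that head/tail or counter/waiters can be updated and read with one atomic operation. The standalone sketch below is an assumption-laden illustration, not code from this commit; it uses plain atomic adds where the real Wait uses a CompareAndSwap loop, and it only mirrors the arithmetic in Add, Done, and Wait.

// waitgroup_state_sketch.go — illustrative only; not part of this commit.
// Shows the 32/32 packing used by WaitGroup (and, with head/tail instead
// of counter/waiters, by poolDequeue.headTail): the high 32 bits hold the
// counter, the low 32 bits hold the waiter count.
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var state uint64 // stands in for the packed wg.state1 word

	// Add(3): the delta is shifted into the high (counter) half,
	// matching atomic.AddUint64(statep, uint64(delta)<<32) in Add.
	delta := 3
	atomic.AddUint64(&state, uint64(delta)<<32)

	// A Wait caller bumps only the low (waiter) half. The real Wait
	// uses a CompareAndSwap loop; a plain add keeps the sketch short.
	atomic.AddUint64(&state, 1)

	s := atomic.LoadUint64(&state)
	fmt.Printf("counter=%d waiters=%d\n", int32(s>>32), uint32(s)) // counter=3 waiters=1

	// Done() is Add(-1): converting the negative delta to uint64 wraps,
	// so adding uint64(delta)<<32 subtracts exactly 1 from the high half
	// and leaves the waiter count untouched.
	delta = -1
	atomic.AddUint64(&state, uint64(delta)<<32)
	s = atomic.LoadUint64(&state)
	fmt.Printf("counter=%d waiters=%d\n", int32(s>>32), uint32(s)) // counter=2 waiters=1
}

Keeping both fields in one word is what lets Add observe the counter and the waiter count with a single atomic load, detect "Add called concurrently with Wait", and release every waiter when the counter reaches zero without taking a separate lock.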