1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
// -*- C++ -*-
//===----------------------------------------------------------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#ifndef _LIBCPP___STOP_TOKEN_STOP_STATE_H
#define _LIBCPP___STOP_TOKEN_STOP_STATE_H
#include <__availability>
#include <__config>
#include <__stop_token/atomic_unique_lock.h>
#include <__stop_token/intrusive_list_view.h>
#include <__thread/id.h>
#include <atomic>
#include <cstdint>
#if !defined(_LIBCPP_HAS_NO_PRAGMA_SYSTEM_HEADER)
# pragma GCC system_header
#endif
_LIBCPP_BEGIN_NAMESPACE_STD
#if _LIBCPP_STD_VER >= 20 && !defined(_LIBCPP_HAS_NO_THREADS)
struct __stop_callback_base : __intrusive_node_base<__stop_callback_base> {
using __callback_fn_t = void(__stop_callback_base*) noexcept;
_LIBCPP_HIDE_FROM_ABI explicit __stop_callback_base(__callback_fn_t* __callback_fn) : __callback_fn_(__callback_fn) {}
_LIBCPP_HIDE_FROM_ABI void __invoke() noexcept { __callback_fn_(this); }
__callback_fn_t* __callback_fn_;
atomic<bool> __completed_ = false;
bool* __destroyed_ = nullptr;
};
class __stop_state {
static constexpr uint32_t __stop_requested_bit = 1;
static constexpr uint32_t __callback_list_locked_bit = 1 << 1;
static constexpr uint32_t __stop_source_counter_shift = 2;
// The "stop_source counter" is not used for lifetime reference counting.
// When the number of stop_source reaches 0, the remaining stop_tokens's
// stop_possible will return false. We need this counter to track this.
//
// The "callback list locked" bit implements the atomic_unique_lock to
// guard the operations on the callback list
//
// 31 - 2 | 1 | 0 |
// stop_source counter | callback list locked | stop_requested |
atomic<uint32_t> __state_ = 0;
// Reference count for stop_token + stop_callback + stop_source
// When the counter reaches zero, the state is destroyed
// It is used by __intrusive_shared_ptr, but it is stored here for better layout
atomic<uint32_t> __ref_count_ = 0;
using __state_t = uint32_t;
using __callback_list_lock = __atomic_unique_lock<__state_t, __callback_list_locked_bit>;
using __callback_list = __intrusive_list_view<__stop_callback_base>;
__callback_list __callback_list_;
__thread_id __requesting_thread_;
public:
_LIBCPP_HIDE_FROM_ABI __stop_state() noexcept = default;
_LIBCPP_HIDE_FROM_ABI void __increment_stop_source_counter() noexcept {
_LIBCPP_ASSERT_UNCATEGORIZED(
__state_.load(std::memory_order_relaxed) <= static_cast<__state_t>(~(1 << __stop_source_counter_shift)),
"stop_source's counter reaches the maximum. Incrementing the counter will overflow");
__state_.fetch_add(1 << __stop_source_counter_shift, std::memory_order_relaxed);
}
// We are not destroying the object after counter decrements to zero, nor do we have
// operations depend on the ordering of decrementing the counter. relaxed is enough.
_LIBCPP_HIDE_FROM_ABI void __decrement_stop_source_counter() noexcept {
_LIBCPP_ASSERT_UNCATEGORIZED(
__state_.load(std::memory_order_relaxed) >= static_cast<__state_t>(1 << __stop_source_counter_shift),
"stop_source's counter is 0. Decrementing the counter will underflow");
__state_.fetch_sub(1 << __stop_source_counter_shift, std::memory_order_relaxed);
}
_LIBCPP_HIDE_FROM_ABI bool __stop_requested() const noexcept {
// acquire because [thread.stoptoken.intro] A call to request_stop that returns true
// synchronizes with a call to stop_requested on an associated stop_token or stop_source
// object that returns true.
// request_stop's compare_exchange_weak has release which syncs with this acquire
return (__state_.load(std::memory_order_acquire) & __stop_requested_bit) != 0;
}
_LIBCPP_HIDE_FROM_ABI bool __stop_possible_for_stop_token() const noexcept {
// [stoptoken.mem] false if "a stop request was not made and there are no associated stop_source objects"
// Todo: Can this be std::memory_order_relaxed as the standard does not say anything except not to introduce data
// race?
__state_t __curent_state = __state_.load(std::memory_order_acquire);
return ((__curent_state & __stop_requested_bit) != 0) || ((__curent_state >> __stop_source_counter_shift) != 0);
}
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __request_stop() noexcept {
auto __cb_list_lock = __try_lock_for_request_stop();
if (!__cb_list_lock.__owns_lock()) {
return false;
}
__requesting_thread_ = this_thread::get_id();
while (!__callback_list_.__empty()) {
auto __cb = __callback_list_.__pop_front();
// allow other callbacks to be removed while invoking the current callback
__cb_list_lock.__unlock();
bool __destroyed = false;
__cb->__destroyed_ = &__destroyed;
__cb->__invoke();
// __cb's invoke function could potentially delete itself. We need to check before accessing __cb's member
if (!__destroyed) {
// needs to set __destroyed_ pointer to nullptr, otherwise it points to a local variable
// which is to be destroyed at the end of the loop
__cb->__destroyed_ = nullptr;
// [stopcallback.cons] If callback is concurrently executing on another thread, then the return
// from the invocation of callback strongly happens before ([intro.races]) callback is destroyed.
// this release syncs with the acquire in the remove_callback
__cb->__completed_.store(true, std::memory_order_release);
__cb->__completed_.notify_all();
}
__cb_list_lock.__lock();
}
return true;
}
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __add_callback(__stop_callback_base* __cb) noexcept {
// If it is already stop_requested. Do not try to request it again.
const auto __give_up_trying_to_lock_condition = [__cb](__state_t __state) {
if ((__state & __stop_requested_bit) != 0) {
// already stop requested, synchronously run the callback and no need to lock the list again
__cb->__invoke();
return true;
}
// no stop source. no need to lock the list to add the callback as it can never be invoked
return (__state >> __stop_source_counter_shift) == 0;
};
__callback_list_lock __cb_list_lock(__state_, __give_up_trying_to_lock_condition);
if (!__cb_list_lock.__owns_lock()) {
return false;
}
__callback_list_.__push_front(__cb);
return true;
// unlock here: [thread.stoptoken.intro] Registration of a callback synchronizes with the invocation of
// that callback.
// Note: this release sync with the acquire in the request_stop' __try_lock_for_request_stop
}
// called by the destructor of stop_callback
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI void __remove_callback(__stop_callback_base* __cb) noexcept {
__callback_list_lock __cb_list_lock(__state_);
// under below condition, the request_stop call just popped __cb from the list and could execute it now
bool __potentially_executing_now = __cb->__prev_ == nullptr && !__callback_list_.__is_head(__cb);
if (__potentially_executing_now) {
auto __requested_thread = __requesting_thread_;
__cb_list_lock.__unlock();
if (std::this_thread::get_id() != __requested_thread) {
// [stopcallback.cons] If callback is concurrently executing on another thread, then the return
// from the invocation of callback strongly happens before ([intro.races]) callback is destroyed.
__cb->__completed_.wait(false, std::memory_order_acquire);
} else {
// The destructor of stop_callback runs on the same thread of the thread that invokes the callback.
// The callback is potentially invoking its own destuctor. Set the flag to avoid accessing destroyed
// members on the invoking side
if (__cb->__destroyed_) {
*__cb->__destroyed_ = true;
}
}
} else {
__callback_list_.__remove(__cb);
}
}
private:
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI __callback_list_lock __try_lock_for_request_stop() noexcept {
// If it is already stop_requested, do not try to request stop or lock the list again.
const auto __lock_fail_condition = [](__state_t __state) { return (__state & __stop_requested_bit) != 0; };
// set locked and requested bit at the same time
const auto __after_lock_state = [](__state_t __state) {
return __state | __callback_list_locked_bit | __stop_requested_bit;
};
// acq because [thread.stoptoken.intro] Registration of a callback synchronizes with the invocation of that
// callback. We are going to invoke the callback after getting the lock, acquire so that we can see the
// registration of a callback (and other writes that happens-before the add_callback)
// Note: the rel (unlock) in the add_callback syncs with this acq
// rel because [thread.stoptoken.intro] A call to request_stop that returns true synchronizes with a call
// to stop_requested on an associated stop_token or stop_source object that returns true.
// We need to make sure that all writes (including user code) before request_stop will be made visible
// to the threads that waiting for `stop_requested == true`
// Note: this rel syncs with the acq in `stop_requested`
const auto __locked_ordering = std::memory_order_acq_rel;
return __callback_list_lock(__state_, __lock_fail_condition, __after_lock_state, __locked_ordering);
}
template <class _Tp>
friend struct __intrusive_shared_ptr_traits;
};
template <class _Tp>
struct __intrusive_shared_ptr_traits;
template <>
struct __intrusive_shared_ptr_traits<__stop_state> {
_LIBCPP_HIDE_FROM_ABI static atomic<uint32_t>& __get_atomic_ref_count(__stop_state& __state) {
return __state.__ref_count_;
}
};
#endif // _LIBCPP_STD_VER >= 20 && !defined(_LIBCPP_HAS_NO_THREADS)
_LIBCPP_END_NAMESPACE_STD
#endif // _LIBCPP___STOP_TOKEN_STOP_STATE_H
|