1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/promise/activity.h"
#include <stddef.h>
#include <initializer_list>
#include <vector>
#include "y_absl/strings/str_cat.h"
#include "y_absl/strings/str_format.h"
#include "y_absl/strings/str_join.h"
#include "src/core/lib/gprpp/atomic_utils.h"
namespace grpc_core {
///////////////////////////////////////////////////////////////////////////////
// GLOBALS
thread_local Activity* Activity::g_current_activity_{nullptr};
namespace promise_detail {
///////////////////////////////////////////////////////////////////////////////
// HELPER TYPES
TString Unwakeable::ActivityDebugTag(WakeupMask) const {
return "<unknown>";
}
// Weak handle to an Activity.
// Handle can persist while Activity goes away.
class FreestandingActivity::Handle final : public Wakeable {
public:
explicit Handle(FreestandingActivity* activity) : activity_(activity) {}
// Ref the Handle (not the activity).
void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
// Activity is going away... drop its reference and sever the connection back.
void DropActivity() Y_ABSL_LOCKS_EXCLUDED(mu_) {
mu_.Lock();
GPR_ASSERT(activity_ != nullptr);
activity_ = nullptr;
mu_.Unlock();
Unref();
}
// Activity needs to wake up (if it still exists!) - wake it up, and drop the
// ref that was kept for this handle.
void Wakeup(WakeupMask) override Y_ABSL_LOCKS_EXCLUDED(mu_) {
mu_.Lock();
// Note that activity refcount can drop to zero, but we could win the lock
// against DropActivity, so we need to only increase activities refcount if
// it is non-zero.
if (activity_ && activity_->RefIfNonzero()) {
FreestandingActivity* activity = activity_;
mu_.Unlock();
// Activity still exists and we have a reference: wake it up, which will
// drop the ref.
activity->Wakeup(0);
} else {
// Could not get the activity - it's either gone or going. No need to wake
// it up!
mu_.Unlock();
}
// Drop the ref to the handle (we have one ref = one wakeup semantics).
Unref();
}
void Drop(WakeupMask) override { Unref(); }
TString ActivityDebugTag(WakeupMask) const override {
MutexLock lock(&mu_);
return activity_ == nullptr ? "<unknown>" : activity_->DebugTag();
}
private:
// Unref the Handle (not the activity).
void Unref() {
if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
delete this;
}
}
// Two initial refs: one for the waiter that caused instantiation, one for the
// activity.
std::atomic<size_t> refs_{2};
mutable Mutex mu_ Y_ABSL_ACQUIRED_AFTER(activity_->mu_);
FreestandingActivity* activity_ Y_ABSL_GUARDED_BY(mu_);
};
///////////////////////////////////////////////////////////////////////////////
// ACTIVITY IMPLEMENTATION
bool FreestandingActivity::RefIfNonzero() { return IncrementIfNonzero(&refs_); }
FreestandingActivity::Handle* FreestandingActivity::RefHandle() {
if (handle_ == nullptr) {
// No handle created yet - construct it and return it.
handle_ = new Handle(this);
return handle_;
} else {
// Already had to create a handle, ref & return it.
handle_->Ref();
return handle_;
}
}
void FreestandingActivity::DropHandle() {
handle_->DropActivity();
handle_ = nullptr;
}
Waker FreestandingActivity::MakeNonOwningWaker() {
mu_.AssertHeld();
return Waker(RefHandle(), 0);
}
} // namespace promise_detail
TString Activity::DebugTag() const {
return y_absl::StrFormat("ACTIVITY[%p]", this);
}
///////////////////////////////////////////////////////////////////////////////
// INTRA ACTIVITY WAKER IMPLEMENTATION
TString IntraActivityWaiter::DebugString() const {
std::vector<int> bits;
for (size_t i = 0; i < 8 * sizeof(WakeupMask); i++) {
if (wakeups_ & (1 << i)) bits.push_back(i);
}
return y_absl::StrCat("{", y_absl::StrJoin(bits, ","), "}");
}
} // namespace grpc_core
|