aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/lwtrace/shuttle.h
diff options
context:
space:
mode:
authorserxa <serxa@yandex-team.ru>2022-02-10 16:49:08 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:08 +0300
commite5d4696304c6689379ac7ce334512404d4b7836c (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/lwtrace/shuttle.h
parentd6d7db348c2cc64e71243cab9940ee6778f4317d (diff)
downloadydb-e5d4696304c6689379ac7ce334512404d4b7836c.tar.gz
Restoring authorship annotation for <serxa@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/lwtrace/shuttle.h')
-rw-r--r--library/cpp/lwtrace/shuttle.h572
1 files changed, 286 insertions, 286 deletions
diff --git a/library/cpp/lwtrace/shuttle.h b/library/cpp/lwtrace/shuttle.h
index 24c38e4c47..85c6e4da61 100644
--- a/library/cpp/lwtrace/shuttle.h
+++ b/library/cpp/lwtrace/shuttle.h
@@ -1,45 +1,45 @@
-#pragma once
-
+#pragma once
+
#include "event.h"
#include <library/cpp/containers/stack_vector/stack_vec.h>
-#include <util/generic/ptr.h>
-#include <util/system/spinlock.h>
-
+#include <util/generic/ptr.h>
+#include <util/system/spinlock.h>
+
#include <algorithm>
-#include <type_traits>
-
-namespace NLWTrace {
- struct TProbe;
-
- class IShuttle;
- using TShuttlePtr = TIntrusivePtr<IShuttle>;
-
- // Represents interface of tracing context passing by application between probes
- class alignas(8) IShuttle: public TThrRefBase {
- private:
- ui64 TraceIdx;
+#include <type_traits>
+
+namespace NLWTrace {
+ struct TProbe;
+
+ class IShuttle;
+ using TShuttlePtr = TIntrusivePtr<IShuttle>;
+
+ // Represents interface of tracing context passing by application between probes
+ class alignas(8) IShuttle: public TThrRefBase {
+ private:
+ ui64 TraceIdx;
ui64 SpanId;
ui64 ParentSpanId = 0;
- TAtomic Status = 0;
- static constexpr ui64 DeadFlag = 0x1ull;
- TShuttlePtr Next;
-
- public:
+ TAtomic Status = 0;
+ static constexpr ui64 DeadFlag = 0x1ull;
+ TShuttlePtr Next;
+
+ public:
explicit IShuttle(ui64 traceIdx, ui64 spanId)
- : TraceIdx(traceIdx)
+ : TraceIdx(traceIdx)
, SpanId(spanId)
- {
- }
-
- virtual ~IShuttle() {
- }
-
- ui64 GetTraceIdx() const {
- return TraceIdx;
- }
-
+ {
+ }
+
+ virtual ~IShuttle() {
+ }
+
+ ui64 GetTraceIdx() const {
+ return TraceIdx;
+ }
+
ui64 GetSpanId() const {
return SpanId;
}
@@ -52,130 +52,130 @@ namespace NLWTrace {
ParentSpanId = parentSpanId;
}
- template <class F, class R>
- R UnlessDead(F func, R dead) {
- while (true) {
- ui64 status = AtomicGet(Status);
- if (status & DeadFlag) {
- return dead;
- }
- if (AtomicCas(&Status, status + 2, status)) {
- R result = func();
- ATOMIC_COMPILER_BARRIER();
- AtomicSub(Status, 2);
- return result;
- }
+ template <class F, class R>
+ R UnlessDead(F func, R dead) {
+ while (true) {
+ ui64 status = AtomicGet(Status);
+ if (status & DeadFlag) {
+ return dead;
+ }
+ if (AtomicCas(&Status, status + 2, status)) {
+ R result = func();
+ ATOMIC_COMPILER_BARRIER();
+ AtomicSub(Status, 2);
+ return result;
+ }
+ }
+ }
+
+ template <class F>
+ void UnlessDead(F func) {
+ while (true) {
+ ui64 status = AtomicGet(Status);
+ if (status & DeadFlag) {
+ return;
+ }
+ if (AtomicCas(&Status, status + 2, status)) {
+ func();
+ ATOMIC_COMPILER_BARRIER();
+ AtomicSub(Status, 2);
+ return;
+ }
}
}
- template <class F>
- void UnlessDead(F func) {
- while (true) {
- ui64 status = AtomicGet(Status);
- if (status & DeadFlag) {
- return;
- }
- if (AtomicCas(&Status, status + 2, status)) {
- func();
- ATOMIC_COMPILER_BARRIER();
- AtomicSub(Status, 2);
- return;
- }
- }
- }
-
- void Serialize(TShuttleTrace& msg) {
- UnlessDead([&] {
- DoSerialize(msg);
- });
- }
-
- // Returns false iff shuttle should be destroyed
+ void Serialize(TShuttleTrace& msg) {
+ UnlessDead([&] {
+ DoSerialize(msg);
+ });
+ }
+
+ // Returns false iff shuttle should be destroyed
bool AddProbe(TProbe* probe, const TParams& params, ui64 timestamp = 0) {
- return UnlessDead([&] {
- return DoAddProbe(probe, params, timestamp);
- }, false);
- }
-
- // Track object was destroyed
- void EndOfTrack() {
- if (Next) {
- Next->EndOfTrack();
- Next.Reset();
- }
- UnlessDead([&] {
- DoEndOfTrack();
- });
- }
-
- // Shuttle was dropped from orbit
- TShuttlePtr Drop() {
- UnlessDead([&] {
- DoDrop();
- });
+ return UnlessDead([&] {
+ return DoAddProbe(probe, params, timestamp);
+ }, false);
+ }
+
+ // Track object was destroyed
+ void EndOfTrack() {
+ if (Next) {
+ Next->EndOfTrack();
+ Next.Reset();
+ }
+ UnlessDead([&] {
+ DoEndOfTrack();
+ });
+ }
+
+ // Shuttle was dropped from orbit
+ TShuttlePtr Drop() {
+ UnlessDead([&] {
+ DoDrop();
+ });
return Detach(); // Detached from orbit on Drop
}
TShuttlePtr Detach() {
- TShuttlePtr result;
+ TShuttlePtr result;
result.Swap(Next);
- return result;
- }
-
- // Trace session was destroyed
- void Kill() {
- AtomicAdd(Status, 1); // Sets DeadFlag
- while (AtomicGet(Status) != 1) { // Wait until any vcall is over
- SpinLockPause();
- }
- // After this point all virtual calls on shuttle are disallowed
- ATOMIC_COMPILER_BARRIER();
- }
-
- const TShuttlePtr& GetNext() const {
- return Next;
- }
-
- TShuttlePtr& GetNext() {
- return Next;
- }
-
- void SetNext(const TShuttlePtr& next) {
- Next = next;
- }
-
- bool Fork(TShuttlePtr& child) {
- return UnlessDead([&] {
- return DoFork(child);
- }, true);
+ return result;
+ }
+
+ // Trace session was destroyed
+ void Kill() {
+ AtomicAdd(Status, 1); // Sets DeadFlag
+ while (AtomicGet(Status) != 1) { // Wait until any vcall is over
+ SpinLockPause();
+ }
+ // After this point all virtual calls on shuttle are disallowed
+ ATOMIC_COMPILER_BARRIER();
+ }
+
+ const TShuttlePtr& GetNext() const {
+ return Next;
+ }
+
+ TShuttlePtr& GetNext() {
+ return Next;
+ }
+
+ void SetNext(const TShuttlePtr& next) {
+ Next = next;
+ }
+
+ bool Fork(TShuttlePtr& child) {
+ return UnlessDead([&] {
+ return DoFork(child);
+ }, true);
}
bool Join(TShuttlePtr& child) {
- return UnlessDead([&] {
- return DoJoin(child);
- }, false);
+ return UnlessDead([&] {
+ return DoJoin(child);
+ }, false);
}
bool IsDead() {
return AtomicGet(Status) & DeadFlag;
}
- protected:
+ protected:
virtual bool DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) = 0;
- virtual void DoEndOfTrack() = 0;
- virtual void DoDrop() = 0;
+ virtual void DoEndOfTrack() = 0;
+ virtual void DoDrop() = 0;
virtual void DoSerialize(TShuttleTrace& msg) = 0;
- virtual bool DoFork(TShuttlePtr& child) = 0;
+ virtual bool DoFork(TShuttlePtr& child) = 0;
virtual bool DoJoin(const TShuttlePtr& child) = 0;
- };
-
- // Not thread-safe orbit
- // Orbit should not be used concurrenty, lock is used
- // to ensure this is the case and avoid race condition if not.
- class TOrbit {
- private:
- TShuttlePtr HeadNoLock;
- public:
+ };
+
+ // Not thread-safe orbit
+ // Orbit should not be used concurrenty, lock is used
+ // to ensure this is the case and avoid race condition if not.
+ class TOrbit {
+ private:
+ TShuttlePtr HeadNoLock;
+ public:
TOrbit() = default;
TOrbit(const TOrbit&) = delete;
TOrbit(TOrbit&&) = default;
@@ -183,135 +183,135 @@ namespace NLWTrace {
TOrbit& operator=(const TOrbit&) = delete;
TOrbit& operator=(TOrbit&&) = default;
- ~TOrbit() {
- Reset();
- }
-
- void Reset() {
- NotConcurrent([] (TShuttlePtr& head) {
- if (head) {
- head->EndOfTrack();
- head.Reset();
- }
- });
- }
-
- // Checks if there is at least one shuttle in orbit
- // NOTE: used by every LWTRACK macro check, so keep it optimized - do not lock
- bool HasShuttles() const {
- return HeadNoLock.Get();
- }
-
- void AddShuttle(const TShuttlePtr& shuttle) {
- NotConcurrent([&] (TShuttlePtr& head) {
- Y_VERIFY(!shuttle->GetNext());
- shuttle->SetNext(head);
- head = shuttle;
- });
- }
-
- // Moves every shuttle from `orbit' into this
- void Take(TOrbit& orbit) {
- NotConcurrent([&] (TShuttlePtr& head) {
- orbit.NotConcurrent([&] (TShuttlePtr& oHead) {
- TShuttlePtr* ref = &oHead;
- if (ref->Get()) {
- while (TShuttlePtr& next = (*ref)->GetNext()) {
- ref = &next;
- }
- (*ref)->SetNext(head);
- head.Swap(oHead);
- oHead.Reset();
- }
- });
- });
- }
-
+ ~TOrbit() {
+ Reset();
+ }
+
+ void Reset() {
+ NotConcurrent([] (TShuttlePtr& head) {
+ if (head) {
+ head->EndOfTrack();
+ head.Reset();
+ }
+ });
+ }
+
+ // Checks if there is at least one shuttle in orbit
+ // NOTE: used by every LWTRACK macro check, so keep it optimized - do not lock
+ bool HasShuttles() const {
+ return HeadNoLock.Get();
+ }
+
+ void AddShuttle(const TShuttlePtr& shuttle) {
+ NotConcurrent([&] (TShuttlePtr& head) {
+ Y_VERIFY(!shuttle->GetNext());
+ shuttle->SetNext(head);
+ head = shuttle;
+ });
+ }
+
+ // Moves every shuttle from `orbit' into this
+ void Take(TOrbit& orbit) {
+ NotConcurrent([&] (TShuttlePtr& head) {
+ orbit.NotConcurrent([&] (TShuttlePtr& oHead) {
+ TShuttlePtr* ref = &oHead;
+ if (ref->Get()) {
+ while (TShuttlePtr& next = (*ref)->GetNext()) {
+ ref = &next;
+ }
+ (*ref)->SetNext(head);
+ head.Swap(oHead);
+ oHead.Reset();
+ }
+ });
+ });
+ }
+
void AddProbe(TProbe* probe, const TParams& params, ui64 timestamp = 0) {
- NotConcurrent([&] (TShuttlePtr& head) {
- TShuttlePtr* ref = &head;
- while (IShuttle* s = ref->Get()) {
- if (!s->AddProbe(probe, params, timestamp)) { // Shuttle self-destructed
- *ref = s->Drop(); // May destroy shuttle
- } else {
- ref = &s->GetNext(); // Keep shuttle
- }
- }
- });
- }
-
- template <class TFunc>
- void ForEachShuttle(ui64 traceIdx, TFunc&& func) {
- NotConcurrent([&] (TShuttlePtr& head) {
- TShuttlePtr* ref = &head;
- while (IShuttle* s = ref->Get()) {
- if (s->GetTraceIdx() == traceIdx && !func(s)) { // Shuttle self-destructed
- *ref = s->Drop(); // May destroy shuttle
- } else {
- ref = &s->GetNext(); // Keep shuttle
- }
- }
- });
- }
-
- void Serialize(ui64 traceIdx, TShuttleTrace& msg) {
- ForEachShuttle(traceIdx, [&] (NLWTrace::IShuttle* shuttle) {
+ NotConcurrent([&] (TShuttlePtr& head) {
+ TShuttlePtr* ref = &head;
+ while (IShuttle* s = ref->Get()) {
+ if (!s->AddProbe(probe, params, timestamp)) { // Shuttle self-destructed
+ *ref = s->Drop(); // May destroy shuttle
+ } else {
+ ref = &s->GetNext(); // Keep shuttle
+ }
+ }
+ });
+ }
+
+ template <class TFunc>
+ void ForEachShuttle(ui64 traceIdx, TFunc&& func) {
+ NotConcurrent([&] (TShuttlePtr& head) {
+ TShuttlePtr* ref = &head;
+ while (IShuttle* s = ref->Get()) {
+ if (s->GetTraceIdx() == traceIdx && !func(s)) { // Shuttle self-destructed
+ *ref = s->Drop(); // May destroy shuttle
+ } else {
+ ref = &s->GetNext(); // Keep shuttle
+ }
+ }
+ });
+ }
+
+ void Serialize(ui64 traceIdx, TShuttleTrace& msg) {
+ ForEachShuttle(traceIdx, [&] (NLWTrace::IShuttle* shuttle) {
shuttle->Serialize(msg);
return false;
});
}
- bool HasShuttle(ui64 traceIdx) {
- return NotConcurrent([=] (TShuttlePtr& head) {
- TShuttlePtr ref = head;
- while (IShuttle* s = ref.Get()) {
- if (s->GetTraceIdx() == traceIdx) {
- return true;
- } else {
- ref = s->GetNext();
- }
+ bool HasShuttle(ui64 traceIdx) {
+ return NotConcurrent([=] (TShuttlePtr& head) {
+ TShuttlePtr ref = head;
+ while (IShuttle* s = ref.Get()) {
+ if (s->GetTraceIdx() == traceIdx) {
+ return true;
+ } else {
+ ref = s->GetNext();
+ }
}
- return false;
- });
+ return false;
+ });
}
bool Fork(TOrbit& child) {
- return NotConcurrent([&] (TShuttlePtr& head) {
- return child.NotConcurrent([&] (TShuttlePtr& cHead) {
- bool result = true;
- TShuttlePtr* ref = &head;
- while (IShuttle* shuttle = ref->Get()) {
- if (shuttle->IsDead()) {
- *ref = shuttle->Drop();
- } else {
- result = result && shuttle->Fork(cHead);
- ref = &shuttle->GetNext();
- }
- }
- return result;
- });
- });
+ return NotConcurrent([&] (TShuttlePtr& head) {
+ return child.NotConcurrent([&] (TShuttlePtr& cHead) {
+ bool result = true;
+ TShuttlePtr* ref = &head;
+ while (IShuttle* shuttle = ref->Get()) {
+ if (shuttle->IsDead()) {
+ *ref = shuttle->Drop();
+ } else {
+ result = result && shuttle->Fork(cHead);
+ ref = &shuttle->GetNext();
+ }
+ }
+ return result;
+ });
+ });
}
void Join(TOrbit& child) {
- NotConcurrent([&] (TShuttlePtr& head) {
- child.NotConcurrent([&] (TShuttlePtr& cHead) {
- TShuttlePtr* ref = &head;
- while (IShuttle* shuttle = ref->Get()) {
- if (shuttle->IsDead()) {
- *ref = shuttle->Drop();
- } else {
- child.Join(cHead, shuttle);
- ref = &shuttle->GetNext();
- }
- }
- });
- });
- }
-
- private:
- static void Join(TShuttlePtr& head, IShuttle* parent) {
- TShuttlePtr* ref = &head;
+ NotConcurrent([&] (TShuttlePtr& head) {
+ child.NotConcurrent([&] (TShuttlePtr& cHead) {
+ TShuttlePtr* ref = &head;
+ while (IShuttle* shuttle = ref->Get()) {
+ if (shuttle->IsDead()) {
+ *ref = shuttle->Drop();
+ } else {
+ child.Join(cHead, shuttle);
+ ref = &shuttle->GetNext();
+ }
+ }
+ });
+ });
+ }
+
+ private:
+ static void Join(TShuttlePtr& head, IShuttle* parent) {
+ TShuttlePtr* ref = &head;
while (IShuttle* child = ref->Get()) {
if (parent->GetTraceIdx() == child->GetTraceIdx() && parent->GetSpanId() == child->GetParentSpanId()) {
TShuttlePtr next = child->Detach();
@@ -322,37 +322,37 @@ namespace NLWTrace {
}
}
}
-
- template <class TFunc>
- typename std::invoke_result<TFunc, TShuttlePtr&>::type NotConcurrent(TFunc func) {
- // `HeadNoLock` is binary-copied into local `headCopy` and written with special `locked` value
- // during not concurrent operations. Not concurrent operations should not work
- // with `HeadNoLock` directly. Instead `headCopy` is passed into `func` by reference and
- // after `func()` it is binary-copied back into `HeadNoLock`.
- static_assert(sizeof(HeadNoLock) == sizeof(TAtomic));
- TAtomic* headPtr = reinterpret_cast<TAtomic*>(&HeadNoLock);
- TAtomicBase headCopy = AtomicGet(*headPtr);
- static constexpr TAtomicBase locked = 0x1ull;
- if (headCopy != locked && AtomicCas(headPtr, locked, headCopy)) {
- struct TUnlocker { // to avoid specialization for R=void
- TAtomic* HeadPtr;
- TAtomicBase* HeadCopy;
- ~TUnlocker() {
- ATOMIC_COMPILER_BARRIER();
- AtomicSet(*HeadPtr, *HeadCopy);
- }
- } scoped{headPtr, &headCopy};
- return func(*reinterpret_cast<TShuttlePtr*>(&headCopy));
- } else {
- LockFailed();
- return typename std::invoke_result<TFunc, TShuttlePtr&>::type();
- }
- }
-
- void LockFailed();
- };
-
- inline size_t HasShuttles(const TOrbit& orbit) {
- return orbit.HasShuttles();
- }
-}
+
+ template <class TFunc>
+ typename std::invoke_result<TFunc, TShuttlePtr&>::type NotConcurrent(TFunc func) {
+ // `HeadNoLock` is binary-copied into local `headCopy` and written with special `locked` value
+ // during not concurrent operations. Not concurrent operations should not work
+ // with `HeadNoLock` directly. Instead `headCopy` is passed into `func` by reference and
+ // after `func()` it is binary-copied back into `HeadNoLock`.
+ static_assert(sizeof(HeadNoLock) == sizeof(TAtomic));
+ TAtomic* headPtr = reinterpret_cast<TAtomic*>(&HeadNoLock);
+ TAtomicBase headCopy = AtomicGet(*headPtr);
+ static constexpr TAtomicBase locked = 0x1ull;
+ if (headCopy != locked && AtomicCas(headPtr, locked, headCopy)) {
+ struct TUnlocker { // to avoid specialization for R=void
+ TAtomic* HeadPtr;
+ TAtomicBase* HeadCopy;
+ ~TUnlocker() {
+ ATOMIC_COMPILER_BARRIER();
+ AtomicSet(*HeadPtr, *HeadCopy);
+ }
+ } scoped{headPtr, &headCopy};
+ return func(*reinterpret_cast<TShuttlePtr*>(&headCopy));
+ } else {
+ LockFailed();
+ return typename std::invoke_result<TFunc, TShuttlePtr&>::type();
+ }
+ }
+
+ void LockFailed();
+ };
+
+ inline size_t HasShuttles(const TOrbit& orbit) {
+ return orbit.HasShuttles();
+ }
+}