diff options
author | serxa <serxa@yandex-team.ru> | 2022-02-10 16:49:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:08 +0300 |
commit | e5d4696304c6689379ac7ce334512404d4b7836c (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/lwtrace/control.h | |
parent | d6d7db348c2cc64e71243cab9940ee6778f4317d (diff) | |
download | ydb-e5d4696304c6689379ac7ce334512404d4b7836c.tar.gz |
Restoring authorship annotation for <serxa@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/lwtrace/control.h')
-rw-r--r-- | library/cpp/lwtrace/control.h | 578 |
1 files changed, 289 insertions, 289 deletions
diff --git a/library/cpp/lwtrace/control.h b/library/cpp/lwtrace/control.h index 478488e653..16b24eafd2 100644 --- a/library/cpp/lwtrace/control.h +++ b/library/cpp/lwtrace/control.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include "custom_action.h" #include "event.h" #include "log.h" @@ -10,199 +10,199 @@ #include <google/protobuf/repeated_field.h> -#include <util/generic/deque.h> -#include <util/generic/hash.h> -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> -#include <util/generic/set.h> -#include <util/generic/vector.h> - -namespace NLWTrace { +#include <util/generic/deque.h> +#include <util/generic/hash.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> + +namespace NLWTrace { using TProbeMap = THashMap<std::pair<TString, TString>, TProbe*>; - // Interface for probe ownership management - class IBox: public virtual TThrRefBase { - private: - bool Owns; + // Interface for probe ownership management + class IBox: public virtual TThrRefBase { + private: + bool Owns; - public: - explicit IBox(bool ownsProbe = false) - : Owns(ownsProbe) + public: + explicit IBox(bool ownsProbe = false) + : Owns(ownsProbe) { } - - bool OwnsProbe() { - return Owns; - } - - virtual TProbe* GetProbe() = 0; - }; - - using TBoxPtr = TIntrusivePtr<IBox>; - - // Simple not-owning box, that just holds pointer to static/global probe (e.g. created by LWTRACE_DEFINE_PROVIDER) - class TStaticBox: public IBox { - private: - TProbe* Probe; - - public: - explicit TStaticBox(TProbe* probe) - : IBox(false) - , Probe(probe) + + bool OwnsProbe() { + return Owns; + } + + virtual TProbe* GetProbe() = 0; + }; + + using TBoxPtr = TIntrusivePtr<IBox>; + + // Simple not-owning box, that just holds pointer to static/global probe (e.g. created by LWTRACE_DEFINE_PROVIDER) + class TStaticBox: public IBox { + private: + TProbe* Probe; + + public: + explicit TStaticBox(TProbe* probe) + : IBox(false) + , Probe(probe) { } - - TProbe* GetProbe() override { - return Probe; - } - }; - - // Just a set of unique probes - // TODO[serxa]: get rid of different ProbeRegistries, use unique one (singleton) with auto registration - class TProbeRegistry: public TNonCopyable { - private: - TMutex Mutex; - - // Probe* pointer uniquely identifies a probe and boxptr actually owns TProbe object (if required) - using TProbes = THashMap<TProbe*, TBoxPtr>; - TProbes Probes; - - // Probe provider-name pairs must be unique, keep track of them - using TIds = TSet<std::pair<TString, TString>>; - TIds Ids; - - public: - // Add probes from null-terminated array of probe pointers. Probe can be added multiple times. Thread-safe. - // Implies probes you pass will live forever (e.g. created by LWTRACE_DEFINE_PROVIDER) - void AddProbesList(TProbe** reg); - - // Manage probes that are created/destructed dynamically - void AddProbe(const TBoxPtr& box); - void RemoveProbe(TProbe* probe); - - // Helper class to make thread-safe iteration over probes - class TProbesAccessor { - private: - TGuard<TMutex> Guard; - TProbes& Probes; - public: - explicit TProbesAccessor(TProbeRegistry* registry) - : Guard(registry->Mutex) - , Probes(registry->Probes) - {} - - explicit TProbesAccessor(TProbeRegistry& registry) - : TProbesAccessor(®istry) - {} - - auto begin() { return Probes.begin(); } - auto end() { return Probes.end(); } - }; - - friend class TProbesAccessor; - - private: - void AddProbeNoLock(const TBoxPtr& box); - void RemoveProbeNoLock(TProbe* probe); - }; - - // Represents a compiled trace query, holds executors attached to probes - class TSession: public TNonCopyable { - public: - typedef THashMap<TString, TAtomicBase> TTraceVariables; - - private: - const TInstant StartTime; - const ui64 TraceIdx; - TProbeRegistry& Registry; - TDuration StoreDuration; - TDuration ReadDuration; - TCyclicLog CyclicLog; - TDurationLog DurationLog; - TCyclicDepot CyclicDepot; - TDurationDepot DurationDepot; - TAtomic LastTrackId; + + TProbe* GetProbe() override { + return Probe; + } + }; + + // Just a set of unique probes + // TODO[serxa]: get rid of different ProbeRegistries, use unique one (singleton) with auto registration + class TProbeRegistry: public TNonCopyable { + private: + TMutex Mutex; + + // Probe* pointer uniquely identifies a probe and boxptr actually owns TProbe object (if required) + using TProbes = THashMap<TProbe*, TBoxPtr>; + TProbes Probes; + + // Probe provider-name pairs must be unique, keep track of them + using TIds = TSet<std::pair<TString, TString>>; + TIds Ids; + + public: + // Add probes from null-terminated array of probe pointers. Probe can be added multiple times. Thread-safe. + // Implies probes you pass will live forever (e.g. created by LWTRACE_DEFINE_PROVIDER) + void AddProbesList(TProbe** reg); + + // Manage probes that are created/destructed dynamically + void AddProbe(const TBoxPtr& box); + void RemoveProbe(TProbe* probe); + + // Helper class to make thread-safe iteration over probes + class TProbesAccessor { + private: + TGuard<TMutex> Guard; + TProbes& Probes; + public: + explicit TProbesAccessor(TProbeRegistry* registry) + : Guard(registry->Mutex) + , Probes(registry->Probes) + {} + + explicit TProbesAccessor(TProbeRegistry& registry) + : TProbesAccessor(®istry) + {} + + auto begin() { return Probes.begin(); } + auto end() { return Probes.end(); } + }; + + friend class TProbesAccessor; + + private: + void AddProbeNoLock(const TBoxPtr& box); + void RemoveProbeNoLock(TProbe* probe); + }; + + // Represents a compiled trace query, holds executors attached to probes + class TSession: public TNonCopyable { + public: + typedef THashMap<TString, TAtomicBase> TTraceVariables; + + private: + const TInstant StartTime; + const ui64 TraceIdx; + TProbeRegistry& Registry; + TDuration StoreDuration; + TDuration ReadDuration; + TCyclicLog CyclicLog; + TDurationLog DurationLog; + TCyclicDepot CyclicDepot; + TDurationDepot DurationDepot; + TAtomic LastTrackId; TAtomic LastSpanId; - typedef TVector<std::pair<TProbe*, IExecutor*>> TProbes; - TProbes Probes; - bool Attached; - NLWTrace::TQuery Query; - TTraceVariables TraceVariables; - TTraceResources TraceResources; - void InsertExecutor(TTraceVariables& traceVariables, - size_t bi, const NLWTrace::TPredicate* pred, - const google::protobuf::RepeatedPtrField<NLWTrace::TAction>& actions, - TProbe* probe, const bool destructiveActionsAllowed, - const TCustomActionFactory& customActionFactory); - - private: - void Destroy(); - - public: - TSession(ui64 traceIdx, - TProbeRegistry& registry, - const NLWTrace::TQuery& query, - const bool destructiveActionsAllowed, - const TCustomActionFactory& customActionFactory); - ~TSession(); - void Detach(); - size_t GetEventsCount() const; - size_t GetThreadsCount() const; - const NLWTrace::TQuery& GetQuery() const { - return Query; - } - TInstant GetStartTime() const { - return StartTime; - } - ui64 GetTraceIdx() const { - return TraceIdx; - } - TTraceResources& Resources() { - return TraceResources; - } - const TTraceResources& Resources() const { - return TraceResources; - } - - template <class TReader> - void ReadThreads(TReader& r) const { - CyclicLog.ReadThreads(r); - DurationLog.ReadThreads(r); - } - - template <class TReader> - void ReadItems(TReader& r) const { - CyclicLog.ReadItems(r); - DurationLog.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r); - } - - template <class TReader> - void ReadItems(ui64 now, ui64 duration, TReader& r) const { - CyclicLog.ReadItems(r); - DurationLog.ReadItems(now, duration, r); - } - - template <class TReader> - void ReadDepotThreads(TReader& r) const { - CyclicDepot.ReadThreads(r); - DurationDepot.ReadThreads(r); - } - - template <class TReader> - void ReadDepotItems(TReader& r) const { - CyclicDepot.ReadItems(r); - DurationDepot.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r); - } - - template <class TReader> - void ReadDepotItems(ui64 now, ui64 duration, TReader& r) const { - CyclicDepot.ReadItems(r); - DurationDepot.ReadItems(now, duration, r); - } - void ToProtobuf(TLogPb& pb) const; - }; - + typedef TVector<std::pair<TProbe*, IExecutor*>> TProbes; + TProbes Probes; + bool Attached; + NLWTrace::TQuery Query; + TTraceVariables TraceVariables; + TTraceResources TraceResources; + void InsertExecutor(TTraceVariables& traceVariables, + size_t bi, const NLWTrace::TPredicate* pred, + const google::protobuf::RepeatedPtrField<NLWTrace::TAction>& actions, + TProbe* probe, const bool destructiveActionsAllowed, + const TCustomActionFactory& customActionFactory); + + private: + void Destroy(); + + public: + TSession(ui64 traceIdx, + TProbeRegistry& registry, + const NLWTrace::TQuery& query, + const bool destructiveActionsAllowed, + const TCustomActionFactory& customActionFactory); + ~TSession(); + void Detach(); + size_t GetEventsCount() const; + size_t GetThreadsCount() const; + const NLWTrace::TQuery& GetQuery() const { + return Query; + } + TInstant GetStartTime() const { + return StartTime; + } + ui64 GetTraceIdx() const { + return TraceIdx; + } + TTraceResources& Resources() { + return TraceResources; + } + const TTraceResources& Resources() const { + return TraceResources; + } + + template <class TReader> + void ReadThreads(TReader& r) const { + CyclicLog.ReadThreads(r); + DurationLog.ReadThreads(r); + } + + template <class TReader> + void ReadItems(TReader& r) const { + CyclicLog.ReadItems(r); + DurationLog.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r); + } + + template <class TReader> + void ReadItems(ui64 now, ui64 duration, TReader& r) const { + CyclicLog.ReadItems(r); + DurationLog.ReadItems(now, duration, r); + } + + template <class TReader> + void ReadDepotThreads(TReader& r) const { + CyclicDepot.ReadThreads(r); + DurationDepot.ReadThreads(r); + } + + template <class TReader> + void ReadDepotItems(TReader& r) const { + CyclicDepot.ReadItems(r); + DurationDepot.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r); + } + + template <class TReader> + void ReadDepotItems(ui64 now, ui64 duration, TReader& r) const { + CyclicDepot.ReadItems(r); + DurationDepot.ReadItems(now, duration, r); + } + void ToProtobuf(TLogPb& pb) const; + }; + // Deserialization result. // Either IsSuccess is true or FailedEventNames contains event names // we were not able to deserialize. @@ -218,112 +218,112 @@ namespace NLWTrace { } }; - // Just a registry of all active trace queries - // Facade for all interactions with probes/traces - class TManager: public TNonCopyable { - private: - TProbeRegistry& Registry; - TMutex Mtx; + // Just a registry of all active trace queries + // Facade for all interactions with probes/traces + class TManager: public TNonCopyable { + private: + TProbeRegistry& Registry; + TMutex Mtx; ui64 LastTraceIdx = 1; - typedef THashMap<TString, TSession*> TTraces; // traceId -> TSession - TTraces Traces; - bool DestructiveActionsAllowed; - TCustomActionFactory CustomActionFactory; + typedef THashMap<TString, TSession*> TTraces; // traceId -> TSession + TTraces Traces; + bool DestructiveActionsAllowed; + TCustomActionFactory CustomActionFactory; THolder<TRunLogShuttleActionExecutor<TCyclicDepot>> SerializingExecutor; - - public: + + public: static constexpr ui64 RemoteTraceIdx = 0; public: - TManager(TProbeRegistry& registry, bool allowDestructiveActions); - ~TManager(); - bool HasTrace(const TString& id) const; - const TSession* GetTrace(const TString& id) const; - void New(const TString& id, const NLWTrace::TQuery& query); - void Delete(const TString& id); - void Stop(const TString& id); - - template <class TReader> - void ReadProbes(TReader& reader) const { - TProbeRegistry::TProbesAccessor probes(Registry); - for (auto& kv : probes) { - TProbe* probe = kv.first; - reader.Push(probe); - } - } - - template <class TReader> - void ReadTraces(TReader& reader) const { - TGuard<TMutex> g(Mtx); - for (const auto& Trace : Traces) { - const TString& id = Trace.first; - const TSession* trace = Trace.second; - reader.Push(id, trace); - } - } - - template <class TReader> - void ReadLog(const TString& id, TReader& reader) { - TGuard<TMutex> g(Mtx); - TTraces::iterator it = Traces.find(id); - if (it == Traces.end()) { - ythrow yexception() << "trace id '" << id << "' is not used"; - } else { - TSession* trace = it->second; - trace->ReadItems(reader); - } - } - - template <class TReader> - void ReadLog(const TString& id, ui64 now, TReader& reader) { - TGuard<TMutex> g(Mtx); - TTraces::iterator it = Traces.find(id); - if (it == Traces.end()) { - ythrow yexception() << "trace id '" << id << "' is not used"; - } else { - TSession* trace = it->second; - trace->ReadItems(now, reader); - } - } - - template <class TReader> - void ReadDepot(const TString& id, TReader& reader) { - TGuard<TMutex> g(Mtx); - TTraces::iterator it = Traces.find(id); - if (it == Traces.end()) { - ythrow yexception() << "trace id '" << id << "' is not used"; - } else { - TSession* trace = it->second; - trace->ReadDepotItems(reader); - } - } - - template <class TReader> - void ReadDepot(const TString& id, ui64 now, TReader& reader) { - TGuard<TMutex> g(Mtx); - TTraces::iterator it = Traces.find(id); - if (it == Traces.end()) { - ythrow yexception() << "trace id '" << id << "' is not used"; - } else { - TSession* trace = it->second; - trace->ReadDepotItems(now, reader); - } - } - - bool GetDestructiveActionsAllowed() { - return DestructiveActionsAllowed; - } - - void RegisterCustomAction(const TString& name, const TCustomActionFactory::TCallback& callback) { - CustomActionFactory.Register(name, callback); - } - - template <class T> - void RegisterCustomAction() { - CustomActionFactory.Register(T::GetActionName(), [=](TProbe* probe, const TCustomAction& action, TSession* trace) { - return new T(probe, action, trace); - }); - } + TManager(TProbeRegistry& registry, bool allowDestructiveActions); + ~TManager(); + bool HasTrace(const TString& id) const; + const TSession* GetTrace(const TString& id) const; + void New(const TString& id, const NLWTrace::TQuery& query); + void Delete(const TString& id); + void Stop(const TString& id); + + template <class TReader> + void ReadProbes(TReader& reader) const { + TProbeRegistry::TProbesAccessor probes(Registry); + for (auto& kv : probes) { + TProbe* probe = kv.first; + reader.Push(probe); + } + } + + template <class TReader> + void ReadTraces(TReader& reader) const { + TGuard<TMutex> g(Mtx); + for (const auto& Trace : Traces) { + const TString& id = Trace.first; + const TSession* trace = Trace.second; + reader.Push(id, trace); + } + } + + template <class TReader> + void ReadLog(const TString& id, TReader& reader) { + TGuard<TMutex> g(Mtx); + TTraces::iterator it = Traces.find(id); + if (it == Traces.end()) { + ythrow yexception() << "trace id '" << id << "' is not used"; + } else { + TSession* trace = it->second; + trace->ReadItems(reader); + } + } + + template <class TReader> + void ReadLog(const TString& id, ui64 now, TReader& reader) { + TGuard<TMutex> g(Mtx); + TTraces::iterator it = Traces.find(id); + if (it == Traces.end()) { + ythrow yexception() << "trace id '" << id << "' is not used"; + } else { + TSession* trace = it->second; + trace->ReadItems(now, reader); + } + } + + template <class TReader> + void ReadDepot(const TString& id, TReader& reader) { + TGuard<TMutex> g(Mtx); + TTraces::iterator it = Traces.find(id); + if (it == Traces.end()) { + ythrow yexception() << "trace id '" << id << "' is not used"; + } else { + TSession* trace = it->second; + trace->ReadDepotItems(reader); + } + } + + template <class TReader> + void ReadDepot(const TString& id, ui64 now, TReader& reader) { + TGuard<TMutex> g(Mtx); + TTraces::iterator it = Traces.find(id); + if (it == Traces.end()) { + ythrow yexception() << "trace id '" << id << "' is not used"; + } else { + TSession* trace = it->second; + trace->ReadDepotItems(now, reader); + } + } + + bool GetDestructiveActionsAllowed() { + return DestructiveActionsAllowed; + } + + void RegisterCustomAction(const TString& name, const TCustomActionFactory::TCallback& callback) { + CustomActionFactory.Register(name, callback); + } + + template <class T> + void RegisterCustomAction() { + CustomActionFactory.Register(T::GetActionName(), [=](TProbe* probe, const TCustomAction& action, TSession* trace) { + return new T(probe, action, trace); + }); + } TProbeMap GetProbesMap(); @@ -347,5 +347,5 @@ namespace NLWTrace { bool IsTraced(TOrbit& orbit) { return orbit.HasShuttle(TManager::RemoteTraceIdx); } - }; -} + }; +} |