aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/lwtrace/control.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/lwtrace/control.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/lwtrace/control.h')
-rw-r--r--library/cpp/lwtrace/control.h351
1 files changed, 351 insertions, 0 deletions
diff --git a/library/cpp/lwtrace/control.h b/library/cpp/lwtrace/control.h
new file mode 100644
index 00000000000..16b24eafd2d
--- /dev/null
+++ b/library/cpp/lwtrace/control.h
@@ -0,0 +1,351 @@
+#pragma once
+
+#include "custom_action.h"
+#include "event.h"
+#include "log.h"
+#include "log_shuttle.h"
+#include "probe.h"
+
+#include <library/cpp/lwtrace/protos/lwtrace.pb.h>
+
+#include <google/protobuf/repeated_field.h>
+
+#include <util/generic/deque.h>
+#include <util/generic/hash.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/set.h>
+#include <util/generic/vector.h>
+
+namespace NLWTrace {
+
+ using TProbeMap = THashMap<std::pair<TString, TString>, TProbe*>;
+
+ // Interface for probe ownership management
+ class IBox: public virtual TThrRefBase {
+ private:
+ bool Owns;
+
+ public:
+ explicit IBox(bool ownsProbe = false)
+ : Owns(ownsProbe)
+ {
+ }
+
+ bool OwnsProbe() {
+ return Owns;
+ }
+
+ virtual TProbe* GetProbe() = 0;
+ };
+
+ using TBoxPtr = TIntrusivePtr<IBox>;
+
+ // Simple not-owning box, that just holds pointer to static/global probe (e.g. created by LWTRACE_DEFINE_PROVIDER)
+ class TStaticBox: public IBox {
+ private:
+ TProbe* Probe;
+
+ public:
+ explicit TStaticBox(TProbe* probe)
+ : IBox(false)
+ , Probe(probe)
+ {
+ }
+
+ TProbe* GetProbe() override {
+ return Probe;
+ }
+ };
+
+ // Just a set of unique probes
+ // TODO[serxa]: get rid of different ProbeRegistries, use unique one (singleton) with auto registration
+ class TProbeRegistry: public TNonCopyable {
+ private:
+ TMutex Mutex;
+
+ // Probe* pointer uniquely identifies a probe and boxptr actually owns TProbe object (if required)
+ using TProbes = THashMap<TProbe*, TBoxPtr>;
+ TProbes Probes;
+
+ // Probe provider-name pairs must be unique, keep track of them
+ using TIds = TSet<std::pair<TString, TString>>;
+ TIds Ids;
+
+ public:
+ // Add probes from null-terminated array of probe pointers. Probe can be added multiple times. Thread-safe.
+ // Implies probes you pass will live forever (e.g. created by LWTRACE_DEFINE_PROVIDER)
+ void AddProbesList(TProbe** reg);
+
+ // Manage probes that are created/destructed dynamically
+ void AddProbe(const TBoxPtr& box);
+ void RemoveProbe(TProbe* probe);
+
+ // Helper class to make thread-safe iteration over probes
+ class TProbesAccessor {
+ private:
+ TGuard<TMutex> Guard;
+ TProbes& Probes;
+ public:
+ explicit TProbesAccessor(TProbeRegistry* registry)
+ : Guard(registry->Mutex)
+ , Probes(registry->Probes)
+ {}
+
+ explicit TProbesAccessor(TProbeRegistry& registry)
+ : TProbesAccessor(&registry)
+ {}
+
+ auto begin() { return Probes.begin(); }
+ auto end() { return Probes.end(); }
+ };
+
+ friend class TProbesAccessor;
+
+ private:
+ void AddProbeNoLock(const TBoxPtr& box);
+ void RemoveProbeNoLock(TProbe* probe);
+ };
+
+ // Represents a compiled trace query, holds executors attached to probes
+ class TSession: public TNonCopyable {
+ public:
+ typedef THashMap<TString, TAtomicBase> TTraceVariables;
+
+ private:
+ const TInstant StartTime;
+ const ui64 TraceIdx;
+ TProbeRegistry& Registry;
+ TDuration StoreDuration;
+ TDuration ReadDuration;
+ TCyclicLog CyclicLog;
+ TDurationLog DurationLog;
+ TCyclicDepot CyclicDepot;
+ TDurationDepot DurationDepot;
+ TAtomic LastTrackId;
+ TAtomic LastSpanId;
+ typedef TVector<std::pair<TProbe*, IExecutor*>> TProbes;
+ TProbes Probes;
+ bool Attached;
+ NLWTrace::TQuery Query;
+ TTraceVariables TraceVariables;
+ TTraceResources TraceResources;
+ void InsertExecutor(TTraceVariables& traceVariables,
+ size_t bi, const NLWTrace::TPredicate* pred,
+ const google::protobuf::RepeatedPtrField<NLWTrace::TAction>& actions,
+ TProbe* probe, const bool destructiveActionsAllowed,
+ const TCustomActionFactory& customActionFactory);
+
+ private:
+ void Destroy();
+
+ public:
+ TSession(ui64 traceIdx,
+ TProbeRegistry& registry,
+ const NLWTrace::TQuery& query,
+ const bool destructiveActionsAllowed,
+ const TCustomActionFactory& customActionFactory);
+ ~TSession();
+ void Detach();
+ size_t GetEventsCount() const;
+ size_t GetThreadsCount() const;
+ const NLWTrace::TQuery& GetQuery() const {
+ return Query;
+ }
+ TInstant GetStartTime() const {
+ return StartTime;
+ }
+ ui64 GetTraceIdx() const {
+ return TraceIdx;
+ }
+ TTraceResources& Resources() {
+ return TraceResources;
+ }
+ const TTraceResources& Resources() const {
+ return TraceResources;
+ }
+
+ template <class TReader>
+ void ReadThreads(TReader& r) const {
+ CyclicLog.ReadThreads(r);
+ DurationLog.ReadThreads(r);
+ }
+
+ template <class TReader>
+ void ReadItems(TReader& r) const {
+ CyclicLog.ReadItems(r);
+ DurationLog.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r);
+ }
+
+ template <class TReader>
+ void ReadItems(ui64 now, ui64 duration, TReader& r) const {
+ CyclicLog.ReadItems(r);
+ DurationLog.ReadItems(now, duration, r);
+ }
+
+ template <class TReader>
+ void ReadDepotThreads(TReader& r) const {
+ CyclicDepot.ReadThreads(r);
+ DurationDepot.ReadThreads(r);
+ }
+
+ template <class TReader>
+ void ReadDepotItems(TReader& r) const {
+ CyclicDepot.ReadItems(r);
+ DurationDepot.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r);
+ }
+
+ template <class TReader>
+ void ReadDepotItems(ui64 now, ui64 duration, TReader& r) const {
+ CyclicDepot.ReadItems(r);
+ DurationDepot.ReadItems(now, duration, r);
+ }
+ void ToProtobuf(TLogPb& pb) const;
+ };
+
+ // Deserialization result.
+ // Either IsSuccess is true or FailedEventNames contains event names
+ // we were not able to deserialize.
+ struct TTraceDeserializeStatus
+ {
+ bool IsSuccess = true;
+ TVector<TString> FailedEventNames;
+
+ void AddFailedEventName(const TString& name)
+ {
+ IsSuccess = false;
+ FailedEventNames.emplace_back(name);
+ }
+ };
+
+ // Just a registry of all active trace queries
+ // Facade for all interactions with probes/traces
+ class TManager: public TNonCopyable {
+ private:
+ TProbeRegistry& Registry;
+ TMutex Mtx;
+ ui64 LastTraceIdx = 1;
+ typedef THashMap<TString, TSession*> TTraces; // traceId -> TSession
+ TTraces Traces;
+ bool DestructiveActionsAllowed;
+ TCustomActionFactory CustomActionFactory;
+ THolder<TRunLogShuttleActionExecutor<TCyclicDepot>> SerializingExecutor;
+
+ public:
+ static constexpr ui64 RemoteTraceIdx = 0;
+
+ public:
+ TManager(TProbeRegistry& registry, bool allowDestructiveActions);
+ ~TManager();
+ bool HasTrace(const TString& id) const;
+ const TSession* GetTrace(const TString& id) const;
+ void New(const TString& id, const NLWTrace::TQuery& query);
+ void Delete(const TString& id);
+ void Stop(const TString& id);
+
+ template <class TReader>
+ void ReadProbes(TReader& reader) const {
+ TProbeRegistry::TProbesAccessor probes(Registry);
+ for (auto& kv : probes) {
+ TProbe* probe = kv.first;
+ reader.Push(probe);
+ }
+ }
+
+ template <class TReader>
+ void ReadTraces(TReader& reader) const {
+ TGuard<TMutex> g(Mtx);
+ for (const auto& Trace : Traces) {
+ const TString& id = Trace.first;
+ const TSession* trace = Trace.second;
+ reader.Push(id, trace);
+ }
+ }
+
+ template <class TReader>
+ void ReadLog(const TString& id, TReader& reader) {
+ TGuard<TMutex> g(Mtx);
+ TTraces::iterator it = Traces.find(id);
+ if (it == Traces.end()) {
+ ythrow yexception() << "trace id '" << id << "' is not used";
+ } else {
+ TSession* trace = it->second;
+ trace->ReadItems(reader);
+ }
+ }
+
+ template <class TReader>
+ void ReadLog(const TString& id, ui64 now, TReader& reader) {
+ TGuard<TMutex> g(Mtx);
+ TTraces::iterator it = Traces.find(id);
+ if (it == Traces.end()) {
+ ythrow yexception() << "trace id '" << id << "' is not used";
+ } else {
+ TSession* trace = it->second;
+ trace->ReadItems(now, reader);
+ }
+ }
+
+ template <class TReader>
+ void ReadDepot(const TString& id, TReader& reader) {
+ TGuard<TMutex> g(Mtx);
+ TTraces::iterator it = Traces.find(id);
+ if (it == Traces.end()) {
+ ythrow yexception() << "trace id '" << id << "' is not used";
+ } else {
+ TSession* trace = it->second;
+ trace->ReadDepotItems(reader);
+ }
+ }
+
+ template <class TReader>
+ void ReadDepot(const TString& id, ui64 now, TReader& reader) {
+ TGuard<TMutex> g(Mtx);
+ TTraces::iterator it = Traces.find(id);
+ if (it == Traces.end()) {
+ ythrow yexception() << "trace id '" << id << "' is not used";
+ } else {
+ TSession* trace = it->second;
+ trace->ReadDepotItems(now, reader);
+ }
+ }
+
+ bool GetDestructiveActionsAllowed() {
+ return DestructiveActionsAllowed;
+ }
+
+ void RegisterCustomAction(const TString& name, const TCustomActionFactory::TCallback& callback) {
+ CustomActionFactory.Register(name, callback);
+ }
+
+ template <class T>
+ void RegisterCustomAction() {
+ CustomActionFactory.Register(T::GetActionName(), [=](TProbe* probe, const TCustomAction& action, TSession* trace) {
+ return new T(probe, action, trace);
+ });
+ }
+
+ TProbeMap GetProbesMap();
+
+ void CreateTraceRequest(TTraceRequest& msg, TOrbit& orbit);
+
+ bool HandleTraceRequest(
+ const TTraceRequest& msg,
+ TOrbit& orbit);
+
+ TTraceDeserializeStatus HandleTraceResponse(
+ const TTraceResponse& msg,
+ const TProbeMap& probesMap,
+ TOrbit& orbit,
+ i64 timeOffset = 0,
+ double timeScale = 1);
+
+ void CreateTraceResponse(
+ TTraceResponse& msg,
+ TOrbit& orbit);
+
+ bool IsTraced(TOrbit& orbit) {
+ return orbit.HasShuttle(TManager::RemoteTraceIdx);
+ }
+ };
+}