aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/lwtrace/log_shuttle.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/lwtrace/log_shuttle.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/lwtrace/log_shuttle.h')
-rw-r--r--library/cpp/lwtrace/log_shuttle.h359
1 files changed, 359 insertions, 0 deletions
diff --git a/library/cpp/lwtrace/log_shuttle.h b/library/cpp/lwtrace/log_shuttle.h
new file mode 100644
index 00000000000..729a38615fe
--- /dev/null
+++ b/library/cpp/lwtrace/log_shuttle.h
@@ -0,0 +1,359 @@
+#pragma once
+
+#include "log.h"
+#include "probe.h"
+
+#include <library/cpp/lwtrace/protos/lwtrace.pb.h>
+
+#include <util/system/spinlock.h>
+
+namespace NLWTrace {
+ template <class TDepot>
+ class TRunLogShuttleActionExecutor;
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ struct THostTimeCalculator {
+ double K = 0;
+ ui64 B = 0;
+
+ THostTimeCalculator() {
+ TInstant now = TInstant::Now();
+ ui64 tsNow = GetCycleCount();
+ K = 1000000000 / NHPTimer::GetClockRate();
+ B = now.NanoSeconds() - K * tsNow;
+ }
+
+ ui64 CyclesToEpochNanoseconds(ui64 cycles) const {
+ return K*cycles + B;
+ }
+
+ ui64 EpochNanosecondsToCycles(ui64 ns) const {
+ return (ns - B) / K;
+ }
+ };
+
+ inline ui64 CyclesToEpochNanoseconds(ui64 cycles) {
+ return Singleton<THostTimeCalculator>()->CyclesToEpochNanoseconds(cycles);
+ }
+
+ inline ui64 EpochNanosecondsToCycles(ui64 ns) {
+ return Singleton<THostTimeCalculator>()->EpochNanosecondsToCycles(ns);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <class TDepot>
+ class TLogShuttle: public IShuttle {
+ private:
+ using TExecutor = TRunLogShuttleActionExecutor<TDepot>;
+ TTrackLog TrackLog;
+ TExecutor* Executor;
+ bool Ignore = false;
+ size_t MaxTrackLength;
+ TAdaptiveLock Lock;
+ TAtomic ForkFailed = 0;
+
+ public:
+ explicit TLogShuttle(TExecutor* executor)
+ : IShuttle(executor->GetTraceIdx(), executor->NewSpanId())
+ , Executor(executor)
+ , MaxTrackLength(Executor->GetAction().GetMaxTrackLength() ? Executor->GetAction().GetMaxTrackLength() : 100)
+ {
+ }
+
+ bool DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) override;
+ void DoEndOfTrack() override;
+ void DoDrop() override;
+ void DoSerialize(TShuttleTrace& msg) override;
+ bool DoFork(TShuttlePtr& child) override;
+ bool DoJoin(const TShuttlePtr& child) override;
+
+ void SetIgnore(bool ignore);
+ void Clear();
+
+ const TTrackLog& GetTrackLog() const {
+ return TrackLog;
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <class TDepot>
+ class TLogShuttleActionBase: public IExecutor {
+ private:
+ const ui64 TraceIdx;
+
+ public:
+ explicit TLogShuttleActionBase(ui64 traceIdx)
+ : TraceIdx(traceIdx)
+ {
+ }
+ ui64 GetTraceIdx() const {
+ return TraceIdx;
+ }
+
+ static TLogShuttle<TDepot>* Cast(const TShuttlePtr& shuttle);
+ static TLogShuttle<TDepot>* Cast(IShuttle* shuttle);
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <class TDepot>
+ class TRunLogShuttleActionExecutor: public TLogShuttleActionBase<TDepot> {
+ private:
+ TSpinLock Lock;
+ TVector<TShuttlePtr> AllShuttles;
+ TVector<TShuttlePtr> Parking;
+ TRunLogShuttleAction Action;
+ TDepot* Depot;
+
+ TAtomic MissedTracks = 0;
+ TAtomic* LastTrackId;
+ TAtomic* LastSpanId;
+
+ static constexpr int MaxShuttles = 100000;
+
+ public:
+ TRunLogShuttleActionExecutor(ui64 traceIdx, const TRunLogShuttleAction& action, TDepot* depot, TAtomic* lastTrackId, TAtomic* lastSpanId);
+ ~TRunLogShuttleActionExecutor();
+ bool DoExecute(TOrbit& orbit, const TParams& params) override;
+ void RecordShuttle(TLogShuttle<TDepot>* shuttle);
+ void ParkShuttle(TLogShuttle<TDepot>* shuttle);
+ void DiscardShuttle();
+ TShuttlePtr RentShuttle();
+ ui64 NewSpanId();
+ const TRunLogShuttleAction& GetAction() const {
+ return Action;
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <class TDepot>
+ class TEditLogShuttleActionExecutor: public TLogShuttleActionBase<TDepot> {
+ private:
+ TEditLogShuttleAction Action;
+
+ public:
+ TEditLogShuttleActionExecutor(ui64 traceIdx, const TEditLogShuttleAction& action);
+ bool DoExecute(TOrbit& orbit, const TParams& params) override;
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <class TDepot>
+ class TDropLogShuttleActionExecutor: public TLogShuttleActionBase<TDepot> {
+ private:
+ TDropLogShuttleAction Action;
+
+ public:
+ TDropLogShuttleActionExecutor(ui64 traceIdx, const TDropLogShuttleAction& action);
+ bool DoExecute(TOrbit& orbit, const TParams& params) override;
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <class TDepot>
+ bool TLogShuttle<TDepot>::DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) {
+ with_lock (Lock) {
+ if (TrackLog.Items.size() >= MaxTrackLength) {
+ TrackLog.Truncated = true;
+ return true;
+ }
+ TrackLog.Items.emplace_back();
+ TTrackLog::TItem* item = &TrackLog.Items.back();
+ item->ThreadId = 0; // TODO[serxa]: check if it is fast to run TThread::CurrentThreadId();
+ item->Probe = probe;
+ if ((item->SavedParamsCount = probe->Event.Signature.ParamCount) > 0) {
+ probe->Event.Signature.CloneParams(item->Params, params);
+ }
+ item->TimestampCycles = timestamp ? timestamp : GetCycleCount();
+ }
+
+ return true;
+ }
+
+ template <class TDepot>
+ void TLogShuttle<TDepot>::DoEndOfTrack() {
+ // Record track log if not ignored
+ if (!Ignore) {
+ if (AtomicGet(ForkFailed)) {
+ Executor->DiscardShuttle();
+ } else {
+ Executor->RecordShuttle(this);
+ }
+ }
+ Executor->ParkShuttle(this);
+ }
+
+ template <class TDepot>
+ void TLogShuttle<TDepot>::DoDrop() {
+ // Do not track log results of dropped shuttles
+ Executor->ParkShuttle(this);
+ }
+
+ template <class TDepot>
+ void TLogShuttle<TDepot>::SetIgnore(bool ignore) {
+ Ignore = ignore;
+ }
+
+ template <class TDepot>
+ void TLogShuttle<TDepot>::Clear() {
+ TrackLog.Clear();
+ }
+
+ template <class TDepot>
+ void TLogShuttle<TDepot>::DoSerialize(TShuttleTrace& msg)
+ {
+ with_lock (Lock)
+ {
+ if (!GetTrackLog().Items.size()) {
+ return ;
+ }
+ for (auto& record : GetTrackLog().Items) {
+ auto *rec = msg.AddEvents();
+ rec->SetName(record.Probe->Event.Name);
+ rec->SetProvider(record.Probe->Event.GetProvider());
+ rec->SetTimestampNanosec(
+ CyclesToEpochNanoseconds(record.TimestampCycles));
+ record.Probe->Event.Signature.SerializeToPb(record.Params, *rec->MutableParams());
+ }
+ }
+ }
+
+ template <class TDepot>
+ TLogShuttle<TDepot>* TLogShuttleActionBase<TDepot>::Cast(const TShuttlePtr& shuttle) {
+ return static_cast<TLogShuttle<TDepot>*>(shuttle.Get());
+ }
+
+ template <class TDepot>
+ TLogShuttle<TDepot>* TLogShuttleActionBase<TDepot>::Cast(IShuttle* shuttle) {
+ return static_cast<TLogShuttle<TDepot>*>(shuttle);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <class TDepot>
+ TRunLogShuttleActionExecutor<TDepot>::TRunLogShuttleActionExecutor(
+ ui64 traceIdx,
+ const TRunLogShuttleAction& action,
+ TDepot* depot,
+ TAtomic* lastTrackId,
+ TAtomic* lastSpanId)
+ : TLogShuttleActionBase<TDepot>(traceIdx)
+ , Action(action)
+ , Depot(depot)
+ , LastTrackId(lastTrackId)
+ , LastSpanId(lastSpanId)
+ {
+ ui64 size = Min<ui64>(Action.GetShuttlesCount() ? Action.GetShuttlesCount() : 1000, MaxShuttles); // Do not allow to allocate too much memory
+ AllShuttles.reserve(size);
+ Parking.reserve(size);
+ for (ui64 i = 0; i < size; i++) {
+ TShuttlePtr shuttle(new TLogShuttle<TDepot>(this));
+ AllShuttles.emplace_back(shuttle);
+ Parking.emplace_back(shuttle);
+ }
+ }
+
+ template <class TDepot>
+ TRunLogShuttleActionExecutor<TDepot>::~TRunLogShuttleActionExecutor() {
+ for (TShuttlePtr& shuttle : AllShuttles) {
+ shuttle->Kill();
+ }
+ }
+
+ template <class TDepot>
+ bool TRunLogShuttleActionExecutor<TDepot>::DoExecute(TOrbit& orbit, const TParams& params) {
+ Y_UNUSED(params);
+ if (TShuttlePtr shuttle = RentShuttle()) {
+ this->Cast(shuttle)->SetIgnore(Action.GetIgnore());
+ orbit.AddShuttle(shuttle);
+ } else {
+ AtomicIncrement(MissedTracks);
+ }
+ return true;
+ }
+
+ template <class TDepot>
+ void TRunLogShuttleActionExecutor<TDepot>::DiscardShuttle() {
+ AtomicIncrement(MissedTracks);
+ }
+
+ template <class TDepot>
+ void TRunLogShuttleActionExecutor<TDepot>::RecordShuttle(TLogShuttle<TDepot>* shuttle) {
+ if (Depot == nullptr) {
+ return;
+ }
+ typename TDepot::TAccessor a(*Depot);
+ if (TTrackLog* trackLog = a.Add()) {
+ *trackLog = shuttle->GetTrackLog();
+ trackLog->Id = AtomicIncrement(*LastTrackId); // Track id is assigned at reporting time
+ }
+ }
+
+ template <class TDepot>
+ TShuttlePtr TRunLogShuttleActionExecutor<TDepot>::RentShuttle() {
+ TGuard<TSpinLock> g(Lock);
+ if (Parking.empty()) {
+ return TShuttlePtr();
+ } else {
+ TShuttlePtr shuttle = Parking.back();
+ Parking.pop_back();
+ return shuttle;
+ }
+ }
+
+ template <class TDepot>
+ void TRunLogShuttleActionExecutor<TDepot>::ParkShuttle(TLogShuttle<TDepot>* shuttle) {
+ shuttle->Clear();
+ TGuard<TSpinLock> g(Lock);
+ Parking.emplace_back(shuttle);
+ }
+
+ template <class TDepot>
+ ui64 TRunLogShuttleActionExecutor<TDepot>::NewSpanId()
+ {
+ return LastSpanId ? AtomicIncrement(*LastSpanId) : 0;
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <class TDepot>
+ TEditLogShuttleActionExecutor<TDepot>::TEditLogShuttleActionExecutor(ui64 traceIdx, const TEditLogShuttleAction& action)
+ : TLogShuttleActionBase<TDepot>(traceIdx)
+ , Action(action)
+ {
+ }
+
+ template <class TDepot>
+ bool TEditLogShuttleActionExecutor<TDepot>::DoExecute(TOrbit& orbit, const TParams& params) {
+ Y_UNUSED(params);
+ bool ignore = Action.GetIgnore();
+ orbit.ForEachShuttle(this->GetTraceIdx(), [=](IShuttle* shuttle) {
+ this->Cast(shuttle)->SetIgnore(ignore);
+ return true;
+ });
+ return true;
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <class TDepot>
+ TDropLogShuttleActionExecutor<TDepot>::TDropLogShuttleActionExecutor(ui64 traceIdx, const TDropLogShuttleAction& action)
+ : TLogShuttleActionBase<TDepot>(traceIdx)
+ , Action(action)
+ {
+ }
+
+ template <class TDepot>
+ bool TDropLogShuttleActionExecutor<TDepot>::DoExecute(TOrbit& orbit, const TParams& params) {
+ Y_UNUSED(params);
+ orbit.ForEachShuttle(this->GetTraceIdx(), [](IShuttle*) {
+ return false; // Erase shuttle from orbit
+ });
+ return true;
+ }
+
+}