diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/lwtrace/log_shuttle.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/lwtrace/log_shuttle.h')
-rw-r--r-- | library/cpp/lwtrace/log_shuttle.h | 359 |
1 files changed, 359 insertions, 0 deletions
diff --git a/library/cpp/lwtrace/log_shuttle.h b/library/cpp/lwtrace/log_shuttle.h new file mode 100644 index 00000000000..729a38615fe --- /dev/null +++ b/library/cpp/lwtrace/log_shuttle.h @@ -0,0 +1,359 @@ +#pragma once + +#include "log.h" +#include "probe.h" + +#include <library/cpp/lwtrace/protos/lwtrace.pb.h> + +#include <util/system/spinlock.h> + +namespace NLWTrace { + template <class TDepot> + class TRunLogShuttleActionExecutor; + + //////////////////////////////////////////////////////////////////////////////// + + struct THostTimeCalculator { + double K = 0; + ui64 B = 0; + + THostTimeCalculator() { + TInstant now = TInstant::Now(); + ui64 tsNow = GetCycleCount(); + K = 1000000000 / NHPTimer::GetClockRate(); + B = now.NanoSeconds() - K * tsNow; + } + + ui64 CyclesToEpochNanoseconds(ui64 cycles) const { + return K*cycles + B; + } + + ui64 EpochNanosecondsToCycles(ui64 ns) const { + return (ns - B) / K; + } + }; + + inline ui64 CyclesToEpochNanoseconds(ui64 cycles) { + return Singleton<THostTimeCalculator>()->CyclesToEpochNanoseconds(cycles); + } + + inline ui64 EpochNanosecondsToCycles(ui64 ns) { + return Singleton<THostTimeCalculator>()->EpochNanosecondsToCycles(ns); + } + + //////////////////////////////////////////////////////////////////////////////// + + template <class TDepot> + class TLogShuttle: public IShuttle { + private: + using TExecutor = TRunLogShuttleActionExecutor<TDepot>; + TTrackLog TrackLog; + TExecutor* Executor; + bool Ignore = false; + size_t MaxTrackLength; + TAdaptiveLock Lock; + TAtomic ForkFailed = 0; + + public: + explicit TLogShuttle(TExecutor* executor) + : IShuttle(executor->GetTraceIdx(), executor->NewSpanId()) + , Executor(executor) + , MaxTrackLength(Executor->GetAction().GetMaxTrackLength() ? Executor->GetAction().GetMaxTrackLength() : 100) + { + } + + bool DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) override; + void DoEndOfTrack() override; + void DoDrop() override; + void DoSerialize(TShuttleTrace& msg) override; + bool DoFork(TShuttlePtr& child) override; + bool DoJoin(const TShuttlePtr& child) override; + + void SetIgnore(bool ignore); + void Clear(); + + const TTrackLog& GetTrackLog() const { + return TrackLog; + } + }; + + //////////////////////////////////////////////////////////////////////////////// + + template <class TDepot> + class TLogShuttleActionBase: public IExecutor { + private: + const ui64 TraceIdx; + + public: + explicit TLogShuttleActionBase(ui64 traceIdx) + : TraceIdx(traceIdx) + { + } + ui64 GetTraceIdx() const { + return TraceIdx; + } + + static TLogShuttle<TDepot>* Cast(const TShuttlePtr& shuttle); + static TLogShuttle<TDepot>* Cast(IShuttle* shuttle); + }; + + //////////////////////////////////////////////////////////////////////////////// + + template <class TDepot> + class TRunLogShuttleActionExecutor: public TLogShuttleActionBase<TDepot> { + private: + TSpinLock Lock; + TVector<TShuttlePtr> AllShuttles; + TVector<TShuttlePtr> Parking; + TRunLogShuttleAction Action; + TDepot* Depot; + + TAtomic MissedTracks = 0; + TAtomic* LastTrackId; + TAtomic* LastSpanId; + + static constexpr int MaxShuttles = 100000; + + public: + TRunLogShuttleActionExecutor(ui64 traceIdx, const TRunLogShuttleAction& action, TDepot* depot, TAtomic* lastTrackId, TAtomic* lastSpanId); + ~TRunLogShuttleActionExecutor(); + bool DoExecute(TOrbit& orbit, const TParams& params) override; + void RecordShuttle(TLogShuttle<TDepot>* shuttle); + void ParkShuttle(TLogShuttle<TDepot>* shuttle); + void DiscardShuttle(); + TShuttlePtr RentShuttle(); + ui64 NewSpanId(); + const TRunLogShuttleAction& GetAction() const { + return Action; + } + }; + + //////////////////////////////////////////////////////////////////////////////// + + template <class TDepot> + class TEditLogShuttleActionExecutor: public TLogShuttleActionBase<TDepot> { + private: + TEditLogShuttleAction Action; + + public: + TEditLogShuttleActionExecutor(ui64 traceIdx, const TEditLogShuttleAction& action); + bool DoExecute(TOrbit& orbit, const TParams& params) override; + }; + + //////////////////////////////////////////////////////////////////////////////// + + template <class TDepot> + class TDropLogShuttleActionExecutor: public TLogShuttleActionBase<TDepot> { + private: + TDropLogShuttleAction Action; + + public: + TDropLogShuttleActionExecutor(ui64 traceIdx, const TDropLogShuttleAction& action); + bool DoExecute(TOrbit& orbit, const TParams& params) override; + }; + + //////////////////////////////////////////////////////////////////////////////// + + template <class TDepot> + bool TLogShuttle<TDepot>::DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) { + with_lock (Lock) { + if (TrackLog.Items.size() >= MaxTrackLength) { + TrackLog.Truncated = true; + return true; + } + TrackLog.Items.emplace_back(); + TTrackLog::TItem* item = &TrackLog.Items.back(); + item->ThreadId = 0; // TODO[serxa]: check if it is fast to run TThread::CurrentThreadId(); + item->Probe = probe; + if ((item->SavedParamsCount = probe->Event.Signature.ParamCount) > 0) { + probe->Event.Signature.CloneParams(item->Params, params); + } + item->TimestampCycles = timestamp ? timestamp : GetCycleCount(); + } + + return true; + } + + template <class TDepot> + void TLogShuttle<TDepot>::DoEndOfTrack() { + // Record track log if not ignored + if (!Ignore) { + if (AtomicGet(ForkFailed)) { + Executor->DiscardShuttle(); + } else { + Executor->RecordShuttle(this); + } + } + Executor->ParkShuttle(this); + } + + template <class TDepot> + void TLogShuttle<TDepot>::DoDrop() { + // Do not track log results of dropped shuttles + Executor->ParkShuttle(this); + } + + template <class TDepot> + void TLogShuttle<TDepot>::SetIgnore(bool ignore) { + Ignore = ignore; + } + + template <class TDepot> + void TLogShuttle<TDepot>::Clear() { + TrackLog.Clear(); + } + + template <class TDepot> + void TLogShuttle<TDepot>::DoSerialize(TShuttleTrace& msg) + { + with_lock (Lock) + { + if (!GetTrackLog().Items.size()) { + return ; + } + for (auto& record : GetTrackLog().Items) { + auto *rec = msg.AddEvents(); + rec->SetName(record.Probe->Event.Name); + rec->SetProvider(record.Probe->Event.GetProvider()); + rec->SetTimestampNanosec( + CyclesToEpochNanoseconds(record.TimestampCycles)); + record.Probe->Event.Signature.SerializeToPb(record.Params, *rec->MutableParams()); + } + } + } + + template <class TDepot> + TLogShuttle<TDepot>* TLogShuttleActionBase<TDepot>::Cast(const TShuttlePtr& shuttle) { + return static_cast<TLogShuttle<TDepot>*>(shuttle.Get()); + } + + template <class TDepot> + TLogShuttle<TDepot>* TLogShuttleActionBase<TDepot>::Cast(IShuttle* shuttle) { + return static_cast<TLogShuttle<TDepot>*>(shuttle); + } + + //////////////////////////////////////////////////////////////////////////////// + + template <class TDepot> + TRunLogShuttleActionExecutor<TDepot>::TRunLogShuttleActionExecutor( + ui64 traceIdx, + const TRunLogShuttleAction& action, + TDepot* depot, + TAtomic* lastTrackId, + TAtomic* lastSpanId) + : TLogShuttleActionBase<TDepot>(traceIdx) + , Action(action) + , Depot(depot) + , LastTrackId(lastTrackId) + , LastSpanId(lastSpanId) + { + ui64 size = Min<ui64>(Action.GetShuttlesCount() ? Action.GetShuttlesCount() : 1000, MaxShuttles); // Do not allow to allocate too much memory + AllShuttles.reserve(size); + Parking.reserve(size); + for (ui64 i = 0; i < size; i++) { + TShuttlePtr shuttle(new TLogShuttle<TDepot>(this)); + AllShuttles.emplace_back(shuttle); + Parking.emplace_back(shuttle); + } + } + + template <class TDepot> + TRunLogShuttleActionExecutor<TDepot>::~TRunLogShuttleActionExecutor() { + for (TShuttlePtr& shuttle : AllShuttles) { + shuttle->Kill(); + } + } + + template <class TDepot> + bool TRunLogShuttleActionExecutor<TDepot>::DoExecute(TOrbit& orbit, const TParams& params) { + Y_UNUSED(params); + if (TShuttlePtr shuttle = RentShuttle()) { + this->Cast(shuttle)->SetIgnore(Action.GetIgnore()); + orbit.AddShuttle(shuttle); + } else { + AtomicIncrement(MissedTracks); + } + return true; + } + + template <class TDepot> + void TRunLogShuttleActionExecutor<TDepot>::DiscardShuttle() { + AtomicIncrement(MissedTracks); + } + + template <class TDepot> + void TRunLogShuttleActionExecutor<TDepot>::RecordShuttle(TLogShuttle<TDepot>* shuttle) { + if (Depot == nullptr) { + return; + } + typename TDepot::TAccessor a(*Depot); + if (TTrackLog* trackLog = a.Add()) { + *trackLog = shuttle->GetTrackLog(); + trackLog->Id = AtomicIncrement(*LastTrackId); // Track id is assigned at reporting time + } + } + + template <class TDepot> + TShuttlePtr TRunLogShuttleActionExecutor<TDepot>::RentShuttle() { + TGuard<TSpinLock> g(Lock); + if (Parking.empty()) { + return TShuttlePtr(); + } else { + TShuttlePtr shuttle = Parking.back(); + Parking.pop_back(); + return shuttle; + } + } + + template <class TDepot> + void TRunLogShuttleActionExecutor<TDepot>::ParkShuttle(TLogShuttle<TDepot>* shuttle) { + shuttle->Clear(); + TGuard<TSpinLock> g(Lock); + Parking.emplace_back(shuttle); + } + + template <class TDepot> + ui64 TRunLogShuttleActionExecutor<TDepot>::NewSpanId() + { + return LastSpanId ? AtomicIncrement(*LastSpanId) : 0; + } + + //////////////////////////////////////////////////////////////////////////////// + + template <class TDepot> + TEditLogShuttleActionExecutor<TDepot>::TEditLogShuttleActionExecutor(ui64 traceIdx, const TEditLogShuttleAction& action) + : TLogShuttleActionBase<TDepot>(traceIdx) + , Action(action) + { + } + + template <class TDepot> + bool TEditLogShuttleActionExecutor<TDepot>::DoExecute(TOrbit& orbit, const TParams& params) { + Y_UNUSED(params); + bool ignore = Action.GetIgnore(); + orbit.ForEachShuttle(this->GetTraceIdx(), [=](IShuttle* shuttle) { + this->Cast(shuttle)->SetIgnore(ignore); + return true; + }); + return true; + } + + //////////////////////////////////////////////////////////////////////////////// + + template <class TDepot> + TDropLogShuttleActionExecutor<TDepot>::TDropLogShuttleActionExecutor(ui64 traceIdx, const TDropLogShuttleAction& action) + : TLogShuttleActionBase<TDepot>(traceIdx) + , Action(action) + { + } + + template <class TDepot> + bool TDropLogShuttleActionExecutor<TDepot>::DoExecute(TOrbit& orbit, const TParams& params) { + Y_UNUSED(params); + orbit.ForEachShuttle(this->GetTraceIdx(), [](IShuttle*) { + return false; // Erase shuttle from orbit + }); + return true; + } + +} |