diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/lwtrace/trace.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/lwtrace/trace.cpp')
-rw-r--r-- | library/cpp/lwtrace/trace.cpp | 1051 |
1 files changed, 1051 insertions, 0 deletions
diff --git a/library/cpp/lwtrace/trace.cpp b/library/cpp/lwtrace/trace.cpp new file mode 100644 index 0000000000..3c974c85a0 --- /dev/null +++ b/library/cpp/lwtrace/trace.cpp @@ -0,0 +1,1051 @@ +#include "all.h" +#include "kill_action.h" +#include "log_shuttle.h" +#include "preprocessor.h" +#include "sleep_action.h" +#include "stderr_writer.h" +#include "google/protobuf/repeated_field.h" + +#include <util/generic/map.h> +#include <util/random/random.h> + +#include <functional> + +namespace NLWTrace { +#ifndef LWTRACE_DISABLE + +// Define static strings for name of each parameter type +#define FOREACH_PARAMTYPE_MACRO(n, t, v) \ + const char* TParamType<t>::NameString = n; \ + /**/ + FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO) + FOR_NIL_PARAMTYPE(FOREACH_PARAMTYPE_MACRO) +#undef FOREACH_PARAMTYPE_MACRO + +#endif + + void TProbeRegistry::AddProbesList(TProbe** reg) { + TGuard<TMutex> g(Mutex); + if (reg == nullptr) { + return; + } + for (TProbe** i = reg; *i != nullptr; i++) { + AddProbeNoLock(new TStaticBox(*i)); + } + } + + void TProbeRegistry::AddProbe(const TBoxPtr& box) { + TGuard<TMutex> g(Mutex); + AddProbeNoLock(box); + } + + void TProbeRegistry::RemoveProbe(TProbe* probe) { + TGuard<TMutex> g(Mutex); + RemoveProbeNoLock(probe); + } + + void TProbeRegistry::AddProbeNoLock(const TBoxPtr& box) { + TProbe* probe = box->GetProbe(); + if (Probes.contains(probe)) { + return; // silently skip probe double registration + } + TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name); + Y_VERIFY(Ids.count(key) == 0, "duplicate provider:probe pair %s:%s", key.first.data(), key.second.data()); + Probes.emplace(probe, box); + Ids.insert(key); + } + + void TProbeRegistry::RemoveProbeNoLock(TProbe* probe) { + auto iter = Probes.find(probe); + if (iter != Probes.end()) { + TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name); + Ids.erase(key); + Probes.erase(iter); + } else { + // silently skip probe double unregistration + } + } + + TAtomic* GetVariablePtr(TSession::TTraceVariables& traceVariables, const TString& name) { + TSession::TTraceVariables::iterator it = traceVariables.find(name); + if (it == traceVariables.end()) { + TAtomicBase zero = 0; + traceVariables[name] = zero; + return &traceVariables[name]; + } + return &((*it).second); + } + + typedef enum { + OT_LITERAL = 0, + OT_PARAMETER = 1, + OT_VARIABLE = 2 + } EOperandType; + + template <class T, EOperandType> + class TOperand; + + template <class T> + class TOperand<T, OT_LITERAL> { + private: + T ImmediateValue; + + public: + TOperand(TSession::TTraceVariables&, const TString&, const TString& value, size_t) { + ImmediateValue = TParamConv<T>::FromString(value); + } + const T& Get(const TParams&) { + return ImmediateValue; + } + }; + + template <class T> + class TOperand<T, OT_PARAMETER> { + private: + size_t Idx; + + public: + TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t idx) { + Idx = idx; + } + + const T& Get(const TParams& params) { + return params.Param[Idx].template Get<T>(); + } + }; + + template <class T> + class TOperand<T, OT_VARIABLE> { + private: + TAtomic* Variable; + + public: + TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) { + Variable = GetVariablePtr(traceVariables, name); + } + + const T Get(const TParams&) { + return (T)AtomicGet(*Variable); + } + + void Set(const T& value) { + AtomicSet(*Variable, value); + } + + void Inc() { + AtomicIncrement(*Variable); + } + + void Dec() { + AtomicDecrement(*Variable); + } + + void Add(const TAtomicBase value) { + AtomicAdd(*Variable, value); + } + + void Sub(const TAtomicBase value) { + AtomicSub(*Variable, value); + } + }; + + template <> + class TOperand<TCheck, OT_VARIABLE> { + private: + TAtomic* Variable; + + public: + TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) { + Variable = GetVariablePtr(traceVariables, name); + } + + const TCheck Get(const TParams&) { + return TCheck(AtomicGet(*Variable)); + } + + void Set(const TCheck& value) { + AtomicSet(*Variable, value.Value); + } + + void Add(const TCheck& value) { + AtomicAdd(*Variable, value.Value); + } + + void Sub(const TCheck value) { + AtomicSub(*Variable, value.Value); + } + + void Inc() { + AtomicIncrement(*Variable); + } + + void Dec() { + AtomicDecrement(*Variable); + } + }; + + template <> + class TOperand<TString, OT_VARIABLE> { + private: + TString Dummy; + + public: + TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) { + } + + const TString Get(const TParams&) { + return Dummy; + } + + void Set(const TString&) { + } + }; + + template <> + class TOperand<TSymbol, OT_VARIABLE> { + private: + TSymbol Dummy; + + public: + TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) { + } + + const TSymbol Get(const TParams&) { + return Dummy; + } + + void Set(const TSymbol&) { + } + }; + + // IOperandGetter: hide concrete EOperandType, to save compilation time + template <class T> + struct IOperandGetter { + virtual const T Get(const TParams& params) = 0; + virtual ~IOperandGetter() { + } + }; + + template <class T, EOperandType TParam> + class TOperandGetter: public IOperandGetter<T> { + private: + TOperand<T, TParam> Op; + + public: + TOperandGetter(const TOperand<T, TParam>& op) + : Op(op) + { + } + + const T Get(const TParams& params) override { + return Op.Get(params); + } + }; + + template <class T> + class TReceiver: public TOperand<T, OT_VARIABLE> { + public: + TReceiver(TSession::TTraceVariables& traceVariables, const TString& name) + : TOperand<T, OT_VARIABLE>(traceVariables, name, nullptr, 0) + { + } + }; + + template <class TP, class TPredicate> + static bool CmpFunc(TP a, TP b) { + return TPredicate()(a, b); + } + + template <class TP, class TFunc, EOperandType TLhs, EOperandType TRhs> + class TOperatorExecutor: public IExecutor { + private: + bool InvertCompare; + TOperand<TP, TLhs> Lhs; + TOperand<TP, TRhs> Rhs; + + bool DoExecute(TOrbit&, const TParams& params) override { + return TFunc()(Lhs.Get(params), Rhs.Get(params)) != InvertCompare; + } + + public: + TOperatorExecutor(const TOperand<TP, TLhs>& lhs, const TOperand<TP, TRhs>& rhs, bool invertCompare) + : InvertCompare(invertCompare) + , Lhs(lhs) + , Rhs(rhs) + { + } + }; + + template <class TR, class TP> + struct TAddEq { + void operator()(TR& x, TP y) const { + x.Add(y); + } + }; + template <class TR, class TP> + struct TSubEq { + void operator()(TR& x, TP y) const { + x.Sub(y); + } + }; + template <class TR> + struct TInc { + void operator()(TR& x) const { + x.Inc(); + } + }; + template <class TR> + struct TDec { + void operator()(TR& x) const { + x.Dec(); + } + }; + + template <class TP, class TFunc> + class TUnaryInplaceStatementExecutor: public IExecutor { + private: + TFunc Func; + TReceiver<TP> Receiver; + + bool DoExecute(TOrbit&, const TParams&) override { + Func(Receiver); + return true; + } + + public: + TUnaryInplaceStatementExecutor(TReceiver<TP>& receiver) + : Receiver(receiver) + { + } + }; + + template <class TP, class TFunc, EOperandType TParam> + class TBinaryInplaceStatementExecutor: public IExecutor { + private: + TFunc Func; + TReceiver<TP> Receiver; + TOperand<TP, TParam> Param; + + bool DoExecute(TOrbit&, const TParams& params) override { + Func(Receiver, Param.Get(params)); + return true; + } + + public: + TBinaryInplaceStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TParam>& param) + : Receiver(receiver) + , Param(param) + { + } + }; + + template <class TP, class TFunc, EOperandType TFirstParam> + class TBinaryStatementExecutor: public IExecutor { + private: + TFunc Func; + TReceiver<TP> Receiver; + TOperand<TP, TFirstParam> FirstParam; + + bool DoExecute(TOrbit&, const TParams& params) override { + Receiver.Set(Func(Receiver.Get(params), FirstParam.Get(params))); + return true; + } + + public: + TBinaryStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TFirstParam>& firstParam) + : Receiver(receiver) + , FirstParam(firstParam) + { + } + }; + + template <class TP, class TFunc> + class TTernaryStatementExecutor: public IExecutor { + private: + TFunc Func; + TReceiver<TP> Receiver; + + TAutoPtr<IOperandGetter<TP>> FirstParam; + TAutoPtr<IOperandGetter<TP>> SecondParam; + + bool DoExecute(TOrbit&, const TParams& params) override { + Receiver.Set(Func(FirstParam->Get(params), SecondParam->Get(params))); + return true; + } + + public: + TTernaryStatementExecutor(const TReceiver<TP>& receiver, + TAutoPtr<IOperandGetter<TP>> firstParam, + TAutoPtr<IOperandGetter<TP>> secondParam) + : Receiver(receiver) + , FirstParam(firstParam) + , SecondParam(secondParam) + { + } + }; + + template <class TLog> + class TLogActionExecutor: public IExecutor { + private: + bool LogParams; + bool LogTimestamp; + intptr_t* MaxRecords; + TAtomic Records; + TProbe* Probe; + TLog* Log; + + bool DoExecute(TOrbit&, const TParams& params) override { + if (MaxRecords != nullptr) { + while (true) { + intptr_t a = AtomicGet(Records); + if (a >= *MaxRecords) { + return true; + } + if (AtomicCas(&Records, a + 1, a)) { + Write(params); + return true; + } + } + } else { + Write(params); + return true; + } + } + + void Write(const TParams& params) { + typename TLog::TAccessor la(*Log); + if (typename TLog::TItem* item = la.Add()) { + item->Probe = Probe; + if (LogParams) { + if ((item->SavedParamsCount = Probe->Event.Signature.ParamCount) > 0) { + Probe->Event.Signature.CloneParams(item->Params, params); + } + } else { + item->SavedParamsCount = 0; + } + if (LogTimestamp) { + item->Timestamp = TInstant::Now(); + } + item->TimestampCycles = GetCycleCount(); + } + } + + public: + TLogActionExecutor(TProbe* probe, const TLogAction& action, TLog* log) + : LogParams(!action.GetDoNotLogParams()) + , LogTimestamp(action.GetLogTimestamp()) + , MaxRecords(action.GetMaxRecords() ? new intptr_t(action.GetMaxRecords()) : nullptr) + , Records(0) + , Probe(probe) + , Log(log) + { + } + + ~TLogActionExecutor() override { + delete MaxRecords; + } + }; + + class TSamplingExecutor: public IExecutor { + private: + double SampleRate; + + public: + explicit TSamplingExecutor(double sampleRate) + : SampleRate(sampleRate) + {} + + bool DoExecute(TOrbit&, const TParams&) override { + return RandomNumber<double>() < SampleRate; + } + }; + + typedef struct { + EOperandType Type; + size_t ParamIdx; + } TArgumentDescription; + + using TArgumentList = TVector<TArgumentDescription>; + + template <class T> + void ParseArguments(const T& op, const TSignature& signature, const TString& exceptionPrefix, size_t expectedArgumentCount, TArgumentList& arguments) { + arguments.clear(); + size_t firstParamIdx = size_t(-1); + for (size_t argumentIdx = 0; argumentIdx < op.ArgumentSize(); ++argumentIdx) { + const TArgument& arg = op.GetArgument(argumentIdx); + TArgumentDescription operand; + operand.ParamIdx = size_t(-1); + if (arg.GetVariable()) { + operand.Type = OT_VARIABLE; + } else if (arg.GetValue()) { + operand.Type = OT_LITERAL; + } else if (arg.GetParam()) { + operand.Type = OT_PARAMETER; + operand.ParamIdx = signature.FindParamIndex(arg.GetParam()); + if (operand.ParamIdx == size_t(-1)) { + ythrow yexception() << exceptionPrefix + << " argument #" << argumentIdx << " param '" << arg.GetParam() + << "' doesn't exist"; + } + if (firstParamIdx == size_t(-1)) { + firstParamIdx = operand.ParamIdx; + } else { + if (strcmp(signature.ParamTypes[firstParamIdx], signature.ParamTypes[operand.ParamIdx]) != 0) { + ythrow yexception() << exceptionPrefix + << " param types do not match"; + } + } + } else { + ythrow yexception() << exceptionPrefix + << " argument #" << argumentIdx + << " is empty"; + } + arguments.push_back(operand); + } + if (arguments.size() != expectedArgumentCount) { + ythrow yexception() << exceptionPrefix + << " incorrect number of arguments (" << arguments.size() + << " present, " << expectedArgumentCount << " expected)"; + } + } + + template <class TArg1, class TArg2> + struct TTraceSecondArg { + // implementation of deprecated std::project2nd + TArg1 operator()(const TArg1&, const TArg2& y) const { + return y; + } + }; + + void TSession::InsertExecutor( + TTraceVariables& traceVariables, size_t bi, const TPredicate* pred, + const NProtoBuf::RepeatedPtrField<TAction>& actions, TProbe* probe, + const bool destructiveActionsAllowed, + const TCustomActionFactory& customActionFactory) { +#ifndef LWTRACE_DISABLE + THolder<IExecutor> exec; + IExecutor* last = nullptr; + TArgumentList arguments; + if (pred) { + double sampleRate = pred->GetSampleRate(); + if (sampleRate != 0.0) { + if (!(0.0 < sampleRate && sampleRate <= 1.0)) { + ythrow yexception() << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " sampling operator" + << " invalid sample rate " << sampleRate << ", expected [0;1]"; + } + exec.Reset(new TSamplingExecutor(sampleRate)); + last = exec.Get(); + } + + for (size_t i = 0; i < pred->OperatorsSize(); i++) { + const TOperator& op = pred->GetOperators(i); + TString exceptionPrefix; + TStringOutput exceptionPrefixOutput(exceptionPrefix); + exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " operator #" << i + 1; + ParseArguments<TOperator>(op, probe->Event.Signature, exceptionPrefix, 2, arguments); + THolder<IExecutor> opExec; + + TArgumentDescription arg0 = arguments.at(0); + TArgumentDescription arg1 = arguments.at(1); + + const char* tName0 = arg0.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg0.ParamIdx]; + const char* tName1 = arg1.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx]; + + TString var0 = op.GetArgument(0).GetVariable(); + TString var1 = op.GetArgument(1).GetVariable(); + + TString val0 = op.GetArgument(0).GetValue(); + TString val1 = op.GetArgument(1).GetValue(); + +#define FOREACH_OPERAND_TYPE_RT(n, t, v, fn, lt, rt) \ + if (rt == arg1.Type) { \ + TOperand<t, rt> rhs(traceVariables, var1, val1, arg1.ParamIdx); \ + opExec.Reset(new TOperatorExecutor<t, fn<t>, lt, rt>(lhs, rhs, invertCompare)); \ + break; \ + } + +#define FOREACH_OPERAND_TYPE_LT(n, t, v, fn, lt) \ + if (lt == arg0.Type) { \ + TOperand<t, lt> lhs(traceVariables, var0, val0, arg0.ParamIdx); \ + FOREACH_RIGHT_TYPE(FOREACH_OPERAND_TYPE_RT, n, t, v, fn, lt) \ + } + +#define FOREACH_PARAMTYPE_MACRO(n, t, v, fn) \ + if ((arg0.ParamIdx == size_t(-1) || strcmp(tName0, n) == 0) && (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0)) { \ + FOREACH_LEFT_TYPE(FOREACH_OPERAND_TYPE_LT, n, t, v, fn); \ + } + + bool invertCompare = EqualToOneOf(op.GetType(), OT_NE, OT_GE, OT_LE); + + switch (op.GetType()) { + case OT_EQ: + case OT_NE: + FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::equal_to); + break; + case OT_LT: + case OT_GE: + FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::less); + break; + case OT_GT: + case OT_LE: + FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::greater); + break; + default: + ythrow yexception() << exceptionPrefix + << " has not supported operator type #" << int(op.GetType()); + } + +#undef FOREACH_OPERAND_TYPE_RT +#undef FOREACH_OPERAND_TYPE_LT +#undef FOREACH_PARAMTYPE_MACRO + + if (!opExec) { + ythrow yexception() << exceptionPrefix + << " has not supported left param #" << arg0.ParamIdx + 1 << " type '" + << (arg0.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg0.ParamIdx] : "?") + << "', or right param #" << arg0.ParamIdx + 1 << " type '" + << (arg1.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg1.ParamIdx] : "?") + << "'"; + } + + if (!exec) { + exec.Reset(opExec.Release()); + last = exec.Get(); + } else { + last->SetNext(opExec.Release()); + last = last->GetNext(); + } + } + } + + for (int i = 0; i < actions.size(); ++i) { + const TAction& action = actions.Get(i); + THolder<IExecutor> actExec; + if (action.HasPrintToStderrAction()) { + actExec.Reset(new TStderrActionExecutor(probe)); + } else if (action.HasLogAction()) { + if (Query.GetLogDurationUs()) { + actExec.Reset(new TLogActionExecutor<TDurationLog>(probe, action.GetLogAction(), &DurationLog)); + } else { + actExec.Reset(new TLogActionExecutor<TCyclicLog>(probe, action.GetLogAction(), &CyclicLog)); + } + } else if (action.HasRunLogShuttleAction()) { + if (Query.GetLogDurationUs()) { + actExec.Reset(new TRunLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetRunLogShuttleAction(), &DurationDepot, &LastTrackId, &LastSpanId)); + } else { + actExec.Reset(new TRunLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetRunLogShuttleAction(), &CyclicDepot, &LastTrackId, &LastSpanId)); + } + } else if (action.HasEditLogShuttleAction()) { + if (Query.GetLogDurationUs()) { + actExec.Reset(new TEditLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetEditLogShuttleAction())); + } else { + actExec.Reset(new TEditLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetEditLogShuttleAction())); + } + } else if (action.HasDropLogShuttleAction()) { + if (Query.GetLogDurationUs()) { + actExec.Reset(new TDropLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetDropLogShuttleAction())); + } else { + actExec.Reset(new TDropLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetDropLogShuttleAction())); + } + } else if (action.HasCustomAction()) { + THolder<TCustomActionExecutor> customExec(customActionFactory.Create(probe, action.GetCustomAction(), this)); + if (customExec) { + if (!customExec->IsDestructive() || destructiveActionsAllowed) { + actExec.Reset(customExec.Release()); + } else { + ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 + << " contains destructive CustomAction, but destructive actions are disabled." + << " Please, consider using --unsafe-lwtrace command line parameter."; + } + } else { + ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 + << " contains unregistered CustomAction '" << action.GetCustomAction().GetName() << "'"; + } + } else if (action.HasKillAction()) { + if (destructiveActionsAllowed) { + actExec.Reset(new NPrivate::TKillActionExecutor(probe)); + } else { + ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 + << " contains destructive KillAction, but destructive actions are disabled." + << " Please, consider using --unsafe-lwtrace command line parameter."; + } + } else if (action.HasSleepAction()) { + if (destructiveActionsAllowed) { + const TSleepAction& sleepAction = action.GetSleepAction(); + if (sleepAction.GetNanoSeconds()) { + ui64 nanoSeconds = sleepAction.GetNanoSeconds(); + actExec.Reset(new NPrivate::TSleepActionExecutor(probe, nanoSeconds)); + } else { + ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 + << " SleepAction missing parameter 'NanoSeconds'"; + } + } else { + ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 + << " contains destructive SleepAction, but destructive actions are disabled." + << " Please, consider using --unsafe-lwtrace command line parameter."; + } + } else if (action.HasStatementAction()) { + const TStatementAction& statement = action.GetStatementAction(); + TString exceptionPrefix; + TStringOutput exceptionPrefixOutput(exceptionPrefix); + exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " action #" << i + 1; + size_t expectedArgumentCount = 3; + if (statement.GetType() == ST_MOV || statement.GetType() == ST_ADD_EQ || statement.GetType() == ST_SUB_EQ) { + expectedArgumentCount = 2; + } else if (statement.GetType() == ST_INC || statement.GetType() == ST_DEC) { + expectedArgumentCount = 1; + } + ParseArguments<TStatementAction>(statement, probe->Event.Signature, exceptionPrefix, expectedArgumentCount, arguments); + + TArgumentDescription arg0 = (expectedArgumentCount <= 0) ? TArgumentDescription() : arguments.at(0); + TArgumentDescription arg1 = (expectedArgumentCount <= 1) ? TArgumentDescription() : arguments.at(1); + TArgumentDescription arg2 = (expectedArgumentCount <= 2) ? TArgumentDescription() : arguments.at(2); + + TString var0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetVariable(); + TString var1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetVariable(); + TString var2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetVariable(); + + TString val0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetValue(); + TString val1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetValue(); + TString val2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetValue(); + + const char* tName1 = (expectedArgumentCount <= 1 || arg1.ParamIdx == size_t(-1)) + ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx]; + const char* tName2 = (expectedArgumentCount <= 2 || arg2.ParamIdx == size_t(-1)) + ? nullptr : probe->Event.Signature.ParamTypes[arg2.ParamIdx]; + + if (arg0.Type == OT_VARIABLE) { + switch (statement.GetType()) { +#define PARSE_UNARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \ + { \ + typedef TUnaryInplaceStatementExecutor<t, fn<TReceiver<t>>> TExec; \ + TReceiver<t> receiver(traceVariables, var0); \ + actExec.Reset(new TExec(receiver)); \ + break; \ + } + +#define PARSE_BINARY_INPLACE_STATEMENT_MACRO2(n, t, v, fn, ft) \ + if (arg1.Type == ft) { \ + typedef TBinaryInplaceStatementExecutor<t, fn<TReceiver<t>, t>, ft> TExec; \ + TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx); \ + actExec.Reset(new TExec(receiver, firstParam)); \ + break; \ + } + +#define PARSE_BINARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \ + if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \ + TReceiver<t> receiver(traceVariables, var0); \ + FOREACH_RIGHT_TYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO2, n, t, v, fn); \ + } + +#define PARSE_BINARY_STATEMENT_MACRO2(n, t, v, fn, ft) \ + if (arg1.Type == ft) { \ + typedef TBinaryStatementExecutor<t, fn<t, t>, ft> TExec; \ + TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx); \ + actExec.Reset(new TExec(receiver, firstParam)); \ + break; \ + } + +#define PARSE_BINARY_STATEMENT_MACRO(n, t, v, fn) \ + if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \ + TReceiver<t> receiver(traceVariables, var0); \ + FOREACH_RIGHT_TYPE(PARSE_BINARY_STATEMENT_MACRO2, n, t, v, fn); \ + } + +#define CREATE_OPERAND_GETTER_N(N, type, arg_type) \ + if (arg##N.Type == arg_type) { \ + operand##N.Reset(new TOperandGetter<type, arg_type>(TOperand<type, arg_type>(traceVariables, var##N, val##N, arg##N.ParamIdx))); \ + } + +#define TERNARY_ON_TYPE(n, t, v, fn) \ + if ((arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) && (arg2.ParamIdx == size_t(-1) || strcmp(tName2, n) == 0)) { \ + TAutoPtr<IOperandGetter<t>> operand1, operand2; \ + FOREACH_LEFT_TYPE(CREATE_OPERAND_GETTER_N, 1, t); \ + FOREACH_RIGHT_TYPE(CREATE_OPERAND_GETTER_N, 2, t); \ + if (operand1 && operand2) { \ + actExec.Reset(new TTernaryStatementExecutor<t, fn<t>>( \ + TReceiver<t>(traceVariables, var0), \ + operand1, \ + operand2)); \ + } \ + break; \ + } + +#define IMPLEMENT_TERNARY_STATEMENT(fn) FOR_MATH_PARAMTYPE(TERNARY_ON_TYPE, fn) + + case ST_INC: + FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TInc); + break; + case ST_DEC: + FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TDec); + break; + case ST_MOV: + FOR_MATH_PARAMTYPE(PARSE_BINARY_STATEMENT_MACRO, TTraceSecondArg); + break; + case ST_ADD_EQ: + FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TAddEq); + break; + case ST_SUB_EQ: + FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TSubEq); + break; + case ST_ADD: + IMPLEMENT_TERNARY_STATEMENT(std::plus) + break; + case ST_SUB: + IMPLEMENT_TERNARY_STATEMENT(std::minus) + break; + case ST_MUL: + IMPLEMENT_TERNARY_STATEMENT(std::multiplies) + break; + case ST_DIV: + IMPLEMENT_TERNARY_STATEMENT(std::divides) + break; + case ST_MOD: + IMPLEMENT_TERNARY_STATEMENT(std::modulus) + break; + default: + ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1 + << " has not supported statement type #" << int(statement.GetType()); + } + } + if (!actExec) { + ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1 + << " can't create action"; + } +#undef CREATE_OPERAND_GETTER_N +#undef TERNARY_ON_TYPE +#undef IMPLEMENT_TERNARY_STATEMENT +#undef PARSE_TERNARY_STATEMENT_MACRO +#undef PARSE_BINARY_STATEMENT_MACRO +#undef PARSE_BINARY_INPLACE_STATEMENT_MACRO +#undef PARSE_UNARY_INPLACE_STATEMENT_MACRO + } else { + ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1 + << " has not supported action '" << action.ShortDebugString() << "'"; + } + if (!exec) { + exec.Reset(actExec.Release()); + last = exec.Get(); + } else { + last->SetNext(actExec.Release()); + last = last->GetNext(); + } + } + + if (!probe->Attach(exec.Get())) { + ythrow yexception() << "block #" << bi + 1 + << " cannot be attached to probe '" << probe->Event.Name << "': no free slots"; + } + Probes.push_back(std::make_pair(probe, exec.Release())); +#else + Y_UNUSED(bi); + Y_UNUSED(pred); + Y_UNUSED(actions); + Y_UNUSED(probe); + Y_UNUSED(destructiveActionsAllowed); + Y_UNUSED(traceVariables); + Y_UNUSED(customActionFactory); +#endif + } + + TSession::TSession(ui64 traceIdx, + TProbeRegistry& registry, + const TQuery& query, + const bool destructiveActionsAllowed, + const TCustomActionFactory& customActionFactory) + : StartTime(TInstant::Now()) + , TraceIdx(traceIdx) + , Registry(registry) + , StoreDuration(TDuration::MicroSeconds(query.GetLogDurationUs() * 11 / 10)) // +10% to try avoid truncation while reading multiple threads/traces + , ReadDuration(TDuration::MicroSeconds(query.GetLogDurationUs())) + , CyclicLog(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000) + , DurationLog(StoreDuration) + , CyclicDepot(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000) + , DurationDepot(StoreDuration) + , LastTrackId(0) + , LastSpanId(0) + , Attached(true) + , Query(query) + { + try { + for (size_t bi = 0; bi < query.BlocksSize(); bi++) { + const TBlock& block = query.GetBlocks(bi); + if (!block.HasProbeDesc()) { + ythrow yexception() << "block #" << bi + 1 << " has no probe description"; + } + const TProbeDesc& pdesc = block.GetProbeDesc(); + const TPredicate* pred = block.HasPredicate() ? &block.GetPredicate() : nullptr; + if (block.ActionSize() < 1) { + ythrow yexception() << "block #" << bi + 1 << " has no action"; + } + const NProtoBuf::RepeatedPtrField<TAction>& actions = block.action(); + if (pdesc.GetName() && pdesc.GetProvider()) { + TProbeRegistry::TProbesAccessor probes(Registry); + bool found = false; + for (auto& kv : probes) { + TProbe* probe = kv.first; + if (probe->Event.Name == pdesc.GetName() && probe->Event.GetProvider() == pdesc.GetProvider()) { + InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory); + found = true; + break; + } + } + if (!found) { + ythrow yexception() << "block #" << bi + 1 << " has no matching probe with name '" + << pdesc.GetName() << "' provider '" << pdesc.GetProvider() << "'"; + } + } else if (pdesc.GetGroup()) { + bool found = false; + TProbeRegistry::TProbesAccessor probes(Registry); + for (auto& kv : probes) { + TProbe* probe = kv.first; + for (const char* const* gi = probe->Event.Groups; *gi != nullptr; gi++) { + if (*gi == pdesc.GetGroup()) { + InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory); + found = true; + break; + } + } + } + if (!found) { + ythrow yexception() << "block #" << bi + 1 + << " has no matching probes for group '" << pdesc.GetGroup() << "'"; + } + } else { + ythrow yexception() << "block #" << bi + 1 << " has bad probe description: name '" << pdesc.GetName() + << "' provider '" << pdesc.GetProvider() + << "' group '" << pdesc.GetGroup() << "'"; + } + } + } catch (...) { + Destroy(); + throw; + } + } + + void TSession::Destroy() { + Detach(); + for (auto& probe : Probes) { + delete probe.second; + } + } + + TSession::~TSession() { + Destroy(); + } + + void TSession::Detach() { + if (Attached) { + for (auto& p : Probes) { + TProbe* probe = p.first; + IExecutor* exec = p.second; + probe->Detach(exec); + } + Attached = false; + } + } + + size_t TSession::GetEventsCount() const { + return CyclicLog.GetEventsCount() + DurationLog.GetEventsCount() + CyclicDepot.GetEventsCount() + DurationDepot.GetEventsCount(); + } + + size_t TSession::GetThreadsCount() const { + return CyclicLog.GetThreadsCount() + DurationLog.GetThreadsCount() + CyclicDepot.GetThreadsCount() + DurationDepot.GetThreadsCount(); + } + + class TReadToProtobuf { + private: + TMap<TThread::TId, TVector<TLogItem>> Items; + + public: + void ToProtobuf(TLogPb& pb) const { + TSet<TProbe*> probes; + ui64 eventsCount = 0; + for (auto kv : Items) { + TThreadLogPb* tpb = pb.AddThreadLogs(); + tpb->SetThreadId(kv.first); + for (TLogItem& item : kv.second) { + item.ToProtobuf(*tpb->AddLogItems()); + probes.insert(item.Probe); + eventsCount++; + } + } + pb.SetEventsCount(eventsCount); + for (TProbe* probe : probes) { + probe->Event.ToProtobuf(*pb.AddEvents()); + } + } + + void Push(TThread::TId tid, const TLogItem& item) { + // Avoid any expansive operations in Push(), because it executes under lock and blocks program being traced + Items[tid].push_back(item); + } + }; + + void TSession::ToProtobuf(TLogPb& pb) const { + TReadToProtobuf reader; + ReadItems(reader); + reader.ToProtobuf(pb); + pb.MutableQuery()->CopyFrom(Query); + pb.SetCrtTime(TInstant::Now().GetValue()); + } + + TManager::TManager(TProbeRegistry& registry, bool allowDestructiveActions) + : Registry(registry) + , DestructiveActionsAllowed(allowDestructiveActions) + , SerializingExecutor(new TRunLogShuttleActionExecutor<TCyclicDepot>(0, {}, nullptr, nullptr, nullptr)) + { + } + + TManager::~TManager() { + for (auto& trace : Traces) { + delete trace.second; + } + } + + bool TManager::HasTrace(const TString& id) const { + TGuard<TMutex> g(Mtx); + return Traces.contains(id); + } + + const TSession* TManager::GetTrace(const TString& id) const { + TGuard<TMutex> g(Mtx); + TTraces::const_iterator it = Traces.find(id); + if (it == Traces.end()) { + ythrow yexception() << "trace id '" << id << "' is not used"; + } else { + return it->second; + } + } + + void TManager::New(const TString& id, const TQuery& query) { + TGuard<TMutex> g(Mtx); + if (Traces.find(id) == Traces.end()) { + TSession* trace = new TSession(++LastTraceIdx, Registry, query, GetDestructiveActionsAllowed(), CustomActionFactory); + Traces[id] = trace; + } else { + ythrow yexception() << "trace id '" << id << "' is already used"; + } + } + + void TManager::Delete(const TString& id) { + TGuard<TMutex> g(Mtx); + TTraces::iterator it = Traces.find(id); + if (it == Traces.end()) { + ythrow yexception() << "trace id '" << id << "' is not used"; + } else { + delete it->second; + Traces.erase(it); + } + } + + void TManager::Stop(const TString& id) { + TGuard<TMutex> g(Mtx); + TTraces::iterator it = Traces.find(id); + if (it == Traces.end()) { + ythrow yexception() << "trace id '" << id << "' is not used"; + } else { + it->second->Detach(); + } + } +} |