aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/lwtrace/trace.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/lwtrace/trace.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/lwtrace/trace.cpp')
-rw-r--r--library/cpp/lwtrace/trace.cpp1051
1 files changed, 1051 insertions, 0 deletions
diff --git a/library/cpp/lwtrace/trace.cpp b/library/cpp/lwtrace/trace.cpp
new file mode 100644
index 0000000000..3c974c85a0
--- /dev/null
+++ b/library/cpp/lwtrace/trace.cpp
@@ -0,0 +1,1051 @@
+#include "all.h"
+#include "kill_action.h"
+#include "log_shuttle.h"
+#include "preprocessor.h"
+#include "sleep_action.h"
+#include "stderr_writer.h"
+#include "google/protobuf/repeated_field.h"
+
+#include <util/generic/map.h>
+#include <util/random/random.h>
+
+#include <functional>
+
+namespace NLWTrace {
+#ifndef LWTRACE_DISABLE
+
+// Define static strings for name of each parameter type
+#define FOREACH_PARAMTYPE_MACRO(n, t, v) \
+ const char* TParamType<t>::NameString = n; \
+ /**/
+ FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO)
+ FOR_NIL_PARAMTYPE(FOREACH_PARAMTYPE_MACRO)
+#undef FOREACH_PARAMTYPE_MACRO
+
+#endif
+
+ void TProbeRegistry::AddProbesList(TProbe** reg) {
+ TGuard<TMutex> g(Mutex);
+ if (reg == nullptr) {
+ return;
+ }
+ for (TProbe** i = reg; *i != nullptr; i++) {
+ AddProbeNoLock(new TStaticBox(*i));
+ }
+ }
+
+ void TProbeRegistry::AddProbe(const TBoxPtr& box) {
+ TGuard<TMutex> g(Mutex);
+ AddProbeNoLock(box);
+ }
+
+ void TProbeRegistry::RemoveProbe(TProbe* probe) {
+ TGuard<TMutex> g(Mutex);
+ RemoveProbeNoLock(probe);
+ }
+
+ void TProbeRegistry::AddProbeNoLock(const TBoxPtr& box) {
+ TProbe* probe = box->GetProbe();
+ if (Probes.contains(probe)) {
+ return; // silently skip probe double registration
+ }
+ TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name);
+ Y_VERIFY(Ids.count(key) == 0, "duplicate provider:probe pair %s:%s", key.first.data(), key.second.data());
+ Probes.emplace(probe, box);
+ Ids.insert(key);
+ }
+
+ void TProbeRegistry::RemoveProbeNoLock(TProbe* probe) {
+ auto iter = Probes.find(probe);
+ if (iter != Probes.end()) {
+ TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name);
+ Ids.erase(key);
+ Probes.erase(iter);
+ } else {
+ // silently skip probe double unregistration
+ }
+ }
+
+ TAtomic* GetVariablePtr(TSession::TTraceVariables& traceVariables, const TString& name) {
+ TSession::TTraceVariables::iterator it = traceVariables.find(name);
+ if (it == traceVariables.end()) {
+ TAtomicBase zero = 0;
+ traceVariables[name] = zero;
+ return &traceVariables[name];
+ }
+ return &((*it).second);
+ }
+
+ typedef enum {
+ OT_LITERAL = 0,
+ OT_PARAMETER = 1,
+ OT_VARIABLE = 2
+ } EOperandType;
+
+ template <class T, EOperandType>
+ class TOperand;
+
+ template <class T>
+ class TOperand<T, OT_LITERAL> {
+ private:
+ T ImmediateValue;
+
+ public:
+ TOperand(TSession::TTraceVariables&, const TString&, const TString& value, size_t) {
+ ImmediateValue = TParamConv<T>::FromString(value);
+ }
+ const T& Get(const TParams&) {
+ return ImmediateValue;
+ }
+ };
+
+ template <class T>
+ class TOperand<T, OT_PARAMETER> {
+ private:
+ size_t Idx;
+
+ public:
+ TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t idx) {
+ Idx = idx;
+ }
+
+ const T& Get(const TParams& params) {
+ return params.Param[Idx].template Get<T>();
+ }
+ };
+
+ template <class T>
+ class TOperand<T, OT_VARIABLE> {
+ private:
+ TAtomic* Variable;
+
+ public:
+ TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) {
+ Variable = GetVariablePtr(traceVariables, name);
+ }
+
+ const T Get(const TParams&) {
+ return (T)AtomicGet(*Variable);
+ }
+
+ void Set(const T& value) {
+ AtomicSet(*Variable, value);
+ }
+
+ void Inc() {
+ AtomicIncrement(*Variable);
+ }
+
+ void Dec() {
+ AtomicDecrement(*Variable);
+ }
+
+ void Add(const TAtomicBase value) {
+ AtomicAdd(*Variable, value);
+ }
+
+ void Sub(const TAtomicBase value) {
+ AtomicSub(*Variable, value);
+ }
+ };
+
+ template <>
+ class TOperand<TCheck, OT_VARIABLE> {
+ private:
+ TAtomic* Variable;
+
+ public:
+ TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) {
+ Variable = GetVariablePtr(traceVariables, name);
+ }
+
+ const TCheck Get(const TParams&) {
+ return TCheck(AtomicGet(*Variable));
+ }
+
+ void Set(const TCheck& value) {
+ AtomicSet(*Variable, value.Value);
+ }
+
+ void Add(const TCheck& value) {
+ AtomicAdd(*Variable, value.Value);
+ }
+
+ void Sub(const TCheck value) {
+ AtomicSub(*Variable, value.Value);
+ }
+
+ void Inc() {
+ AtomicIncrement(*Variable);
+ }
+
+ void Dec() {
+ AtomicDecrement(*Variable);
+ }
+ };
+
+ template <>
+ class TOperand<TString, OT_VARIABLE> {
+ private:
+ TString Dummy;
+
+ public:
+ TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) {
+ }
+
+ const TString Get(const TParams&) {
+ return Dummy;
+ }
+
+ void Set(const TString&) {
+ }
+ };
+
+ template <>
+ class TOperand<TSymbol, OT_VARIABLE> {
+ private:
+ TSymbol Dummy;
+
+ public:
+ TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) {
+ }
+
+ const TSymbol Get(const TParams&) {
+ return Dummy;
+ }
+
+ void Set(const TSymbol&) {
+ }
+ };
+
+ // IOperandGetter: hide concrete EOperandType, to save compilation time
+ template <class T>
+ struct IOperandGetter {
+ virtual const T Get(const TParams& params) = 0;
+ virtual ~IOperandGetter() {
+ }
+ };
+
+ template <class T, EOperandType TParam>
+ class TOperandGetter: public IOperandGetter<T> {
+ private:
+ TOperand<T, TParam> Op;
+
+ public:
+ TOperandGetter(const TOperand<T, TParam>& op)
+ : Op(op)
+ {
+ }
+
+ const T Get(const TParams& params) override {
+ return Op.Get(params);
+ }
+ };
+
+ template <class T>
+ class TReceiver: public TOperand<T, OT_VARIABLE> {
+ public:
+ TReceiver(TSession::TTraceVariables& traceVariables, const TString& name)
+ : TOperand<T, OT_VARIABLE>(traceVariables, name, nullptr, 0)
+ {
+ }
+ };
+
+ template <class TP, class TPredicate>
+ static bool CmpFunc(TP a, TP b) {
+ return TPredicate()(a, b);
+ }
+
+ template <class TP, class TFunc, EOperandType TLhs, EOperandType TRhs>
+ class TOperatorExecutor: public IExecutor {
+ private:
+ bool InvertCompare;
+ TOperand<TP, TLhs> Lhs;
+ TOperand<TP, TRhs> Rhs;
+
+ bool DoExecute(TOrbit&, const TParams& params) override {
+ return TFunc()(Lhs.Get(params), Rhs.Get(params)) != InvertCompare;
+ }
+
+ public:
+ TOperatorExecutor(const TOperand<TP, TLhs>& lhs, const TOperand<TP, TRhs>& rhs, bool invertCompare)
+ : InvertCompare(invertCompare)
+ , Lhs(lhs)
+ , Rhs(rhs)
+ {
+ }
+ };
+
+ template <class TR, class TP>
+ struct TAddEq {
+ void operator()(TR& x, TP y) const {
+ x.Add(y);
+ }
+ };
+ template <class TR, class TP>
+ struct TSubEq {
+ void operator()(TR& x, TP y) const {
+ x.Sub(y);
+ }
+ };
+ template <class TR>
+ struct TInc {
+ void operator()(TR& x) const {
+ x.Inc();
+ }
+ };
+ template <class TR>
+ struct TDec {
+ void operator()(TR& x) const {
+ x.Dec();
+ }
+ };
+
+ template <class TP, class TFunc>
+ class TUnaryInplaceStatementExecutor: public IExecutor {
+ private:
+ TFunc Func;
+ TReceiver<TP> Receiver;
+
+ bool DoExecute(TOrbit&, const TParams&) override {
+ Func(Receiver);
+ return true;
+ }
+
+ public:
+ TUnaryInplaceStatementExecutor(TReceiver<TP>& receiver)
+ : Receiver(receiver)
+ {
+ }
+ };
+
+ template <class TP, class TFunc, EOperandType TParam>
+ class TBinaryInplaceStatementExecutor: public IExecutor {
+ private:
+ TFunc Func;
+ TReceiver<TP> Receiver;
+ TOperand<TP, TParam> Param;
+
+ bool DoExecute(TOrbit&, const TParams& params) override {
+ Func(Receiver, Param.Get(params));
+ return true;
+ }
+
+ public:
+ TBinaryInplaceStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TParam>& param)
+ : Receiver(receiver)
+ , Param(param)
+ {
+ }
+ };
+
+ template <class TP, class TFunc, EOperandType TFirstParam>
+ class TBinaryStatementExecutor: public IExecutor {
+ private:
+ TFunc Func;
+ TReceiver<TP> Receiver;
+ TOperand<TP, TFirstParam> FirstParam;
+
+ bool DoExecute(TOrbit&, const TParams& params) override {
+ Receiver.Set(Func(Receiver.Get(params), FirstParam.Get(params)));
+ return true;
+ }
+
+ public:
+ TBinaryStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TFirstParam>& firstParam)
+ : Receiver(receiver)
+ , FirstParam(firstParam)
+ {
+ }
+ };
+
+ template <class TP, class TFunc>
+ class TTernaryStatementExecutor: public IExecutor {
+ private:
+ TFunc Func;
+ TReceiver<TP> Receiver;
+
+ TAutoPtr<IOperandGetter<TP>> FirstParam;
+ TAutoPtr<IOperandGetter<TP>> SecondParam;
+
+ bool DoExecute(TOrbit&, const TParams& params) override {
+ Receiver.Set(Func(FirstParam->Get(params), SecondParam->Get(params)));
+ return true;
+ }
+
+ public:
+ TTernaryStatementExecutor(const TReceiver<TP>& receiver,
+ TAutoPtr<IOperandGetter<TP>> firstParam,
+ TAutoPtr<IOperandGetter<TP>> secondParam)
+ : Receiver(receiver)
+ , FirstParam(firstParam)
+ , SecondParam(secondParam)
+ {
+ }
+ };
+
+ template <class TLog>
+ class TLogActionExecutor: public IExecutor {
+ private:
+ bool LogParams;
+ bool LogTimestamp;
+ intptr_t* MaxRecords;
+ TAtomic Records;
+ TProbe* Probe;
+ TLog* Log;
+
+ bool DoExecute(TOrbit&, const TParams& params) override {
+ if (MaxRecords != nullptr) {
+ while (true) {
+ intptr_t a = AtomicGet(Records);
+ if (a >= *MaxRecords) {
+ return true;
+ }
+ if (AtomicCas(&Records, a + 1, a)) {
+ Write(params);
+ return true;
+ }
+ }
+ } else {
+ Write(params);
+ return true;
+ }
+ }
+
+ void Write(const TParams& params) {
+ typename TLog::TAccessor la(*Log);
+ if (typename TLog::TItem* item = la.Add()) {
+ item->Probe = Probe;
+ if (LogParams) {
+ if ((item->SavedParamsCount = Probe->Event.Signature.ParamCount) > 0) {
+ Probe->Event.Signature.CloneParams(item->Params, params);
+ }
+ } else {
+ item->SavedParamsCount = 0;
+ }
+ if (LogTimestamp) {
+ item->Timestamp = TInstant::Now();
+ }
+ item->TimestampCycles = GetCycleCount();
+ }
+ }
+
+ public:
+ TLogActionExecutor(TProbe* probe, const TLogAction& action, TLog* log)
+ : LogParams(!action.GetDoNotLogParams())
+ , LogTimestamp(action.GetLogTimestamp())
+ , MaxRecords(action.GetMaxRecords() ? new intptr_t(action.GetMaxRecords()) : nullptr)
+ , Records(0)
+ , Probe(probe)
+ , Log(log)
+ {
+ }
+
+ ~TLogActionExecutor() override {
+ delete MaxRecords;
+ }
+ };
+
+ class TSamplingExecutor: public IExecutor {
+ private:
+ double SampleRate;
+
+ public:
+ explicit TSamplingExecutor(double sampleRate)
+ : SampleRate(sampleRate)
+ {}
+
+ bool DoExecute(TOrbit&, const TParams&) override {
+ return RandomNumber<double>() < SampleRate;
+ }
+ };
+
+ typedef struct {
+ EOperandType Type;
+ size_t ParamIdx;
+ } TArgumentDescription;
+
+ using TArgumentList = TVector<TArgumentDescription>;
+
+ template <class T>
+ void ParseArguments(const T& op, const TSignature& signature, const TString& exceptionPrefix, size_t expectedArgumentCount, TArgumentList& arguments) {
+ arguments.clear();
+ size_t firstParamIdx = size_t(-1);
+ for (size_t argumentIdx = 0; argumentIdx < op.ArgumentSize(); ++argumentIdx) {
+ const TArgument& arg = op.GetArgument(argumentIdx);
+ TArgumentDescription operand;
+ operand.ParamIdx = size_t(-1);
+ if (arg.GetVariable()) {
+ operand.Type = OT_VARIABLE;
+ } else if (arg.GetValue()) {
+ operand.Type = OT_LITERAL;
+ } else if (arg.GetParam()) {
+ operand.Type = OT_PARAMETER;
+ operand.ParamIdx = signature.FindParamIndex(arg.GetParam());
+ if (operand.ParamIdx == size_t(-1)) {
+ ythrow yexception() << exceptionPrefix
+ << " argument #" << argumentIdx << " param '" << arg.GetParam()
+ << "' doesn't exist";
+ }
+ if (firstParamIdx == size_t(-1)) {
+ firstParamIdx = operand.ParamIdx;
+ } else {
+ if (strcmp(signature.ParamTypes[firstParamIdx], signature.ParamTypes[operand.ParamIdx]) != 0) {
+ ythrow yexception() << exceptionPrefix
+ << " param types do not match";
+ }
+ }
+ } else {
+ ythrow yexception() << exceptionPrefix
+ << " argument #" << argumentIdx
+ << " is empty";
+ }
+ arguments.push_back(operand);
+ }
+ if (arguments.size() != expectedArgumentCount) {
+ ythrow yexception() << exceptionPrefix
+ << " incorrect number of arguments (" << arguments.size()
+ << " present, " << expectedArgumentCount << " expected)";
+ }
+ }
+
+ template <class TArg1, class TArg2>
+ struct TTraceSecondArg {
+ // implementation of deprecated std::project2nd
+ TArg1 operator()(const TArg1&, const TArg2& y) const {
+ return y;
+ }
+ };
+
+ void TSession::InsertExecutor(
+ TTraceVariables& traceVariables, size_t bi, const TPredicate* pred,
+ const NProtoBuf::RepeatedPtrField<TAction>& actions, TProbe* probe,
+ const bool destructiveActionsAllowed,
+ const TCustomActionFactory& customActionFactory) {
+#ifndef LWTRACE_DISABLE
+ THolder<IExecutor> exec;
+ IExecutor* last = nullptr;
+ TArgumentList arguments;
+ if (pred) {
+ double sampleRate = pred->GetSampleRate();
+ if (sampleRate != 0.0) {
+ if (!(0.0 < sampleRate && sampleRate <= 1.0)) {
+ ythrow yexception() << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " sampling operator"
+ << " invalid sample rate " << sampleRate << ", expected [0;1]";
+ }
+ exec.Reset(new TSamplingExecutor(sampleRate));
+ last = exec.Get();
+ }
+
+ for (size_t i = 0; i < pred->OperatorsSize(); i++) {
+ const TOperator& op = pred->GetOperators(i);
+ TString exceptionPrefix;
+ TStringOutput exceptionPrefixOutput(exceptionPrefix);
+ exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " operator #" << i + 1;
+ ParseArguments<TOperator>(op, probe->Event.Signature, exceptionPrefix, 2, arguments);
+ THolder<IExecutor> opExec;
+
+ TArgumentDescription arg0 = arguments.at(0);
+ TArgumentDescription arg1 = arguments.at(1);
+
+ const char* tName0 = arg0.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg0.ParamIdx];
+ const char* tName1 = arg1.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx];
+
+ TString var0 = op.GetArgument(0).GetVariable();
+ TString var1 = op.GetArgument(1).GetVariable();
+
+ TString val0 = op.GetArgument(0).GetValue();
+ TString val1 = op.GetArgument(1).GetValue();
+
+#define FOREACH_OPERAND_TYPE_RT(n, t, v, fn, lt, rt) \
+ if (rt == arg1.Type) { \
+ TOperand<t, rt> rhs(traceVariables, var1, val1, arg1.ParamIdx); \
+ opExec.Reset(new TOperatorExecutor<t, fn<t>, lt, rt>(lhs, rhs, invertCompare)); \
+ break; \
+ }
+
+#define FOREACH_OPERAND_TYPE_LT(n, t, v, fn, lt) \
+ if (lt == arg0.Type) { \
+ TOperand<t, lt> lhs(traceVariables, var0, val0, arg0.ParamIdx); \
+ FOREACH_RIGHT_TYPE(FOREACH_OPERAND_TYPE_RT, n, t, v, fn, lt) \
+ }
+
+#define FOREACH_PARAMTYPE_MACRO(n, t, v, fn) \
+ if ((arg0.ParamIdx == size_t(-1) || strcmp(tName0, n) == 0) && (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0)) { \
+ FOREACH_LEFT_TYPE(FOREACH_OPERAND_TYPE_LT, n, t, v, fn); \
+ }
+
+ bool invertCompare = EqualToOneOf(op.GetType(), OT_NE, OT_GE, OT_LE);
+
+ switch (op.GetType()) {
+ case OT_EQ:
+ case OT_NE:
+ FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::equal_to);
+ break;
+ case OT_LT:
+ case OT_GE:
+ FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::less);
+ break;
+ case OT_GT:
+ case OT_LE:
+ FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::greater);
+ break;
+ default:
+ ythrow yexception() << exceptionPrefix
+ << " has not supported operator type #" << int(op.GetType());
+ }
+
+#undef FOREACH_OPERAND_TYPE_RT
+#undef FOREACH_OPERAND_TYPE_LT
+#undef FOREACH_PARAMTYPE_MACRO
+
+ if (!opExec) {
+ ythrow yexception() << exceptionPrefix
+ << " has not supported left param #" << arg0.ParamIdx + 1 << " type '"
+ << (arg0.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg0.ParamIdx] : "?")
+ << "', or right param #" << arg0.ParamIdx + 1 << " type '"
+ << (arg1.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg1.ParamIdx] : "?")
+ << "'";
+ }
+
+ if (!exec) {
+ exec.Reset(opExec.Release());
+ last = exec.Get();
+ } else {
+ last->SetNext(opExec.Release());
+ last = last->GetNext();
+ }
+ }
+ }
+
+ for (int i = 0; i < actions.size(); ++i) {
+ const TAction& action = actions.Get(i);
+ THolder<IExecutor> actExec;
+ if (action.HasPrintToStderrAction()) {
+ actExec.Reset(new TStderrActionExecutor(probe));
+ } else if (action.HasLogAction()) {
+ if (Query.GetLogDurationUs()) {
+ actExec.Reset(new TLogActionExecutor<TDurationLog>(probe, action.GetLogAction(), &DurationLog));
+ } else {
+ actExec.Reset(new TLogActionExecutor<TCyclicLog>(probe, action.GetLogAction(), &CyclicLog));
+ }
+ } else if (action.HasRunLogShuttleAction()) {
+ if (Query.GetLogDurationUs()) {
+ actExec.Reset(new TRunLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetRunLogShuttleAction(), &DurationDepot, &LastTrackId, &LastSpanId));
+ } else {
+ actExec.Reset(new TRunLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetRunLogShuttleAction(), &CyclicDepot, &LastTrackId, &LastSpanId));
+ }
+ } else if (action.HasEditLogShuttleAction()) {
+ if (Query.GetLogDurationUs()) {
+ actExec.Reset(new TEditLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetEditLogShuttleAction()));
+ } else {
+ actExec.Reset(new TEditLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetEditLogShuttleAction()));
+ }
+ } else if (action.HasDropLogShuttleAction()) {
+ if (Query.GetLogDurationUs()) {
+ actExec.Reset(new TDropLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetDropLogShuttleAction()));
+ } else {
+ actExec.Reset(new TDropLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetDropLogShuttleAction()));
+ }
+ } else if (action.HasCustomAction()) {
+ THolder<TCustomActionExecutor> customExec(customActionFactory.Create(probe, action.GetCustomAction(), this));
+ if (customExec) {
+ if (!customExec->IsDestructive() || destructiveActionsAllowed) {
+ actExec.Reset(customExec.Release());
+ } else {
+ ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
+ << " contains destructive CustomAction, but destructive actions are disabled."
+ << " Please, consider using --unsafe-lwtrace command line parameter.";
+ }
+ } else {
+ ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
+ << " contains unregistered CustomAction '" << action.GetCustomAction().GetName() << "'";
+ }
+ } else if (action.HasKillAction()) {
+ if (destructiveActionsAllowed) {
+ actExec.Reset(new NPrivate::TKillActionExecutor(probe));
+ } else {
+ ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
+ << " contains destructive KillAction, but destructive actions are disabled."
+ << " Please, consider using --unsafe-lwtrace command line parameter.";
+ }
+ } else if (action.HasSleepAction()) {
+ if (destructiveActionsAllowed) {
+ const TSleepAction& sleepAction = action.GetSleepAction();
+ if (sleepAction.GetNanoSeconds()) {
+ ui64 nanoSeconds = sleepAction.GetNanoSeconds();
+ actExec.Reset(new NPrivate::TSleepActionExecutor(probe, nanoSeconds));
+ } else {
+ ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
+ << " SleepAction missing parameter 'NanoSeconds'";
+ }
+ } else {
+ ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
+ << " contains destructive SleepAction, but destructive actions are disabled."
+ << " Please, consider using --unsafe-lwtrace command line parameter.";
+ }
+ } else if (action.HasStatementAction()) {
+ const TStatementAction& statement = action.GetStatementAction();
+ TString exceptionPrefix;
+ TStringOutput exceptionPrefixOutput(exceptionPrefix);
+ exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " action #" << i + 1;
+ size_t expectedArgumentCount = 3;
+ if (statement.GetType() == ST_MOV || statement.GetType() == ST_ADD_EQ || statement.GetType() == ST_SUB_EQ) {
+ expectedArgumentCount = 2;
+ } else if (statement.GetType() == ST_INC || statement.GetType() == ST_DEC) {
+ expectedArgumentCount = 1;
+ }
+ ParseArguments<TStatementAction>(statement, probe->Event.Signature, exceptionPrefix, expectedArgumentCount, arguments);
+
+ TArgumentDescription arg0 = (expectedArgumentCount <= 0) ? TArgumentDescription() : arguments.at(0);
+ TArgumentDescription arg1 = (expectedArgumentCount <= 1) ? TArgumentDescription() : arguments.at(1);
+ TArgumentDescription arg2 = (expectedArgumentCount <= 2) ? TArgumentDescription() : arguments.at(2);
+
+ TString var0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetVariable();
+ TString var1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetVariable();
+ TString var2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetVariable();
+
+ TString val0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetValue();
+ TString val1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetValue();
+ TString val2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetValue();
+
+ const char* tName1 = (expectedArgumentCount <= 1 || arg1.ParamIdx == size_t(-1))
+ ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx];
+ const char* tName2 = (expectedArgumentCount <= 2 || arg2.ParamIdx == size_t(-1))
+ ? nullptr : probe->Event.Signature.ParamTypes[arg2.ParamIdx];
+
+ if (arg0.Type == OT_VARIABLE) {
+ switch (statement.GetType()) {
+#define PARSE_UNARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \
+ { \
+ typedef TUnaryInplaceStatementExecutor<t, fn<TReceiver<t>>> TExec; \
+ TReceiver<t> receiver(traceVariables, var0); \
+ actExec.Reset(new TExec(receiver)); \
+ break; \
+ }
+
+#define PARSE_BINARY_INPLACE_STATEMENT_MACRO2(n, t, v, fn, ft) \
+ if (arg1.Type == ft) { \
+ typedef TBinaryInplaceStatementExecutor<t, fn<TReceiver<t>, t>, ft> TExec; \
+ TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx); \
+ actExec.Reset(new TExec(receiver, firstParam)); \
+ break; \
+ }
+
+#define PARSE_BINARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \
+ if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \
+ TReceiver<t> receiver(traceVariables, var0); \
+ FOREACH_RIGHT_TYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO2, n, t, v, fn); \
+ }
+
+#define PARSE_BINARY_STATEMENT_MACRO2(n, t, v, fn, ft) \
+ if (arg1.Type == ft) { \
+ typedef TBinaryStatementExecutor<t, fn<t, t>, ft> TExec; \
+ TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx); \
+ actExec.Reset(new TExec(receiver, firstParam)); \
+ break; \
+ }
+
+#define PARSE_BINARY_STATEMENT_MACRO(n, t, v, fn) \
+ if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \
+ TReceiver<t> receiver(traceVariables, var0); \
+ FOREACH_RIGHT_TYPE(PARSE_BINARY_STATEMENT_MACRO2, n, t, v, fn); \
+ }
+
+#define CREATE_OPERAND_GETTER_N(N, type, arg_type) \
+ if (arg##N.Type == arg_type) { \
+ operand##N.Reset(new TOperandGetter<type, arg_type>(TOperand<type, arg_type>(traceVariables, var##N, val##N, arg##N.ParamIdx))); \
+ }
+
+#define TERNARY_ON_TYPE(n, t, v, fn) \
+ if ((arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) && (arg2.ParamIdx == size_t(-1) || strcmp(tName2, n) == 0)) { \
+ TAutoPtr<IOperandGetter<t>> operand1, operand2; \
+ FOREACH_LEFT_TYPE(CREATE_OPERAND_GETTER_N, 1, t); \
+ FOREACH_RIGHT_TYPE(CREATE_OPERAND_GETTER_N, 2, t); \
+ if (operand1 && operand2) { \
+ actExec.Reset(new TTernaryStatementExecutor<t, fn<t>>( \
+ TReceiver<t>(traceVariables, var0), \
+ operand1, \
+ operand2)); \
+ } \
+ break; \
+ }
+
+#define IMPLEMENT_TERNARY_STATEMENT(fn) FOR_MATH_PARAMTYPE(TERNARY_ON_TYPE, fn)
+
+ case ST_INC:
+ FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TInc);
+ break;
+ case ST_DEC:
+ FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TDec);
+ break;
+ case ST_MOV:
+ FOR_MATH_PARAMTYPE(PARSE_BINARY_STATEMENT_MACRO, TTraceSecondArg);
+ break;
+ case ST_ADD_EQ:
+ FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TAddEq);
+ break;
+ case ST_SUB_EQ:
+ FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TSubEq);
+ break;
+ case ST_ADD:
+ IMPLEMENT_TERNARY_STATEMENT(std::plus)
+ break;
+ case ST_SUB:
+ IMPLEMENT_TERNARY_STATEMENT(std::minus)
+ break;
+ case ST_MUL:
+ IMPLEMENT_TERNARY_STATEMENT(std::multiplies)
+ break;
+ case ST_DIV:
+ IMPLEMENT_TERNARY_STATEMENT(std::divides)
+ break;
+ case ST_MOD:
+ IMPLEMENT_TERNARY_STATEMENT(std::modulus)
+ break;
+ default:
+ ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
+ << " has not supported statement type #" << int(statement.GetType());
+ }
+ }
+ if (!actExec) {
+ ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
+ << " can't create action";
+ }
+#undef CREATE_OPERAND_GETTER_N
+#undef TERNARY_ON_TYPE
+#undef IMPLEMENT_TERNARY_STATEMENT
+#undef PARSE_TERNARY_STATEMENT_MACRO
+#undef PARSE_BINARY_STATEMENT_MACRO
+#undef PARSE_BINARY_INPLACE_STATEMENT_MACRO
+#undef PARSE_UNARY_INPLACE_STATEMENT_MACRO
+ } else {
+ ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
+ << " has not supported action '" << action.ShortDebugString() << "'";
+ }
+ if (!exec) {
+ exec.Reset(actExec.Release());
+ last = exec.Get();
+ } else {
+ last->SetNext(actExec.Release());
+ last = last->GetNext();
+ }
+ }
+
+ if (!probe->Attach(exec.Get())) {
+ ythrow yexception() << "block #" << bi + 1
+ << " cannot be attached to probe '" << probe->Event.Name << "': no free slots";
+ }
+ Probes.push_back(std::make_pair(probe, exec.Release()));
+#else
+ Y_UNUSED(bi);
+ Y_UNUSED(pred);
+ Y_UNUSED(actions);
+ Y_UNUSED(probe);
+ Y_UNUSED(destructiveActionsAllowed);
+ Y_UNUSED(traceVariables);
+ Y_UNUSED(customActionFactory);
+#endif
+ }
+
+ TSession::TSession(ui64 traceIdx,
+ TProbeRegistry& registry,
+ const TQuery& query,
+ const bool destructiveActionsAllowed,
+ const TCustomActionFactory& customActionFactory)
+ : StartTime(TInstant::Now())
+ , TraceIdx(traceIdx)
+ , Registry(registry)
+ , StoreDuration(TDuration::MicroSeconds(query.GetLogDurationUs() * 11 / 10)) // +10% to try avoid truncation while reading multiple threads/traces
+ , ReadDuration(TDuration::MicroSeconds(query.GetLogDurationUs()))
+ , CyclicLog(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000)
+ , DurationLog(StoreDuration)
+ , CyclicDepot(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000)
+ , DurationDepot(StoreDuration)
+ , LastTrackId(0)
+ , LastSpanId(0)
+ , Attached(true)
+ , Query(query)
+ {
+ try {
+ for (size_t bi = 0; bi < query.BlocksSize(); bi++) {
+ const TBlock& block = query.GetBlocks(bi);
+ if (!block.HasProbeDesc()) {
+ ythrow yexception() << "block #" << bi + 1 << " has no probe description";
+ }
+ const TProbeDesc& pdesc = block.GetProbeDesc();
+ const TPredicate* pred = block.HasPredicate() ? &block.GetPredicate() : nullptr;
+ if (block.ActionSize() < 1) {
+ ythrow yexception() << "block #" << bi + 1 << " has no action";
+ }
+ const NProtoBuf::RepeatedPtrField<TAction>& actions = block.action();
+ if (pdesc.GetName() && pdesc.GetProvider()) {
+ TProbeRegistry::TProbesAccessor probes(Registry);
+ bool found = false;
+ for (auto& kv : probes) {
+ TProbe* probe = kv.first;
+ if (probe->Event.Name == pdesc.GetName() && probe->Event.GetProvider() == pdesc.GetProvider()) {
+ InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ ythrow yexception() << "block #" << bi + 1 << " has no matching probe with name '"
+ << pdesc.GetName() << "' provider '" << pdesc.GetProvider() << "'";
+ }
+ } else if (pdesc.GetGroup()) {
+ bool found = false;
+ TProbeRegistry::TProbesAccessor probes(Registry);
+ for (auto& kv : probes) {
+ TProbe* probe = kv.first;
+ for (const char* const* gi = probe->Event.Groups; *gi != nullptr; gi++) {
+ if (*gi == pdesc.GetGroup()) {
+ InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory);
+ found = true;
+ break;
+ }
+ }
+ }
+ if (!found) {
+ ythrow yexception() << "block #" << bi + 1
+ << " has no matching probes for group '" << pdesc.GetGroup() << "'";
+ }
+ } else {
+ ythrow yexception() << "block #" << bi + 1 << " has bad probe description: name '" << pdesc.GetName()
+ << "' provider '" << pdesc.GetProvider()
+ << "' group '" << pdesc.GetGroup() << "'";
+ }
+ }
+ } catch (...) {
+ Destroy();
+ throw;
+ }
+ }
+
+ void TSession::Destroy() {
+ Detach();
+ for (auto& probe : Probes) {
+ delete probe.second;
+ }
+ }
+
+ TSession::~TSession() {
+ Destroy();
+ }
+
+ void TSession::Detach() {
+ if (Attached) {
+ for (auto& p : Probes) {
+ TProbe* probe = p.first;
+ IExecutor* exec = p.second;
+ probe->Detach(exec);
+ }
+ Attached = false;
+ }
+ }
+
+ size_t TSession::GetEventsCount() const {
+ return CyclicLog.GetEventsCount() + DurationLog.GetEventsCount() + CyclicDepot.GetEventsCount() + DurationDepot.GetEventsCount();
+ }
+
+ size_t TSession::GetThreadsCount() const {
+ return CyclicLog.GetThreadsCount() + DurationLog.GetThreadsCount() + CyclicDepot.GetThreadsCount() + DurationDepot.GetThreadsCount();
+ }
+
+ class TReadToProtobuf {
+ private:
+ TMap<TThread::TId, TVector<TLogItem>> Items;
+
+ public:
+ void ToProtobuf(TLogPb& pb) const {
+ TSet<TProbe*> probes;
+ ui64 eventsCount = 0;
+ for (auto kv : Items) {
+ TThreadLogPb* tpb = pb.AddThreadLogs();
+ tpb->SetThreadId(kv.first);
+ for (TLogItem& item : kv.second) {
+ item.ToProtobuf(*tpb->AddLogItems());
+ probes.insert(item.Probe);
+ eventsCount++;
+ }
+ }
+ pb.SetEventsCount(eventsCount);
+ for (TProbe* probe : probes) {
+ probe->Event.ToProtobuf(*pb.AddEvents());
+ }
+ }
+
+ void Push(TThread::TId tid, const TLogItem& item) {
+ // Avoid any expansive operations in Push(), because it executes under lock and blocks program being traced
+ Items[tid].push_back(item);
+ }
+ };
+
+ void TSession::ToProtobuf(TLogPb& pb) const {
+ TReadToProtobuf reader;
+ ReadItems(reader);
+ reader.ToProtobuf(pb);
+ pb.MutableQuery()->CopyFrom(Query);
+ pb.SetCrtTime(TInstant::Now().GetValue());
+ }
+
+ TManager::TManager(TProbeRegistry& registry, bool allowDestructiveActions)
+ : Registry(registry)
+ , DestructiveActionsAllowed(allowDestructiveActions)
+ , SerializingExecutor(new TRunLogShuttleActionExecutor<TCyclicDepot>(0, {}, nullptr, nullptr, nullptr))
+ {
+ }
+
+ TManager::~TManager() {
+ for (auto& trace : Traces) {
+ delete trace.second;
+ }
+ }
+
+ bool TManager::HasTrace(const TString& id) const {
+ TGuard<TMutex> g(Mtx);
+ return Traces.contains(id);
+ }
+
+ const TSession* TManager::GetTrace(const TString& id) const {
+ TGuard<TMutex> g(Mtx);
+ TTraces::const_iterator it = Traces.find(id);
+ if (it == Traces.end()) {
+ ythrow yexception() << "trace id '" << id << "' is not used";
+ } else {
+ return it->second;
+ }
+ }
+
+ void TManager::New(const TString& id, const TQuery& query) {
+ TGuard<TMutex> g(Mtx);
+ if (Traces.find(id) == Traces.end()) {
+ TSession* trace = new TSession(++LastTraceIdx, Registry, query, GetDestructiveActionsAllowed(), CustomActionFactory);
+ Traces[id] = trace;
+ } else {
+ ythrow yexception() << "trace id '" << id << "' is already used";
+ }
+ }
+
+ void TManager::Delete(const TString& id) {
+ TGuard<TMutex> g(Mtx);
+ TTraces::iterator it = Traces.find(id);
+ if (it == Traces.end()) {
+ ythrow yexception() << "trace id '" << id << "' is not used";
+ } else {
+ delete it->second;
+ Traces.erase(it);
+ }
+ }
+
+ void TManager::Stop(const TString& id) {
+ TGuard<TMutex> g(Mtx);
+ TTraces::iterator it = Traces.find(id);
+ if (it == Traces.end()) {
+ ythrow yexception() << "trace id '" << id << "' is not used";
+ } else {
+ it->second->Detach();
+ }
+ }
+}