#include "all.h"
#include "kill_action.h"
#include "log_shuttle.h"
#include "preprocessor.h"
#include "sleep_action.h"
#include "stderr_writer.h"
#include "google/protobuf/repeated_field.h"

#include <util/generic/map.h>
#include <util/random/random.h>

#include <functional>

namespace NLWTrace {
#ifndef LWTRACE_DISABLE

// Define the static NameString member of TParamType<t> for each parameter type.
// FOREACH_PARAMTYPE and FOR_NIL_PARAMTYPE (declared outside this file) are
// handed the helper macro below; of its (n, t, v) arguments only the name
// string n and the C++ type t are used here, v is ignored.
#define FOREACH_PARAMTYPE_MACRO(n, t, v)       \
    const char* TParamType<t>::NameString = n; \
    /**/
    FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO)
    FOR_NIL_PARAMTYPE(FOREACH_PARAMTYPE_MACRO)
#undef FOREACH_PARAMTYPE_MACRO

#endif

    // Registers every probe of a nullptr-terminated array, wrapping each one
    // in a freshly allocated TStaticBox. A null `reg` registers nothing.
    // The registry mutex is held for the whole call.
    void TProbeRegistry::AddProbesList(TProbe** reg) {
        TGuard<TMutex> lock(Mutex);
        if (!reg) {
            return;
        }
        TProbe** cursor = reg;
        while (*cursor != nullptr) {
            AddProbeNoLock(new TStaticBox(*cursor));
            ++cursor;
        }
    }

    void TProbeRegistry::AddProbe(const TBoxPtr& box) {
        TGuard<TMutex> g(Mutex);
        AddProbeNoLock(box);
    }

    // Unregisters a probe while holding the registry mutex.
    void TProbeRegistry::RemoveProbe(TProbe* probe) {
        TGuard<TMutex> lock(Mutex);
        RemoveProbeNoLock(probe);
    }

    void TProbeRegistry::AddProbeNoLock(const TBoxPtr& box) {
        TProbe* probe = box->GetProbe();
        if (Probes.contains(probe)) {
            return; // silently skip probe double registration
        }
        TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name);
        Y_ABORT_UNLESS(Ids.count(key) == 0, "duplicate provider:probe pair %s:%s", key.first.data(), key.second.data());
        Probes.emplace(probe, box);
        Ids.insert(key);
    }

    // Removes `probe` and its provider:name id from the registry. Takes no
    // lock itself. Removing a probe that is not registered does nothing.
    void TProbeRegistry::RemoveProbeNoLock(TProbe* probe) {
        auto found = Probes.find(probe);
        if (found == Probes.end()) {
            return; // silently skip probe double unregistration
        }
        Ids.erase(TIds::key_type(probe->Event.GetProvider(), probe->Event.Name));
        Probes.erase(found);
    }

    // Returns a pointer to the storage of trace variable `name`, creating the
    // variable with value zero when it does not exist yet.
    // An existing variable keeps its current value.
    TAtomic* GetVariablePtr(TSession::TTraceVariables& traceVariables, const TString& name) {
        auto existing = traceVariables.find(name);
        if (existing != traceVariables.end()) {
            return &existing->second;
        }
        // A single operator[] call creates the slot; keeping the reference
        // avoids looking the same key up again for the assignment and return.
        auto& slot = traceVariables[name];
        slot = 0;
        return &slot;
    }

    // Where an operand takes its value from.
    enum EOperandType {
        OT_LITERAL = 0,   // immediate value written in the query
        OT_PARAMETER = 1, // parameter of the probe being executed
        OT_VARIABLE = 2   // named trace variable
    };

    // Operand of a predicate or statement. Specialized below by the value
    // type T and by the source of the value (literal, parameter or variable).
    template <class T, EOperandType>
    class TOperand;

    // Literal operand: the textual value is converted once, in the
    // constructor, via TParamConv<T>::FromString and returned by every Get().
    template <class T>
    class TOperand<T, OT_LITERAL> {
    public:
        // Only `value` is used; the other arguments keep the constructor
        // signature uniform across all TOperand specializations.
        TOperand(TSession::TTraceVariables&, const TString&, const TString& value, size_t)
            : ImmediateValue(TParamConv<T>::FromString(value))
        {
        }

        // Returns the stored literal; the probe parameters are ignored.
        const T& Get(const TParams&) {
            return ImmediateValue;
        }

    private:
        T ImmediateValue;
    };

    // Parameter operand: reads the probe parameter at a fixed index.
    template <class T>
    class TOperand<T, OT_PARAMETER> {
    public:
        // Only `idx` (index into TParams::Param) is used.
        TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t idx)
            : Idx(idx)
        {
        }

        // Returns the parameter at the stored index, viewed as T.
        const T& Get(const TParams& params) {
            return params.Param[Idx].template Get<T>();
        }

    private:
        size_t Idx;
    };

    // Variable operand: backed by the atomic slot that GetVariablePtr returns
    // for `name`. All accesses go through the Atomic* helpers.
    template <class T>
    class TOperand<T, OT_VARIABLE> {
    public:
        // Looks up (or creates) the variable called `name`.
        TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t)
            : Variable(GetVariablePtr(traceVariables, name))
        {
        }

        // Reads the variable and converts the stored value to T.
        const T Get(const TParams&) {
            return T(AtomicGet(*Variable));
        }

        // Overwrites the variable with `value`.
        void Set(const T& value) {
            AtomicSet(*Variable, value);
        }

        void Add(const TAtomicBase value) {
            AtomicAdd(*Variable, value);
        }

        void Sub(const TAtomicBase value) {
            AtomicSub(*Variable, value);
        }

        void Inc() {
            AtomicIncrement(*Variable);
        }

        void Dec() {
            AtomicDecrement(*Variable);
        }

    private:
        TAtomic* Variable;
    };

    // TCheck variable operand: uses the same atomic slot as other variables
    // and moves values in and out through TCheck::Value.
    template <>
    class TOperand<TCheck, OT_VARIABLE> {
    public:
        // Looks up (or creates) the variable called `name`.
        TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t)
            : Variable(GetVariablePtr(traceVariables, name))
        {
        }

        // Wraps the current stored value into a TCheck.
        const TCheck Get(const TParams&) {
            return TCheck(AtomicGet(*Variable));
        }

        void Set(const TCheck& value) {
            AtomicSet(*Variable, value.Value);
        }

        void Inc() {
            AtomicIncrement(*Variable);
        }

        void Dec() {
            AtomicDecrement(*Variable);
        }

        void Add(const TCheck& value) {
            AtomicAdd(*Variable, value.Value);
        }

        void Sub(const TCheck value) {
            AtomicSub(*Variable, value.Value);
        }

    private:
        TAtomic* Variable;
    };

    // String variables have no backing storage: Get() always returns a
    // default-constructed TString and Set() discards its argument.
    template <>
    class TOperand<TString, OT_VARIABLE> {
    public:
        // All arguments are ignored.
        TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) {
        }

        const TString Get(const TParams&) {
            return Placeholder;
        }

        void Set(const TString&) {
        }

    private:
        TString Placeholder; // never modified
    };

    // Symbol variables have no backing storage: Get() always returns a
    // default-constructed TSymbol and Set() discards its argument.
    template <>
    class TOperand<TSymbol, OT_VARIABLE> {
    public:
        // All arguments are ignored.
        TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) {
        }

        const TSymbol Get(const TParams&) {
            return Placeholder;
        }

        void Set(const TSymbol&) {
        }

    private:
        TSymbol Placeholder; // never modified
    };

    // IOperandGetter: type-erased access to an operand value. It hides the
    // concrete EOperandType behind a virtual call, to save compilation time.
    template <class T>
    struct IOperandGetter {
        virtual ~IOperandGetter() = default;

        // Returns the operand value for the given probe parameters.
        virtual const T Get(const TParams& params) = 0;
    };

    // Adapter exposing a concrete TOperand<T, TParam> through IOperandGetter.
    template <class T, EOperandType TParam>
    class TOperandGetter: public IOperandGetter<T> {
    public:
        // Stores a copy of `op`.
        TOperandGetter(const TOperand<T, TParam>& op)
            : Op(op)
        {
        }

        // Forwards to the wrapped operand.
        const T Get(const TParams& params) override {
            return Op.Get(params);
        }

    private:
        TOperand<T, TParam> Op;
    };

    // Destination of a statement: a variable operand constructed from the
    // variable name only (empty literal value, parameter index 0).
    template <class T>
    class TReceiver: public TOperand<T, OT_VARIABLE> {
    private:
        using TVariableOperand = TOperand<T, OT_VARIABLE>;

    public:
        TReceiver(TSession::TTraceVariables& traceVariables, const TString& name)
            : TVariableOperand(traceVariables, name, TString(), 0)
        {
        }
    };

    // Applies a default-constructed TPredicate to (a, b) and returns the result.
    template <class TP, class TPredicate>
    static bool CmpFunc(TP a, TP b) {
        TPredicate predicate = TPredicate();
        return predicate(a, b);
    }

    // Predicate executor: applies TFunc to the left and right operand values
    // and optionally negates the outcome.
    template <class TP, class TFunc, EOperandType TLhs, EOperandType TRhs>
    class TOperatorExecutor: public IExecutor {
    public:
        // Copies both operands. With invertCompare == true the result of TFunc
        // is negated.
        TOperatorExecutor(const TOperand<TP, TLhs>& lhs, const TOperand<TP, TRhs>& rhs, bool invertCompare)
            : InvertCompare(invertCompare)
            , Lhs(lhs)
            , Rhs(rhs)
        {
        }

    private:
        bool InvertCompare;
        TOperand<TP, TLhs> Lhs;
        TOperand<TP, TRhs> Rhs;

        // Returns TFunc(lhs, rhs), flipped when InvertCompare is set.
        bool DoExecute(TOrbit&, const TParams& params) override {
            const bool matched = TFunc()(Lhs.Get(params), Rhs.Get(params));
            return matched != InvertCompare;
        }
    };

    // Functor for "receiver += delta": forwards to receiver.Add(delta).
    template <class TR, class TP>
    struct TAddEq {
        void operator()(TR& receiver, TP delta) const {
            receiver.Add(delta);
        }
    };
    // Functor for "receiver -= delta": forwards to receiver.Sub(delta).
    template <class TR, class TP>
    struct TSubEq {
        void operator()(TR& receiver, TP delta) const {
            receiver.Sub(delta);
        }
    };
    // Functor for "++receiver": forwards to receiver.Inc().
    template <class TR>
    struct TInc {
        void operator()(TR& receiver) const {
            receiver.Inc();
        }
    };
    // Functor for "--receiver": forwards to receiver.Dec().
    template <class TR>
    struct TDec {
        void operator()(TR& receiver) const {
            receiver.Dec();
        }
    };

    // Statement executor that applies a one-argument functor (TInc / TDec in
    // this file) to the receiver variable.
    template <class TP, class TFunc>
    class TUnaryInplaceStatementExecutor: public IExecutor {
    public:
        // Stores a copy of `receiver`.
        TUnaryInplaceStatementExecutor(TReceiver<TP>& receiver)
            : Receiver(receiver)
        {
        }

    private:
        TFunc Func;
        TReceiver<TP> Receiver;

        // Applies Func to the receiver; always reports success.
        bool DoExecute(TOrbit&, const TParams&) override {
            Func(Receiver);
            return true;
        }
    };

    // Statement executor that applies a two-argument in-place functor
    // (TAddEq / TSubEq in this file) to the receiver and one operand value.
    template <class TP, class TFunc, EOperandType TParam>
    class TBinaryInplaceStatementExecutor: public IExecutor {
    public:
        // Stores copies of `receiver` and `param`.
        TBinaryInplaceStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TParam>& param)
            : Receiver(receiver)
            , Param(param)
        {
        }

    private:
        TFunc Func;
        TReceiver<TP> Receiver;
        TOperand<TP, TParam> Param;

        // Evaluates the operand and hands it to Func together with the
        // receiver; always reports success.
        bool DoExecute(TOrbit&, const TParams& params) override {
            Func(Receiver, Param.Get(params));
            return true;
        }
    };

    // Statement executor computing receiver = Func(receiver, firstParam).
    template <class TP, class TFunc, EOperandType TFirstParam>
    class TBinaryStatementExecutor: public IExecutor {
    public:
        // Stores copies of `receiver` and `firstParam`.
        TBinaryStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TFirstParam>& firstParam)
            : Receiver(receiver)
            , FirstParam(firstParam)
        {
        }

    private:
        TFunc Func;
        TReceiver<TP> Receiver;
        TOperand<TP, TFirstParam> FirstParam;

        // Combines the current receiver value with the operand and writes the
        // result back; always reports success.
        bool DoExecute(TOrbit&, const TParams& params) override {
            const TP updated = Func(Receiver.Get(params), FirstParam.Get(params));
            Receiver.Set(updated);
            return true;
        }
    };

    // Statement executor computing receiver = Func(firstParam, secondParam).
    // Both operands are accessed through IOperandGetter.
    template <class TP, class TFunc>
    class TTernaryStatementExecutor: public IExecutor {
    public:
        // Stores a copy of `receiver` and takes the two getters passed in.
        TTernaryStatementExecutor(const TReceiver<TP>& receiver,
                                  TAutoPtr<IOperandGetter<TP>> firstParam,
                                  TAutoPtr<IOperandGetter<TP>> secondParam)
            : Receiver(receiver)
            , FirstParam(firstParam)
            , SecondParam(secondParam)
        {
        }

    private:
        TFunc Func;
        TReceiver<TP> Receiver;

        TAutoPtr<IOperandGetter<TP>> FirstParam;
        TAutoPtr<IOperandGetter<TP>> SecondParam;

        // Evaluates both operands, stores Func's result into the receiver and
        // always reports success.
        bool DoExecute(TOrbit&, const TParams& params) override {
            const TP result = Func(FirstParam->Get(params), SecondParam->Get(params));
            Receiver.Set(result);
            return true;
        }
    };

    // Executor for LogAction: appends one item per execution to the given
    // log, optionally capped by the action's MaxRecords.
    //
    // The record limit is stored by value together with a flag instead of in
    // a heap-allocated integer, so the class owns no raw resource: no
    // destructor is needed and an accidental copy cannot double-delete.
    template <class TLog>
    class TLogActionExecutor: public IExecutor {
    private:
        bool LogParams;      // copy probe parameters into each item
        bool LogTimestamp;   // stamp each item with TInstant::Now()
        bool HasRecordLimit; // action specified a non-zero MaxRecords
        intptr_t MaxRecords; // meaningful only when HasRecordLimit is set
        TAtomic Records;     // number of items claimed so far under the limit
        TProbe* Probe;
        TLog* Log;

        // Writes one item unless the record limit has been reached.
        // Always returns true, also when the item is dropped.
        bool DoExecute(TOrbit&, const TParams& params) override {
            if (!HasRecordLimit) {
                Write(params);
                return true;
            }
            // Claim a slot with compare-and-swap; retry when another thread
            // changed the counter between the read and the swap.
            while (true) {
                const intptr_t claimed = AtomicGet(Records);
                if (claimed >= MaxRecords) {
                    return true;
                }
                if (AtomicCas(&Records, claimed + 1, claimed)) {
                    Write(params);
                    return true;
                }
            }
        }

        // Fills a new log item; does nothing when the accessor yields no item.
        void Write(const TParams& params) {
            typename TLog::TAccessor accessor(*Log);
            typename TLog::TItem* item = accessor.Add();
            if (!item) {
                return;
            }
            item->Probe = Probe;
            if (LogParams) {
                item->SavedParamsCount = Probe->Event.Signature.ParamCount;
                if (item->SavedParamsCount > 0) {
                    Probe->Event.Signature.CloneParams(item->Params, params);
                }
            } else {
                item->SavedParamsCount = 0;
            }
            if (LogTimestamp) {
                item->Timestamp = TInstant::Now();
            }
            item->TimestampCycles = GetCycleCount();
        }

    public:
        // probe  - probe recorded in every item
        // action - supplies DoNotLogParams, LogTimestamp and MaxRecords
        //          (a MaxRecords of zero means no limit)
        // log    - destination log; not owned
        TLogActionExecutor(TProbe* probe, const TLogAction& action, TLog* log)
            : LogParams(!action.GetDoNotLogParams())
            , LogTimestamp(action.GetLogTimestamp())
            , HasRecordLimit(action.GetMaxRecords() != 0)
            , MaxRecords(static_cast<intptr_t>(action.GetMaxRecords()))
            , Records(0)
            , Probe(probe)
            , Log(log)
        {
        }
    };

    // Executor that lets an execution through when a freshly drawn
    // RandomNumber<double>() is below the configured sample rate.
    class TSamplingExecutor: public IExecutor {
    public:
        explicit TSamplingExecutor(double sampleRate)
            : SampleRate(sampleRate)
        {
        }

        // Draws a new random number on every call and compares it with
        // SampleRate.
        bool DoExecute(TOrbit&, const TParams&) override {
            const double draw = RandomNumber<double>();
            return draw < SampleRate;
        }

    private:
        double SampleRate;
    };

    // Result of parsing one argument of an operator or statement.
    struct TArgumentDescription {
        EOperandType Type; // where the argument value comes from
        size_t ParamIdx;   // probe parameter index; ParseArguments stores
                           // size_t(-1) for non-parameter arguments
    };

    // Argument descriptions of one operator or statement, in argument order.
    using TArgumentList = TVector<TArgumentDescription>;

    template <class T>
    void ParseArguments(const T& op, const TSignature& signature, const TString& exceptionPrefix, size_t expectedArgumentCount, TArgumentList& arguments) {
        arguments.clear();
        size_t firstParamIdx = size_t(-1);
        for (size_t argumentIdx = 0; argumentIdx < op.ArgumentSize(); ++argumentIdx) {
            const TArgument& arg = op.GetArgument(argumentIdx);
            TArgumentDescription operand;
            operand.ParamIdx = size_t(-1);
            if (arg.GetVariable()) {
                operand.Type = OT_VARIABLE;
            } else if (arg.GetValue()) {
                operand.Type = OT_LITERAL;
            } else if (arg.GetParam()) {
                operand.Type = OT_PARAMETER;
                operand.ParamIdx = signature.FindParamIndex(arg.GetParam());
                if (operand.ParamIdx == size_t(-1)) {
                    ythrow yexception() << exceptionPrefix
                                        << " argument #" << argumentIdx << " param '" << arg.GetParam()
                                        << "' doesn't exist";
                }
                if (firstParamIdx == size_t(-1)) {
                    firstParamIdx = operand.ParamIdx;
                } else {
                    if (strcmp(signature.ParamTypes[firstParamIdx], signature.ParamTypes[operand.ParamIdx]) != 0) {
                        ythrow yexception() << exceptionPrefix
                                            << " param types do not match";
                    }
                }
            } else {
                ythrow yexception() << exceptionPrefix
                                    << " argument #" << argumentIdx
                                    << " is empty";
            }
            arguments.push_back(operand);
        }
        if (arguments.size() != expectedArgumentCount) {
            ythrow yexception() << exceptionPrefix
                                << " incorrect number of arguments (" << arguments.size()
                                << " present, " << expectedArgumentCount << " expected)";
        }
    }

    // Binary functor that ignores its first argument and returns the second
    // one converted to TArg1 (stand-in for the deprecated std::project2nd).
    template <class TArg1, class TArg2>
    struct TTraceSecondArg {
        TArg1 operator()(const TArg1&, const TArg2& second) const {
            return second;
        }
    };

    void TSession::InsertExecutor(
        TTraceVariables& traceVariables, size_t bi, const TPredicate* pred,
        const NProtoBuf::RepeatedPtrField<TAction>& actions, TProbe* probe,
        const bool destructiveActionsAllowed,
        const TCustomActionFactory& customActionFactory) {
#ifndef LWTRACE_DISABLE
        THolder<IExecutor> exec;
        IExecutor* last = nullptr;
        TArgumentList arguments;
        if (pred) {
            double sampleRate = pred->GetSampleRate();
            if (sampleRate != 0.0) {
                if (!(0.0 < sampleRate && sampleRate <= 1.0)) {
                    ythrow yexception() << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " sampling operator"
                                        << " invalid sample rate " << sampleRate << ", expected [0;1]";
                }
                exec.Reset(new TSamplingExecutor(sampleRate));
                last = exec.Get();
            }

            for (size_t i = 0; i < pred->OperatorsSize(); i++) {
                const TOperator& op = pred->GetOperators(i);
                TString exceptionPrefix;
                TStringOutput exceptionPrefixOutput(exceptionPrefix);
                exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " operator #" << i + 1;
                ParseArguments<TOperator>(op, probe->Event.Signature, exceptionPrefix, 2, arguments);
                THolder<IExecutor> opExec;

                TArgumentDescription arg0 = arguments.at(0);
                TArgumentDescription arg1 = arguments.at(1);

                const char* tName0 = arg0.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg0.ParamIdx];
                const char* tName1 = arg1.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx];

                TString var0 = op.GetArgument(0).GetVariable();
                TString var1 = op.GetArgument(1).GetVariable();

                TString val0 = op.GetArgument(0).GetValue();
                TString val1 = op.GetArgument(1).GetValue();

#define FOREACH_OPERAND_TYPE_RT(n, t, v, fn, lt, rt)                                    \
    if (rt == arg1.Type) {                                                              \
        TOperand<t, rt> rhs(traceVariables, var1, val1, arg1.ParamIdx);                 \
        opExec.Reset(new TOperatorExecutor<t, fn<t>, lt, rt>(lhs, rhs, invertCompare)); \
        break;                                                                          \
    }

#define FOREACH_OPERAND_TYPE_LT(n, t, v, fn, lt)                        \
    if (lt == arg0.Type) {                                              \
        TOperand<t, lt> lhs(traceVariables, var0, val0, arg0.ParamIdx); \
        FOREACH_RIGHT_TYPE(FOREACH_OPERAND_TYPE_RT, n, t, v, fn, lt)    \
    }

#define FOREACH_PARAMTYPE_MACRO(n, t, v, fn)                                                                                  \
    if ((arg0.ParamIdx == size_t(-1) || strcmp(tName0, n) == 0) && (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0)) { \
        FOREACH_LEFT_TYPE(FOREACH_OPERAND_TYPE_LT, n, t, v, fn);                                                              \
    }

                bool invertCompare = EqualToOneOf(op.GetType(), OT_NE, OT_GE, OT_LE);

                switch (op.GetType()) {
                    case OT_EQ:
                    case OT_NE:
                        FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::equal_to);
                        break;
                    case OT_LT:
                    case OT_GE:
                        FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::less);
                        break;
                    case OT_GT:
                    case OT_LE:
                        FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::greater);
                        break;
                    default:
                        ythrow yexception() << exceptionPrefix
                                            << " has not supported operator type #" << int(op.GetType());
                }

#undef FOREACH_OPERAND_TYPE_RT
#undef FOREACH_OPERAND_TYPE_LT
#undef FOREACH_PARAMTYPE_MACRO

                if (!opExec) {
                    ythrow yexception() << exceptionPrefix
                                        << " has not supported left param #" << arg0.ParamIdx + 1 << " type '"
                                        << (arg0.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg0.ParamIdx] : "?")
                                        << "', or right param #" << arg0.ParamIdx + 1 << " type '"
                                        << (arg1.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg1.ParamIdx] : "?")
                                        << "'";
                }

                if (!exec) {
                    exec.Reset(opExec.Release());
                    last = exec.Get();
                } else {
                    last->SetNext(opExec.Release());
                    last = last->GetNext();
                }
            }
        }

        for (int i = 0; i < actions.size(); ++i) {
            const TAction& action = actions.Get(i);
            THolder<IExecutor> actExec;
            if (action.HasPrintToStderrAction()) {
                actExec.Reset(new TStderrActionExecutor(probe));
            } else if (action.HasLogAction()) {
                if (Query.GetLogDurationUs()) {
                    actExec.Reset(new TLogActionExecutor<TDurationLog>(probe, action.GetLogAction(), &DurationLog));
                } else {
                    actExec.Reset(new TLogActionExecutor<TCyclicLog>(probe, action.GetLogAction(), &CyclicLog));
                }
            } else if (action.HasRunLogShuttleAction()) {
                if (Query.GetLogDurationUs()) {
                    actExec.Reset(new TRunLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetRunLogShuttleAction(), &DurationDepot, &LastTrackId, &LastSpanId));
                } else {
                    actExec.Reset(new TRunLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetRunLogShuttleAction(), &CyclicDepot, &LastTrackId, &LastSpanId));
                }
            } else if (action.HasEditLogShuttleAction()) {
                if (Query.GetLogDurationUs()) {
                    actExec.Reset(new TEditLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetEditLogShuttleAction()));
                } else {
                    actExec.Reset(new TEditLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetEditLogShuttleAction()));
                }
            } else if (action.HasDropLogShuttleAction()) {
                if (Query.GetLogDurationUs()) {
                    actExec.Reset(new TDropLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetDropLogShuttleAction()));
                } else {
                    actExec.Reset(new TDropLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetDropLogShuttleAction()));
                }
            } else if (action.HasCustomAction()) {
                THolder<TCustomActionExecutor> customExec(customActionFactory.Create(probe, action.GetCustomAction(), this));
                if (customExec) {
                    if (!customExec->IsDestructive() || destructiveActionsAllowed) {
                        actExec.Reset(customExec.Release());
                    } else {
                        ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
                                            << " contains destructive CustomAction, but destructive actions are disabled."
                                            << " Please, consider using --unsafe-lwtrace command line parameter.";
                    }
                } else {
                    ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
                                        << " contains unregistered CustomAction '" << action.GetCustomAction().GetName() << "'";
                }
            } else if (action.HasKillAction()) {
                if (destructiveActionsAllowed) {
                    actExec.Reset(new NPrivate::TKillActionExecutor(probe));
                } else {
                    ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
                                        << " contains destructive KillAction, but destructive actions are disabled."
                                        << " Please, consider using --unsafe-lwtrace command line parameter.";
                }
            } else if (action.HasSleepAction()) {
                if (destructiveActionsAllowed) {
                    const TSleepAction& sleepAction = action.GetSleepAction();
                    if (sleepAction.GetNanoSeconds()) {
                        ui64 nanoSeconds = sleepAction.GetNanoSeconds();
                        actExec.Reset(new NPrivate::TSleepActionExecutor(probe, nanoSeconds));
                    } else {
                        ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
                                            << " SleepAction missing parameter 'NanoSeconds'";
                    }
                } else {
                    ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
                                        << " contains destructive SleepAction, but destructive actions are disabled."
                                        << " Please, consider using --unsafe-lwtrace command line parameter.";
                }
            } else if (action.HasStatementAction()) {
                const TStatementAction& statement = action.GetStatementAction();
                TString exceptionPrefix;
                TStringOutput exceptionPrefixOutput(exceptionPrefix);
                exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " action #" << i + 1;
                size_t expectedArgumentCount = 3;
                if (statement.GetType() == ST_MOV || statement.GetType() == ST_ADD_EQ || statement.GetType() == ST_SUB_EQ) {
                    expectedArgumentCount = 2;
                } else if (statement.GetType() == ST_INC || statement.GetType() == ST_DEC) {
                    expectedArgumentCount = 1;
                }
                ParseArguments<TStatementAction>(statement, probe->Event.Signature, exceptionPrefix, expectedArgumentCount, arguments);

                TArgumentDescription arg0 = (expectedArgumentCount <= 0) ? TArgumentDescription() : arguments.at(0);
                TArgumentDescription arg1 = (expectedArgumentCount <= 1) ? TArgumentDescription() : arguments.at(1);
                TArgumentDescription arg2 = (expectedArgumentCount <= 2) ? TArgumentDescription() : arguments.at(2);

                TString var0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetVariable();
                TString var1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetVariable();
                TString var2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetVariable();

                TString val0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetValue();
                TString val1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetValue();
                TString val2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetValue();

                const char* tName1 = (expectedArgumentCount <= 1 || arg1.ParamIdx == size_t(-1))
                    ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx];
                const char* tName2 = (expectedArgumentCount <= 2 || arg2.ParamIdx == size_t(-1))
                    ? nullptr : probe->Event.Signature.ParamTypes[arg2.ParamIdx];

                if (arg0.Type == OT_VARIABLE) {
                    switch (statement.GetType()) {
#define PARSE_UNARY_INPLACE_STATEMENT_MACRO(n, t, v, fn)                   \
    {                                                                      \
        typedef TUnaryInplaceStatementExecutor<t, fn<TReceiver<t>>> TExec; \
        TReceiver<t> receiver(traceVariables, var0);                       \
        actExec.Reset(new TExec(receiver));                                \
        break;                                                             \
    }

#define PARSE_BINARY_INPLACE_STATEMENT_MACRO2(n, t, v, fn, ft)                     \
    if (arg1.Type == ft) {                                                         \
        typedef TBinaryInplaceStatementExecutor<t, fn<TReceiver<t>, t>, ft> TExec; \
        TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx);     \
        actExec.Reset(new TExec(receiver, firstParam));                            \
        break;                                                                     \
    }

#define PARSE_BINARY_INPLACE_STATEMENT_MACRO(n, t, v, fn)                       \
    if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) {                \
        TReceiver<t> receiver(traceVariables, var0);                            \
        FOREACH_RIGHT_TYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO2, n, t, v, fn); \
    }

#define PARSE_BINARY_STATEMENT_MACRO2(n, t, v, fn, ft)                         \
    if (arg1.Type == ft) {                                                     \
        typedef TBinaryStatementExecutor<t, fn<t, t>, ft> TExec;               \
        TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx); \
        actExec.Reset(new TExec(receiver, firstParam));                        \
        break;                                                                 \
    }

#define PARSE_BINARY_STATEMENT_MACRO(n, t, v, fn)                       \
    if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) {        \
        TReceiver<t> receiver(traceVariables, var0);                    \
        FOREACH_RIGHT_TYPE(PARSE_BINARY_STATEMENT_MACRO2, n, t, v, fn); \
    }

#define CREATE_OPERAND_GETTER_N(N, type, arg_type)                                                                                       \
    if (arg##N.Type == arg_type) {                                                                                                       \
        operand##N.Reset(new TOperandGetter<type, arg_type>(TOperand<type, arg_type>(traceVariables, var##N, val##N, arg##N.ParamIdx))); \
    }

#define TERNARY_ON_TYPE(n, t, v, fn)                                                                                          \
    if ((arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) && (arg2.ParamIdx == size_t(-1) || strcmp(tName2, n) == 0)) { \
        TAutoPtr<IOperandGetter<t>> operand1, operand2;                                                                       \
        FOREACH_LEFT_TYPE(CREATE_OPERAND_GETTER_N, 1, t);                                                                     \
        FOREACH_RIGHT_TYPE(CREATE_OPERAND_GETTER_N, 2, t);                                                                    \
        if (operand1 && operand2) {                                                                                           \
            actExec.Reset(new TTernaryStatementExecutor<t, fn<t>>(                                                            \
                TReceiver<t>(traceVariables, var0),                                                                           \
                operand1,                                                                                                     \
                operand2));                                                                                                   \
        }                                                                                                                     \
        break;                                                                                                                \
    }

#define IMPLEMENT_TERNARY_STATEMENT(fn) FOR_MATH_PARAMTYPE(TERNARY_ON_TYPE, fn)

                        case ST_INC:
                            FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TInc);
                            break;
                        case ST_DEC:
                            FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TDec);
                            break;
                        case ST_MOV:
                            FOR_MATH_PARAMTYPE(PARSE_BINARY_STATEMENT_MACRO, TTraceSecondArg);
                            break;
                        case ST_ADD_EQ:
                            FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TAddEq);
                            break;
                        case ST_SUB_EQ:
                            FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TSubEq);
                            break;
                        case ST_ADD:
                            IMPLEMENT_TERNARY_STATEMENT(std::plus)
                            break;
                        case ST_SUB:
                            IMPLEMENT_TERNARY_STATEMENT(std::minus)
                            break;
                        case ST_MUL:
                            IMPLEMENT_TERNARY_STATEMENT(std::multiplies)
                            break;
                        case ST_DIV:
                            IMPLEMENT_TERNARY_STATEMENT(std::divides)
                            break;
                        case ST_MOD:
                            IMPLEMENT_TERNARY_STATEMENT(std::modulus)
                            break;
                        default:
                            ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
                                                << " has not supported statement type #" << int(statement.GetType());
                    }
                }
                if (!actExec) {
                    ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
                                        << " can't create action";
                }
#undef CREATE_OPERAND_GETTER_N
#undef TERNARY_ON_TYPE
#undef IMPLEMENT_TERNARY_STATEMENT
#undef PARSE_TERNARY_STATEMENT_MACRO
#undef PARSE_BINARY_STATEMENT_MACRO
#undef PARSE_BINARY_INPLACE_STATEMENT_MACRO
#undef PARSE_UNARY_INPLACE_STATEMENT_MACRO
            } else {
                ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
                                    << " has not supported action '" << action.ShortDebugString() << "'";
            }
            if (!exec) {
                exec.Reset(actExec.Release());
                last = exec.Get();
            } else {
                last->SetNext(actExec.Release());
                last = last->GetNext();
            }
        }

        if (!probe->Attach(exec.Get())) {
            ythrow yexception() << "block #" << bi + 1
                                << " cannot be attached to probe '" << probe->Event.Name << "': no free slots";
        }
        Probes.push_back(std::make_pair(probe, exec.Release()));
#else
        Y_UNUSED(bi);
        Y_UNUSED(pred);
        Y_UNUSED(actions);
        Y_UNUSED(probe);
        Y_UNUSED(destructiveActionsAllowed);
        Y_UNUSED(traceVariables);
        Y_UNUSED(customActionFactory);
#endif
    }

    // Builds a session for the given query. Every block of the query must carry
    // a probe description and at least one action. A block is bound either to the
    // first registered probe matching its name and provider, or to every
    // registered probe that lists the requested group. If anything throws while
    // the blocks are processed, the session is torn down via Destroy() and the
    // exception is rethrown.
    TSession::TSession(ui64 traceIdx,
                   TProbeRegistry& registry,
                   const TQuery& query,
                   const bool destructiveActionsAllowed,
                   const TCustomActionFactory& customActionFactory)
        : StartTime(TInstant::Now())
        , TraceIdx(traceIdx)
        , Registry(registry)
        , StoreDuration(TDuration::MicroSeconds(query.GetLogDurationUs() * 11 / 10)) // +10% to try avoid truncation while reading multiple threads/traces
        , ReadDuration(TDuration::MicroSeconds(query.GetLogDurationUs()))
        , CyclicLog(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000)
        , DurationLog(StoreDuration)
        , CyclicDepot(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000)
        , DurationDepot(StoreDuration)
        , LastTrackId(0)
        , LastSpanId(0)
        , Attached(true)
        , Query(query)
    {
        try {
            const size_t blockCount = query.BlocksSize();
            for (size_t bi = 0; bi < blockCount; ++bi) {
                const TBlock& block = query.GetBlocks(bi);

                // Validate the block before touching the registry
                if (!block.HasProbeDesc()) {
                    ythrow yexception() << "block #" << bi + 1 << " has no probe description";
                }
                if (block.ActionSize() < 1) {
                    ythrow yexception() << "block #" << bi + 1 << " has no action";
                }

                const TProbeDesc& pdesc = block.GetProbeDesc();
                const NProtoBuf::RepeatedPtrField<TAction>& actions = block.action();
                const TPredicate* pred = nullptr;
                if (block.HasPredicate()) {
                    pred = &block.GetPredicate();
                }

                // Single place that builds and attaches the executor chain for a probe
                const auto attachTo = [&](TProbe* target) {
                    InsertExecutor(TraceVariables, bi, pred, actions, target, destructiveActionsAllowed, customActionFactory);
                };

                if (pdesc.GetName() && pdesc.GetProvider()) {
                    // Exact addressing: only the first probe with this name and provider is used
                    TProbeRegistry::TProbesAccessor probes(Registry);
                    TProbe* target = nullptr;
                    for (auto& entry : probes) {
                        TProbe* candidate = entry.first;
                        if (candidate->Event.Name == pdesc.GetName() && candidate->Event.GetProvider() == pdesc.GetProvider()) {
                            target = candidate;
                            break;
                        }
                    }
                    if (target == nullptr) {
                        ythrow yexception() << "block #" << bi + 1 << " has no matching probe with name '"
                                            << pdesc.GetName() << "' provider '" << pdesc.GetProvider() << "'";
                    }
                    attachTo(target);
                } else if (pdesc.GetGroup()) {
                    // Group addressing: every probe listing the group gets exactly one executor
                    TProbeRegistry::TProbesAccessor probes(Registry);
                    bool matched = false;
                    for (auto& entry : probes) {
                        TProbe* candidate = entry.first;
                        // Groups is a nullptr-terminated list; stop at the first matching name
                        const char* const* group = candidate->Event.Groups;
                        while (*group != nullptr && !(*group == pdesc.GetGroup())) {
                            ++group;
                        }
                        if (*group != nullptr) {
                            attachTo(candidate);
                            matched = true;
                        }
                    }
                    if (!matched) {
                        ythrow yexception() << "block #" << bi + 1
                                            << " has no matching probes for group '" << pdesc.GetGroup() << "'";
                    }
                } else {
                    ythrow yexception() << "block #" << bi + 1 << " has bad probe description: name '" << pdesc.GetName()
                                        << "' provider '" << pdesc.GetProvider()
                                        << "' group '" << pdesc.GetGroup() << "'";
                }
            }
        } catch (...) {
            // The destructor does not run for a partially constructed object,
            // so detach and free already inserted executors by hand
            Destroy();
            throw;
        }
    }

    void TSession::Destroy() {
        Detach();
        for (auto& probe : Probes) {
            delete probe.second;
        }
    }

    // Tears the session down by delegating to Destroy().
    TSession::~TSession() {
        Destroy();
    }

    // Detaches every executor of this session from its probe.
    // Does nothing when the session is already detached.
    void TSession::Detach() {
        if (!Attached) {
            return;
        }
        for (auto& entry : Probes) {
            // entry is a (probe, executor) pair
            entry.first->Detach(entry.second);
        }
        Attached = false;
    }

    // Returns the sum of the event counts reported by the two logs and two depots.
    size_t TSession::GetEventsCount() const {
        size_t total = 0;
        total += CyclicLog.GetEventsCount();
        total += DurationLog.GetEventsCount();
        total += CyclicDepot.GetEventsCount();
        total += DurationDepot.GetEventsCount();
        return total;
    }

    // Returns the sum of the thread counts reported by the two logs and two depots.
    size_t TSession::GetThreadsCount() const {
        size_t total = 0;
        total += CyclicLog.GetThreadsCount();
        total += DurationLog.GetThreadsCount();
        total += CyclicDepot.GetThreadsCount();
        total += DurationDepot.GetThreadsCount();
        return total;
    }

    class TReadToProtobuf {
    private:
        TMap<TThread::TId, TVector<TLogItem>> Items;

    public:
        void ToProtobuf(TLogPb& pb) const {
            TSet<TProbe*> probes;
            ui64 eventsCount = 0;
            for (auto kv : Items) {
                TThreadLogPb* tpb = pb.AddThreadLogs();
                tpb->SetThreadId(kv.first);
                for (TLogItem& item : kv.second) {
                    item.ToProtobuf(*tpb->AddLogItems());
                    probes.insert(item.Probe);
                    eventsCount++;
                }
            }
            pb.SetEventsCount(eventsCount);
            for (TProbe* probe : probes) {
                probe->Event.ToProtobuf(*pb.AddEvents());
            }
        }

        void Push(TThread::TId tid, const TLogItem& item) {
            // Avoid any expansive operations in Push(), because it executes under lock and blocks program being traced
            Items[tid].push_back(item);
        }
    };

    // Serializes the session into pb: the items read from the session, a copy
    // of the query, and the current time as the creation time.
    void TSession::ToProtobuf(TLogPb& pb) const {
        // Collect the items first, then convert them in one go
        TReadToProtobuf collector;
        ReadItems(collector);
        collector.ToProtobuf(pb);

        auto* queryPb = pb.MutableQuery();
        queryPb->CopyFrom(Query);

        const TInstant createdAt = TInstant::Now();
        pb.SetCrtTime(createdAt.GetValue());
    }

    // Creates a manager bound to the given probe registry.
    // allowDestructiveActions is stored in DestructiveActionsAllowed.
    // SerializingExecutor is created with zero, empty and null arguments only.
    TManager::TManager(TProbeRegistry& registry, bool allowDestructiveActions)
        : Registry(registry)
        , DestructiveActionsAllowed(allowDestructiveActions)
        , SerializingExecutor(new TRunLogShuttleActionExecutor<TCyclicDepot>(0, {}, nullptr, nullptr, nullptr))
    {
    }

    // Deletes every session still stored in Traces.
    TManager::~TManager() {
        for (TTraces::iterator it = Traces.begin(); it != Traces.end(); ++it) {
            delete it->second;
        }
    }

    // Tells whether a trace with the given id is stored. Takes Mtx for the lookup.
    bool TManager::HasTrace(const TString& id) const {
        TGuard<TMutex> g(Mtx);
        const bool known = Traces.find(id) != Traces.end();
        return known;
    }

    // Returns the session stored under the given id.
    // Throws yexception when no trace with this id is stored.
    const TSession* TManager::GetTrace(const TString& id) const {
        TGuard<TMutex> g(Mtx);
        TTraces::const_iterator found = Traces.find(id);
        if (found != Traces.end()) {
            return found->second;
        }
        ythrow yexception() << "trace id '" << id << "' is not used";
    }

    void TManager::New(const TString& id, const TQuery& query) {
        TGuard<TMutex> g(Mtx);
        if (Traces.find(id) == Traces.end()) {
            TSession* trace = new TSession(++LastTraceIdx, Registry, query, GetDestructiveActionsAllowed(), CustomActionFactory);
            Traces[id] = trace;
        } else {
            ythrow yexception() << "trace id '" << id << "' is already used";
        }
    }

    void TManager::Delete(const TString& id) {
        TGuard<TMutex> g(Mtx);
        TTraces::iterator it = Traces.find(id);
        if (it == Traces.end()) {
            ythrow yexception() << "trace id '" << id << "' is not used";
        } else {
            delete it->second;
            Traces.erase(it);
        }
    }

    void TManager::Stop(const TString& id) {
        TGuard<TMutex> g(Mtx);
        TTraces::iterator it = Traces.find(id);
        if (it == Traces.end()) {
            ythrow yexception() << "trace id '" << id << "' is not used";
        } else {
            it->second->Detach();
        }
    }
}