aboutsummaryrefslogblamecommitdiffstats
path: root/yql/essentials/core/yql_execution.h
blob: e1714d5eda33b6ce95db3065f0428e0b6237cf8b (plain) (tree)































































































































































                                                                                                                                                     
#pragma once

#include "yql_data_provider.h"
#include "yql_type_annotation.h"
#include <yql/essentials/ast/yql_gc_nodes.h>
#include <util/system/mutex.h>

#ifndef YQL_OPERATION_STATISTICS_CUSTOM_FIELDS
#define YQL_OPERATION_STATISTICS_CUSTOM_FIELDS
#endif

namespace NYql {
    struct TOperationProgress {
#define YQL_OPERATION_PROGRESS_STATE_MAP(xx) \
    xx(Started, 0)                           \
    xx(InProgress, 1)                        \
    xx(Finished, 2)                          \
    xx(Failed, 3)                            \
    xx(Aborted, 4)

        enum class EState {
            YQL_OPERATION_PROGRESS_STATE_MAP(ENUM_VALUE_GEN)
        };

#define YQL_OPERATION_BLOCK_STATUS_MAP(xx) \
    xx(None, 0)                            \
    xx(Partial, 1)                         \
    xx(Full, 2)

        enum class EOpBlockStatus {
            YQL_OPERATION_BLOCK_STATUS_MAP(ENUM_VALUE_GEN)
        };

        TString Category;
        ui32 Id;
        EState State;
        TMaybe<EOpBlockStatus> BlockStatus;

        using TStage = std::pair<TString, TInstant>;
        TStage Stage;

        TString RemoteId;
        THashMap<TString, TString> RemoteData;

        struct TCounters {
            ui64 Completed = 0ULL;
            ui64 Running = 0ULL;
            ui64 Total = 0ULL;
            ui64 Aborted = 0ULL;
            ui64 Failed = 0ULL;
            ui64 Lost = 0ULL;
            ui64 Pending = 0ULL;
            THashMap<TString, i64> Custom = {};
            bool operator==(const TCounters& rhs) const noexcept {
                return Completed == rhs.Completed &&
                       Running == rhs.Running &&
                       Total == rhs.Total &&
                       Aborted == rhs.Aborted &&
                       Failed == rhs.Failed &&
                       Lost == rhs.Lost &&
                       Pending == rhs.Pending &&
                       Custom == rhs.Custom;
            }

            bool operator!=(const TCounters& rhs) const noexcept {
                return !operator==(rhs);
            }
        };

        TMaybe<TCounters> Counters;

        TOperationProgress(const TString& category, ui32 id,
            EState state, const TString& stage = "")
            : Category(category)
            , Id(id)
            , State(state)
            , Stage(stage, TInstant::Now())
        {
        }
    };

    struct TOperationStatistics {
        struct TEntry {
            TString Name;

            TMaybe<i64> Sum;
            TMaybe<i64> Max;
            TMaybe<i64> Min;
            TMaybe<i64> Avg;
            TMaybe<i64> Count;
            TMaybe<TString> Value;

            TEntry(TString name, TMaybe<i64> sum, TMaybe<i64> max, TMaybe<i64> min, TMaybe<i64> avg, TMaybe<i64> count)
                : Name(std::move(name))
                , Sum(std::move(sum))
                , Max(std::move(max))
                , Min(std::move(min))
                , Avg(std::move(avg))
                , Count(std::move(count))
            {
            }

            TEntry(TString name, TString value)
                : Name(std::move(name))
                , Value(std::move(value))
            {
            }
        };

        TVector<TEntry> Entries;
    };

    using TStatWriter = std::function<void(ui32, const TVector<TOperationStatistics::TEntry>&)>;
    using TOperationProgressWriter = std::function<void(const TOperationProgress&)>;

    inline TStatWriter ThreadSafeStatWriter(TStatWriter base) {
        struct TState : public TThrRefBase {
            TStatWriter Base;
            TMutex Mutex;
        };

        auto state = MakeIntrusive<TState>();
        state->Base = base;
        return [state](ui32 id, const TVector<TOperationStatistics::TEntry>& stat) {
            with_lock(state->Mutex) {
                state->Base(id, stat);
            }
        };
    }

    inline void NullProgressWriter(const TOperationProgress& progress) {
        Y_UNUSED(progress);
    }

    inline TOperationProgressWriter ChainProgressWriters(TOperationProgressWriter left, TOperationProgressWriter right) {
        return [=](const TOperationProgress& progress) {
            left(progress);
            right(progress);
        };
    }

    inline TOperationProgressWriter ThreadSafeProgressWriter(TOperationProgressWriter base) {
        struct TState : public TThrRefBase {
            TOperationProgressWriter Base;
            TMutex Mutex;
        };

        auto state = MakeIntrusive<TState>();
        state->Base = base;
        return [state](const TOperationProgress& progress) {
            with_lock(state->Mutex) {
                state->Base(progress);
            }
        };
    }

    TAutoPtr<IGraphTransformer> CreateCheckExecutionTransformer(const TTypeAnnotationContext& types, bool checkWorld = true);
    TAutoPtr<IGraphTransformer> CreateExecutionTransformer(TTypeAnnotationContext& types, TOperationProgressWriter writer, bool withFinalize = true);

    IGraphTransformer::TStatus RequireChild(const TExprNode& node, ui32 index);
}