#include "yql_execution.h"
#include "yql_expr_optimize.h"
#include "yql_opt_proposed_by_data.h"

#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/yql_panic.h>

#include <util/string/builder.h>
#include <util/string/join.h>
#include <util/system/env.h>
#include <util/generic/queue.h>


namespace NYql {

namespace {

// Debug switch for verifying graph rewrites. When set to true, DoTransform() and
// ReplaceNewNodes() walk the expression and fail (YQL_ENSURE) if a node that is a key
// in NewNodes is still reachable; NewNodes is then also kept instead of being cleared
// after a replacement pass.
const bool RewriteSanityCheck = false;

class TExecutionTransformer : public TGraphTransformerBase {
public:
    // State shared between the transformer and the callbacks subscribed to async futures.
    // It is ref-counted because the callbacks capture their own TStatePtr.
    struct TState : public TThrRefBase {
        // Taken around every mutation of the Completed / Inflight queues.
        TAdaptiveLock Lock;

        // Describes one async operation (callable execution or cleanup) being tracked.
        struct TItem : public TIntrusiveListItem<TItem> {
            // Raw pointer; the node itself is kept alive by AsyncNodes / CollectingNodes.
            TExprNode* Node = nullptr;
            // Provider whose transformer receives ApplyAsyncChanges() for this node.
            IDataProvider* DataProvider = nullptr;
            // Only filled in deterministic mode, where completion is polled via HasValue().
            NThreading::TFuture<void> Future;
        };

        using TQueueType = TIntrusiveListWithAutoDelete<TState::TItem, TDelete>;
        // Finished operations waiting to be consumed by DoApplyAsyncChanges().
        TQueueType Completed;
        // Deterministic mode only: operations in subscription order, not yet moved to Completed.
        TQueueType Inflight;
        // Promise behind the future returned by DoGetAsyncFuture(); replaced with a fresh one
        // on every DoApplyAsyncChanges() call.
        NThreading::TPromise<void> Promise;
        // True once the current Promise has been chosen for signalling; reset together with Promise.
        bool HasResult = false;
    };

    using TStatePtr = TIntrusivePtr<TState>;

    // Builds the transformer and puts it into its initial state via Rewind().
    //   types        - type annotation context, stored by reference.
    //   writer       - callback that receives operation progress records (see StartNode / FinishNode).
    //   withFinalize - when true, DoTransform() creates and runs a finalizing transformer
    //                  after the execution pass returned Ok.
    // DeterministicMode is initialized from the YQL_DETERMINISTIC_MODE environment variable.
    TExecutionTransformer(TTypeAnnotationContext& types,
        TOperationProgressWriter writer,
        bool withFinalize)
        : Types(types)
        , Writer(writer)
        , WithFinalize(withFinalize)
        , DeterministicMode(GetEnv("YQL_DETERMINISTIC_MODE"))
    {
        Rewind();
    }

    // Runs one execution pass over the graph rooted at `input`.
    // Once the finalizing transformer exists, every call is forwarded to it. Otherwise the pass
    // first collects unused trackable nodes, then executes the root, then applies the rewrites
    // accumulated in NewNodes to `output`. If the pass finished with Ok and WithFinalize is set,
    // the finalizing transformer is created and run right away.
    TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
        if (FinalizingTransformer) {
            YQL_CLOG(INFO, CoreExecution) << "FinalizingTransformer, root #" << input->UniqueId();
            auto status = FinalizingTransformer->Transform(input, output, ctx);
            YQL_CLOG(INFO, CoreExecution) << "FinalizingTransformer done, output #" << output->UniqueId() << ", status: " << status;
            return status;
        }

        YQL_CLOG(INFO, CoreExecution) << "Begin, root #" << input->UniqueId();
        output = input;
        if (RewriteSanityCheck) {
            // Debug only: no node that was already replaced may still be reachable from the root.
            VisitExpr(input, [&](const TExprNode::TPtr& localInput) {
                if (NewNodes.cend() != NewNodes.find(localInput.Get())) {
                    Cerr << "found old node: #" << localInput->UniqueId() << "\n" << input->Dump();
                    YQL_ENSURE(false);
                }
                return true;
            });
        }

        // Cleanup of unused trackable nodes must finish (or be awaited) before executing further.
        auto status = CollectUnusedNodes(*input, ctx);
        YQL_CLOG(INFO, CoreExecution) << "Collect unused nodes for root #" << input->UniqueId() << ", status: " << status;
        if (status != TStatus::Ok) {
            return status;
        }

        status = status.Combine(ExecuteNode(input, output, ctx, 0));
        // Nodes marked ExecutionPending during this pass are returned to ConstrComplete,
        // unless something else changed their state in the meantime.
        for (auto node: FreshPendingNodes) {
            if (TExprNode::EState::ExecutionPending == node->GetState()) {
                node->SetState(TExprNode::EState::ConstrComplete);
            }
        }
        FreshPendingNodes.clear();
        if (!ReplaceNewNodes(output, ctx)) {
            return TStatus::Error;
        }
        YQL_CLOG(INFO, CoreExecution) << "Finish, output #" << output->UniqueId() << ", status: " << status;

        if (status != TStatus::Ok || !WithFinalize) {
            return status;
        }

        YQL_CLOG(INFO, CoreExecution) << "Creating finalizing transformer, output #" << output->UniqueId();
        FinalizingTransformer = CreateCompositeFinalizingTransformer(Types);
        return FinalizingTransformer->Transform(input, output, ctx);
    }

    // Returns the future to wait on before calling DoApplyAsyncChanges(): the finalizing
    // transformer's future once that transformer exists, otherwise the future of the
    // promise stored in the shared state.
    NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
        if (FinalizingTransformer) {
            return FinalizingTransformer->GetAsyncFuture(input);
        }

        return State->Promise.GetFuture();
    }

    // Consumes every operation that completed since the previous call and applies its result.
    // Forwarded to the finalizing transformer once it exists. Returns the combined status of
    // all applied items; if at least one item was processed and everything was Ok, Repeat is
    // returned instead.
    TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
        if (FinalizingTransformer) {
            return FinalizingTransformer->ApplyAsyncChanges(input, output, ctx);
        }

        output = input;

        TStatus combinedStatus = TStatus::Ok;

        TState::TQueueType completed;
        auto newPromise = NThreading::NewPromise();
        {
            // Take the finished items and install a fresh promise in one critical section,
            // so that later completions signal the new promise.
            TGuard<TAdaptiveLock> guard(State->Lock);
            completed.Swap(State->Completed);
            State->Promise.Swap(newPromise);
            State->HasResult = false;
        }

        for (auto& item : completed) {
            // Items registered by CollectUnusedNodes() are cleanup requests, not executions.
            auto collectedIt = CollectingNodes.find(item.Node);
            if (collectedIt != CollectingNodes.end()) {
                YQL_CLOG(INFO, CoreExecution) << "Completed async cleanup for node #" << item.Node->UniqueId();
                TExprNode::TPtr callableOutput;
                auto status = item.DataProvider->GetTrackableNodeProcessor().GetCleanupTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx);
                combinedStatus = combinedStatus.Combine(status);
                CollectingNodes.erase(collectedIt);
                continue;
            }

            YQL_CLOG(INFO, CoreExecution) << "Completed async execution for node #" << item.Node->UniqueId();
            auto asyncIt = AsyncNodes.find(item.Node);
            YQL_ENSURE(asyncIt != AsyncNodes.end());
            TExprNode::TPtr callableOutput;
            auto status = item.DataProvider->GetCallableExecutionTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx);
            Y_ABORT_UNLESS(callableOutput);
            YQL_ENSURE(status != TStatus::Async);
            combinedStatus = combinedStatus.Combine(status);
            if (status.Level == TStatus::Error) {
                item.Node->SetState(TExprNode::EState::Error);
            } else if (status.Level == TStatus::Repeat) {
                if (callableOutput != item.Node) {
                    // The provider replaced the node: remember the rewrite and request a restart.
                    YQL_CLOG(INFO, CoreExecution) << "Rewrite node #" << item.Node->UniqueId() << " to #" << callableOutput->UniqueId()
                        << " in ApplyAsyncChanges()";
                    NewNodes[item.Node] = callableOutput;
                    combinedStatus = combinedStatus.Combine(TStatus(TStatus::Repeat, true));
                    FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput);
                }
            }
            if (callableOutput == item.Node) {
                YQL_CLOG(INFO, CoreExecution) << "State is " << item.Node->GetState()
                        << " after apply async changes for node #" << item.Node->UniqueId();
            }

            // Report progress for nodes that reached a terminal state.
            if (item.Node->GetState() == TExprNode::EState::ExecutionComplete ||
                item.Node->GetState() == TExprNode::EState::Error)
            {
                FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput);
            }

            // Drops the owning reference stored by AddCallable(); item.Node is not used afterwards.
            AsyncNodes.erase(asyncIt);
        }

        if (!ReplaceNewNodes(output, ctx)) {
            return TStatus::Error;
        }

        if (!completed.Empty() && combinedStatus.Level == TStatus::Ok) {
            combinedStatus = TStatus::Repeat;
        }

        return combinedStatus;
    }

    // Substitutes every node that is a key in NewNodes with its recorded replacement inside
    // `output`. Returns false when the substitution pass reports an error, true otherwise.
    // NewNodes is emptied after the pass unless RewriteSanityCheck is enabled.
    bool ReplaceNewNodes(TExprNode::TPtr& output, TExprContext& ctx) {
        const bool hasRewrites = !NewNodes.empty();
        if (hasRewrites) {
            TOptimizeExprSettings optimizeSettings(&Types);
            optimizeSettings.VisitChanges = true;
            optimizeSettings.VisitStarted = true;

            const auto substituteStatus = OptimizeExpr(output, output,
                [&](const TExprNode::TPtr& candidate, TExprContext& unusedCtx) -> TExprNode::TPtr {
                    Y_UNUSED(unusedCtx);
                    const auto rewrite = NewNodes.find(candidate.Get());
                    if (rewrite == NewNodes.cend()) {
                        return candidate;
                    }

                    return rewrite->second;
                }, ctx, optimizeSettings);

            // The map is preserved only for the debug verification below.
            if (!RewriteSanityCheck) {
                NewNodes.clear();
            }

            if (substituteStatus.Level == TStatus::Error) {
                return false;
            }
        }

        if (RewriteSanityCheck) {
            // Debug only: after substitution no replaced node may remain in the graph.
            VisitExpr(output, [&](const TExprNode::TPtr& visitedNode) {
                const bool isStale = NewNodes.find(visitedNode.Get()) != NewNodes.cend();
                if (isStale) {
                    Cerr << "found old node: #" << visitedNode->UniqueId() << "\n" << output->Dump();
                    YQL_ENSURE(false);
                }
                return true;
            });
        }

        return true;
    }

    void Rewind() override {
        State = MakeIntrusive<TState>();
        State->Promise = NThreading::NewPromise();
        State->HasResult = false;
        NewNodes.clear();
        FinalizingTransformer.Reset();

        TrackableNodes.clear();
        CollectingNodes.clear();
        ProvidersCache.clear();
        AsyncNodes.clear();
    }

    // Executes a single node and reports how far it got.
    //   node   - node to execute; if it already has a replacement in NewNodes, the replacement is used.
    //   output - receives the node that should stand in place of `node` afterwards.
    //   depth  - nesting level, used for trace logging in ExecuteCallable().
    // Returns Ok when the node is fully executed, Async when it is waiting for an async result,
    // Repeat when another pass is required (with the restart flag when the graph was rewritten),
    // Error on failure.
    TStatus ExecuteNode(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
        output = node;
        bool changed = false;
        const auto knownNode = NewNodes.find(node.Get());
        if (NewNodes.cend() != knownNode) {
            output = knownNode->second;
            changed = true;
        }

        switch (output->GetState()) {
        // Not yet typed / constrained: earlier pipeline stages must run again.
        case TExprNode::EState::Initial:
        case TExprNode::EState::TypeInProgress:
        case TExprNode::EState::TypePending:
        case TExprNode::EState::TypeComplete:
        case TExprNode::EState::ConstrInProgress:
        case TExprNode::EState::ConstrPending:
            return TStatus(TStatus::Repeat, true);
        case TExprNode::EState::ExecutionInProgress:
            return TStatus::Async;
        case TExprNode::EState::ExecutionPending:
            return ExecuteChildren(output, output, ctx, depth + 1);
        case TExprNode::EState::ConstrComplete:
        case TExprNode::EState::ExecutionRequired:
            // Ready to be executed below.
            break;
        case TExprNode::EState::ExecutionComplete:
            YQL_ENSURE(output->HasResult());
            OnNodeExecutionComplete(output, ctx);
            return changed ? TStatus(TStatus::Repeat, true) : TStatus(TStatus::Ok);
        case TExprNode::EState::Error:
            return TStatus::Error;
        default:
            YQL_ENSURE(false, "Unknown state");
        }

        switch (output->Type()) {
        // These node kinds cannot be executed on their own.
        case TExprNode::Atom:
        case TExprNode::Argument:
        case TExprNode::Arguments:
        case TExprNode::Lambda:
            ctx.AddError(TIssue(ctx.GetPosition(output->Pos()), TStringBuilder() << "Failed to execute node with type: " << output->Type()));
            output->SetState(TExprNode::EState::Error);
            return TStatus::Error;

        case TExprNode::List:
        case TExprNode::Callable:
        {
            auto prevOutput = output;
            auto status = output->Type() == TExprNode::Callable
                ? ExecuteCallable(output, output, ctx, depth)
                : ExecuteList(output, ctx);
            if (status.Level == TStatus::Error) {
                output->SetState(TExprNode::EState::Error);
            } else if (status.Level == TStatus::Ok) {
                output->SetState(TExprNode::EState::ExecutionComplete);
                OnNodeExecutionComplete(output, ctx);
                YQL_ENSURE(output->HasResult());
            } else if (status.Level == TStatus::Repeat) {
                if (!status.HasRestart) {
                    // Plain Repeat: the node waits for its children, so execute them now.
                    output->SetState(TExprNode::EState::ExecutionPending);
                    status = ExecuteChildren(output, output, ctx, depth + 1);
                    if (TExprNode::EState::ExecutionPending == output->GetState()) {
                        // Remembered so DoTransform() can reset the state after the pass.
                        FreshPendingNodes.push_back(output.Get());
                    }
                    if (status.Level != TStatus::Repeat) {
                        return status;
                    }
                }
                if (output != prevOutput) {
                    YQL_CLOG(INFO, CoreExecution) << "Rewrite node #" << node->UniqueId() << " to #" << output->UniqueId();
                    NewNodes[node.Get()] = output;
                }
                return TStatus(TStatus::Repeat, output != prevOutput);
            } else if (status.Level == TStatus::Async) {
                output->SetState(TExprNode::EState::ExecutionInProgress);
            }

            return status;
        }

        case TExprNode::World:
            output->SetState(TExprNode::EState::ExecutionComplete);
            return TStatus::Ok;

        default:
            YQL_ENSURE(false, "Unknown type");
        }
    }

    // Executes the children of `node` that are in ExecutionRequired state and combines their
    // statuses with those of children that are still in progress or pending.
    // When every child is done (combined status Ok), `node` is put back to ConstrComplete and
    // executed again via ExecuteNode(). Otherwise, if some child was replaced, `output` receives
    // a copy of `node` with the new children. An Error from a child is returned immediately.
    TStatus ExecuteChildren(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
        TStatus combinedStatus = TStatus::Ok;
        TExprNode::TListType newChildren;
        bool newNode = false;
        for (auto& child : node->Children()) {
            auto newChild = child;
            if (child->GetState() == TExprNode::EState::ExecutionRequired) {
                auto childStatus = ExecuteNode(child, newChild, ctx, depth);
                if (childStatus.Level == TStatus::Error)
                    return childStatus;

                combinedStatus = combinedStatus.Combine(childStatus);
            } else if (child->GetState() == TExprNode::EState::ExecutionInProgress) {
                combinedStatus = combinedStatus.Combine(TStatus::Async);
            } else if (child->GetState() == TExprNode::EState::ExecutionPending) {
                combinedStatus = combinedStatus.Combine(TStatus::Repeat);
            }
            newChildren.push_back(newChild);
            if (newChild != child) {
                newNode = true;
            }
        }

        if (combinedStatus.Level == TStatus::Ok) {
            Y_DEBUG_ABORT_UNLESS(!newNode);
            node->SetState(TExprNode::EState::ConstrComplete);
            // Callers pass depth + 1, so depth - 1 restores the node's own level.
            return ExecuteNode(node, output, ctx, depth - 1);
        } else {
            if (combinedStatus.Level == TStatus::Error) {
                node->SetState(TExprNode::EState::Error);
            }
            if (newNode) {
                output = ctx.ChangeChildren(*node, std::move(newChildren));
            }

            return combinedStatus;
        }
    }

    // Requires every child of `node` via RequireChild(). Until all of them report Ok the
    // combined status is returned; once they do, the node gets a fresh World as its result.
    TStatus ExecuteList(const TExprNode::TPtr& node, TExprContext& ctx) {
        TStatus childrenStatus = TStatus::Ok;
        for (ui32 index = 0; index != node->ChildrenSize(); ++index) {
            childrenStatus = childrenStatus.Combine(RequireChild(*node, index));
        }

        if (childrenStatus.Level == TStatus::Ok) {
            node->SetResult(ctx.NewWorld(node->Pos()));
            return TStatus::Ok;
        }

        return childrenStatus;
    }

    // Finds the provider that can execute `node`. Data sources are probed first, then data
    // sinks; within each list the first provider whose CanExecute() accepts the node wins.
    // This is the same lookup order ValidateCallable() uses, so the provider that validated
    // a callable is also the one that executes it (previously the sink loop had no early
    // exit and therefore picked the last matching sink).
    // Returns nullptr when no provider accepts the node.
    IDataProvider* GetDataProvider(const TExprNode& node) const {
        for (const auto& source : Types.DataSources) {
            if (source->CanExecute(node)) {
                return source.Get();
            }
        }

        for (const auto& sink : Types.DataSinks) {
            if (sink->CanExecute(node)) {
                return sink.Get();
            }
        }

        return nullptr;
    }

    // Executes one callable node.
    // Commit, Sync, Left and Right are handled here; every other callable is handed to the
    // provider returned by GetDataProvider(). When a provider's transformer answers Async, the
    // node is registered for async completion; when it finishes synchronously in a terminal
    // state, start and finish progress records are emitted back to back.
    TStatus ExecuteCallable(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
        YQL_CLOG(TRACE, CoreExecution) << '{' << depth << "}, callable #"
                << node->UniqueId() << " <" << node->Content() << '>';
        if (node->Content() == CommitName) {
            // The world (child 0) must be executed before the commit itself.
            auto requireStatus = RequireChild(*node, 0);
            if (requireStatus.Level != TStatus::Ok) {
                return requireStatus;
            }

            // The data sink category is read from the first child of child 1.
            auto category = node->Child(1)->Child(0)->Content();
            auto datasink = Types.DataSinkMap.FindPtr(category);
            YQL_ENSURE(datasink);
            output = node;
            auto status = (*datasink)->GetCallableExecutionTransformer().Transform(node, output, ctx);
            if (status.Level == TStatus::Async) {
                Y_DEBUG_ABORT_UNLESS(output == node);
                StartNode(category, *node);
                AddCallable(node, (*datasink).Get(), ctx);
            } else {
                if (output->GetState() == TExprNode::EState::ExecutionComplete ||
                    output->GetState() == TExprNode::EState::Error)
                {
                    StartNode(category, *node);
                    FinishNode(category, *node, *output);
                }
            }

            return status;
        }

        if (node->Content() == SyncName) {
            return ExecuteList(node, ctx);
        }

        if (node->Content() == LeftName) {
            auto requireStatus = RequireChild(*node, 0);
            if (requireStatus.Level != TStatus::Ok) {
                return requireStatus;
            }

            node->SetResult(ctx.NewWorld(node->Pos()));
            return TStatus::Ok;
        }

        if (node->Content() == RightName) {
            auto requireStatus = RequireChild(*node, 0);
            if (requireStatus.Level != TStatus::Ok) {
                return requireStatus;
            }

            node->SetResult(ctx.NewWorld(node->Pos()));
            return TStatus::Ok;
        }

        IDataProvider* dataProvider = GetDataProvider(*node);
        if (!dataProvider) {
            ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute callable with name: " << node->Content()));
            return TStatus::Error;
        }

        output = node;
        TStatus status = dataProvider->GetCallableExecutionTransformer().Transform(node, output, ctx);
        if (status.Level == TStatus::Async) {
            Y_DEBUG_ABORT_UNLESS(output == node);
            StartNode(dataProvider->GetName(), *node);
            AddCallable(node, dataProvider, ctx);
        } else {
            if (output->GetState() == TExprNode::EState::ExecutionComplete ||
                output->GetState() == TExprNode::EState::Error)
            {
                StartNode(dataProvider->GetName(), *node);
                FinishNode(dataProvider->GetName(), *node, *output);
            }
        }

        return status;
    }

    // Registers `node` as asynchronously executing: obtains the provider's async future for it,
    // stores an owning reference in AsyncNodes and subscribes to the future's completion.
    void AddCallable(const TExprNode::TPtr& node, IDataProvider* dataProvider, TExprContext& ctx) {
        Y_UNUSED(ctx);
        YQL_CLOG(INFO, CoreExecution) << "Register async execution for node #" << node->UniqueId();

        auto& executionTransformer = dataProvider->GetCallableExecutionTransformer();
        const auto completion = executionTransformer.GetAsyncFuture(*node);

        // Keeps the node alive while callbacks refer to it by raw pointer.
        AsyncNodes[node.Get()] = node;
        SubscribeAsyncFuture(node, dataProvider, completion);
    }

    // Deterministic-mode completion handler. Moves finished operations from the head of
    // Inflight to Completed, stopping at the first one whose future has no value yet, so
    // results become visible in subscription order. If anything was moved and the current
    // promise has not been signalled yet, it is signalled.
    //
    // HasResult and Promise are examined while the lock is still held: DoApplyAsyncChanges()
    // swaps the promise and resets HasResult under the same lock, so touching them after
    // releasing it would race with that swap. Only SetValue() is performed outside the lock.
    static void ProcessFutureResultQueue(TStatePtr state) {
        NThreading::TPromise<void> promiseToSet;
        {
            TGuard<TAdaptiveLock> guard(state->Lock);

            bool hasResult = false;
            while (!state->Inflight.Empty()) {
                auto* first = state->Inflight.Front();
                if (!first->Future.HasValue()) {
                    break;
                }

                state->Inflight.PopFront();
                state->Completed.PushBack(first);
                hasResult = true;
            }

            if (hasResult && !state->HasResult) {
                state->HasResult = true;
                promiseToSet = state->Promise;
            }
        }

        if (promiseToSet.Initialized()) {
            promiseToSet.SetValue();
        }
    }

    // Non-deterministic completion handler: appends `item` to Completed and, if the current
    // promise has not been picked for signalling yet, signals it after leaving the lock.
    static void ProcessAsyncFutureResult(TStatePtr state, TAutoPtr<TState::TItem> item) {
        TGuard<TAdaptiveLock> lockGuard(state->Lock);
        state->Completed.PushBack(item.Release());
        if (state->HasResult) {
            // Somebody already claimed the promise for this round.
            return;
        }

        state->HasResult = true;
        NThreading::TPromise<void> toSignal = state->Promise;
        lockGuard.Release();

        if (toSignal.Initialized()) {
            toSignal.SetValue();
        }
    }

    // Arranges for `node` to show up in State->Completed once `future` is done.
    // In deterministic mode the item is queued in Inflight up front and the callback only drains
    // the queue; otherwise the callback creates the item itself and pushes it to Completed.
    // The callbacks capture their own reference to the shared state.
    void SubscribeAsyncFuture(const TExprNode::TPtr& node, IDataProvider* dataProvider, const NThreading::TFuture<void>& future)
    {
        auto sharedState = State;
        if (!DeterministicMode) {
            future.Subscribe([sharedState, rawNode = node.Get(), dataProvider](const NThreading::TFuture<void>& finished) {
                HandleFutureException(finished);

                TAutoPtr<TState::TItem> completedItem = new TState::TItem;
                completedItem->Node = rawNode;
                completedItem->DataProvider = dataProvider;
                ProcessAsyncFutureResult(sharedState, completedItem.Release());
            });
            return;
        }

        {
            TAutoPtr<TState::TItem> inflightItem = new TState::TItem;
            inflightItem->Node = node.Get();
            inflightItem->DataProvider = dataProvider;
            inflightItem->Future = future;

            TGuard<TAdaptiveLock> guard(sharedState->Lock);
            sharedState->Inflight.PushBack(inflightItem.Release());
        }

        future.Subscribe([sharedState](const NThreading::TFuture<void>& finished) {
            HandleFutureException(finished);
            ProcessFutureResultQueue(sharedState);
        });
    }

    // Emits a Started progress record for `node` the first time its public operation id is
    // seen. Nodes without a public id, or ids that already have a record, are ignored.
    void StartNode(TStringBuf category, const TExprNode& node) {
        const auto publicId = Types.TranslateOperationId(node.UniqueId());
        if (!publicId) {
            return;
        }

        const auto insertion = Progresses.insert({ *publicId,
            TOperationProgress(TString(category), *publicId, TOperationProgress::EState::Started) });
        const bool isNewRecord = insertion.second;
        if (isNewRecord) {
            Writer(insertion.first->second);
        }
    }

    // Moves the progress record of `node` to Finished (node state is ExecutionComplete) or
    // Failed (any other state) and reports it, keeping the previously recorded stage name.
    // If `newNode` is a different node, it inherits the public operation id of `node`.
    // Nodes without a public id are ignored; a record for the id must already exist.
    void FinishNode(TStringBuf category, const TExprNode& node, const TExprNode& newNode) {
        const auto publicId = Types.TranslateOperationId(node.UniqueId());
        if (!publicId) {
            return;
        }

        if (node.UniqueId() != newNode.UniqueId()) {
            Types.NodeToOperationId[newNode.UniqueId()] = *publicId;
        }

        const auto recordIt = Progresses.find(*publicId);
        YQL_ENSURE(recordIt != Progresses.end());

        auto finalState = TOperationProgress::EState::Failed;
        if (node.GetState() == TExprNode::EState::ExecutionComplete) {
            finalState = TOperationProgress::EState::Finished;
        }

        auto& record = recordIt->second;
        if (record.State == finalState) {
            return;
        }

        // Copy the stage name first: the record is overwritten on the next line.
        TString stage = record.Stage.first;
        record = TOperationProgress(TString(category), *publicId, finalState, stage);
        Writer(record);
    }

    // Called for a node that has finished execution. Asks the node's provider (if there is
    // one) which trackable nodes the execution created and records them in TrackableNodes.
    void OnNodeExecutionComplete(const TExprNode::TPtr& node, TExprContext& ctx) {
        const auto uniqueId = node->UniqueId();
        YQL_CLOG(INFO, CoreExecution) << "Node #" << uniqueId << "<" << node->Content() << "> finished execution";

        const auto provider = GetDataProvider(*node);
        if (!provider) {
            return;
        }

        TVector<ITrackableNodeProcessor::TExprNodeAndId> created;
        provider->GetTrackableNodeProcessor().GetCreatedNodes(*node, created, ctx);

        TVector<TString> createdIds;
        for (const auto& entry : created) {
            auto& tracked = TrackableNodes[entry.Id];
            tracked.Provider = provider;
            tracked.Node = entry.Node;
            createdIds.push_back(entry.Id);
        }

        YQL_CLOG(INFO, CoreExecution) << "Node #" << uniqueId << "<" << node->Content() << "> created " << createdIds.size()
                                      << " trackable nodes: " << JoinSeq(", ", createdIds);
    }

    // Releases trackable nodes that are no longer referenced by the not-yet-executed part of
    // the graph under `root`.
    // Steps: gather the ids used by every node that is not ExecutionComplete, drop cache
    // entries for nodes that were not visited, move unused entries out of TrackableNodes, and
    // hand them (grouped per provider) to the provider's cleanup transformer.
    // Returns the combined cleanup status; processing stops at the first Error.
    TStatus CollectUnusedNodes(const TExprNode& root, TExprContext& ctx) {
        if (TrackableNodes.empty()) {
            return TStatus::Ok;
        }

        YQL_CLOG(TRACE, CoreExecution) << "Collecting unused nodes on root #" << root.UniqueId();

        THashSet<ui64> visited;
        THashSet<TString> usedIds;
        VisitExpr(root, [&](const TExprNode& node) {
            // Completed nodes are not descended into.
            if (node.GetState() == TExprNode::EState::ExecutionComplete) {
                return false;
            }

            auto nodeId = node.UniqueId();
            visited.insert(nodeId);

            // Provider lookup is cached per node id; only successful lookups are stored.
            TIntrusivePtr<IDataProvider> dataProvider;
            auto providerIt = ProvidersCache.find(nodeId);
            if (providerIt != ProvidersCache.end()) {
                YQL_ENSURE(providerIt->second);
                dataProvider = providerIt->second;
            } else {
                dataProvider = GetDataProvider(node);
                if (dataProvider) {
                    ProvidersCache[nodeId] = dataProvider;
                }
            }

            if (dataProvider) {
                TVector<TString> usedNodes;
                dataProvider->GetTrackableNodeProcessor().GetUsedNodes(node, usedNodes);
                usedIds.insert(usedNodes.begin(), usedNodes.end());
            }

            return true;
        });

        // Forget cached providers of nodes that were not visited in this traversal.
        for (auto i = ProvidersCache.begin(); i != ProvidersCache.end();) {
            if (visited.count(i->first) == 0) {
                ProvidersCache.erase(i++);
            } else {
                ++i;
            }
        }

        THashMap<TIntrusivePtr<IDataProvider>, TExprNode::TListType> toCollect;
        for (auto i = TrackableNodes.begin(); i != TrackableNodes.end();) {
            TString id = i->first;
            TTrackableNodeInfo info = i->second;
            if (!usedIds.contains(id)) {
                YQL_ENSURE(info.Node);
                YQL_ENSURE(info.Provider);
                YQL_CLOG(INFO, CoreExecution) << "Marking node " << id << " for collection";
                toCollect[info.Provider].push_back(info.Node);
                TrackableNodes.erase(i++);
            } else {
                ++i;
            }
        }

        TStatus collectStatus = TStatus::Ok;
        for (auto& i : toCollect) {
            const auto& provider = i.first;
            YQL_ENSURE(!i.second.empty());
            auto pos = i.second.front()->Pos();
            // All nodes of one provider are wrapped into a single list node for cleanup.
            TExprNode::TPtr collectNode = ctx.NewList(pos, std::move(i.second));
            TExprNode::TPtr output;
            TStatus status = provider->GetTrackableNodeProcessor().GetCleanupTransformer().Transform(collectNode, output, ctx);
            YQL_ENSURE(status != TStatus::Repeat);

            collectStatus = collectStatus.Combine(status);
            if (status == TStatus::Error) {
                break;
            }

            if (status == TStatus::Async) {
                // Keep the list node alive until DoApplyAsyncChanges() sees its completion.
                CollectingNodes[collectNode.Get()] = collectNode;
                auto future = provider->GetTrackableNodeProcessor().GetCleanupTransformer().GetAsyncFuture(*collectNode);
                SubscribeAsyncFuture(collectNode, provider.Get(), future);
            }
        }

        return collectStatus;
    }

private:
    TTypeAnnotationContext& Types;
    // Receives every progress record produced by StartNode() / FinishNode().
    TOperationProgressWriter Writer;
    const bool WithFinalize;
    // Shared with async callbacks; replaced wholesale by Rewind().
    TStatePtr State;
    // Pending rewrites: original node -> replacement, applied by ReplaceNewNodes().
    TNodeOnNodeOwnedMap NewNodes;
    // Created after the first execution pass that returns Ok when WithFinalize is set;
    // afterwards all transformer calls are forwarded to it.
    TAutoPtr<IGraphTransformer> FinalizingTransformer;
    // Last reported progress per public operation id.
    THashMap<ui32, TOperationProgress> Progresses;

    // A node reported by a provider's trackable node processor, with the provider that owns it.
    struct TTrackableNodeInfo
    {
        TIntrusivePtr<IDataProvider> Provider;
        TExprNode::TPtr Node;
    };

    // Trackable nodes by id, filled in OnNodeExecutionComplete() and drained in CollectUnusedNodes().
    THashMap<TString, TTrackableNodeInfo> TrackableNodes;
    // Cleanup list nodes whose async cleanup has not been applied yet.
    TNodeOnNodeOwnedMap CollectingNodes;
    // Provider lookup cache keyed by node unique id, used by CollectUnusedNodes().
    THashMap<ui64, TIntrusivePtr<IDataProvider>> ProvidersCache;
    // Nodes switched to ExecutionPending during the current DoTransform() pass.
    TExprNode::TListType FreshPendingNodes;

    // Selects the ordered (Inflight queue) completion path in SubscribeAsyncFuture().
    bool DeterministicMode;
    // Callables with an async execution in flight; owns the nodes referenced by raw pointer in TState::TItem.
    TNodeOnNodeOwnedMap AsyncNodes;
};

// Forward declaration: ValidateList() and ValidateCallable() below recurse into ValidateExecution().
IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited);

// Validates every child of `node` and returns the combination of their statuses.
// All children are visited even if an earlier one failed.
IGraphTransformer::TStatus ValidateList(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited) {
    using TStatus = IGraphTransformer::TStatus;

    TStatus result = TStatus::Ok;
    const ui32 childCount = node->ChildrenSize();
    for (ui32 index = 0; index != childCount; ++index) {
        const TStatus childStatus = ValidateExecution(node->ChildPtr(index), ctx, types, visited);
        result = result.Combine(childStatus);
    }

    return result;
}

// Checks that a callable can be executed.
// Commit requires a known data sink and a valid world child; Sync validates all children;
// Left and Right validate their first child. Any other callable must be accepted by some
// provider (data sources are probed before data sinks, first match wins), have at least two
// children, pass the provider's own ValidateExecution() and have valid required children.
IGraphTransformer::TStatus ValidateCallable(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited) {
    using TStatus = IGraphTransformer::TStatus;

    const auto callableName = node->Content();

    if (callableName == CommitName) {
        const auto sinkName = node->Child(1)->Child(0)->Content();
        if (!types.DataSinkMap.FindPtr(sinkName)) {
            ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown datasink: " << sinkName));
            return TStatus::Error;
        }

        return ValidateExecution(node->ChildPtr(0), ctx, types, visited);
    }

    if (callableName == SyncName) {
        return ValidateList(node, ctx, types, visited);
    }

    if (callableName == LeftName || callableName == RightName) {
        return ValidateExecution(node->ChildPtr(0), ctx, types, visited);
    }

    IDataProvider* executor = nullptr;
    for (auto& source : types.DataSources) {
        if (source->CanExecute(*node)) {
            executor = source.Get();
            break;
        }
    }

    if (executor == nullptr) {
        for (auto& sink : types.DataSinks) {
            if (sink->CanExecute(*node)) {
                executor = sink.Get();
                break;
            }
        }
    }

    if (executor == nullptr) {
        ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute callable with name: " << callableName
            << ", you possibly used cross provider/cluster operations or pulled not materialized result in refselect mode"));
        return TStatus::Error;
    }

    if (node->ChildrenSize() < 2) {
        ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Executable callable should have at least 2 children"));
        return TStatus::Error;
    }

    if (!executor->ValidateExecution(*node, ctx)) {
        return TStatus::Error;
    }

    TExprNode::TListType requiredChildren;
    executor->GetRequiredChildren(*node, requiredChildren);

    TStatus result = TStatus::Ok;
    for (const auto& requiredChild : requiredChildren) {
        result = result.Combine(ValidateExecution(requiredChild, ctx, types, visited));
    }

    return result;
}

// Validates that `node` can be executed. Already executed nodes are accepted as is; every
// other node must carry a type annotation. Lists and callables are validated recursively,
// and callables that validated successfully are remembered in `visited` so they are checked
// only once. Atoms, arguments and lambdas are rejected.
IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx,
    const TTypeAnnotationContext& types, TNodeSet& visited) {
    using TStatus = IGraphTransformer::TStatus;

    if (node->GetState() == TExprNode::EState::ExecutionComplete) {
        return TStatus::Ok;
    }

    if (!node->GetTypeAnn()) {
        ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Node has no type annotation"));
        return TStatus::Error;
    }

    switch (node->Type()) {
    case TExprNode::List:
        return ValidateList(node, ctx, types, visited);

    case TExprNode::World:
        return TStatus::Ok;

    case TExprNode::Callable: {
        if (visited.find(node.Get()) != visited.cend()) {
            return TStatus::Ok;
        }

        const TStatus callableStatus = ValidateCallable(node, ctx, types, visited);
        if (callableStatus.Level == TStatus::Ok) {
            visited.insert(node.Get());
        }

        return callableStatus;
    }

    case TExprNode::Atom:
    case TExprNode::Argument:
    case TExprNode::Arguments:
    case TExprNode::Lambda:
        ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute node with type: " << node->Type()));
        return TStatus::Error;

    default:
        YQL_ENSURE(false, "Unknown type");
    }

    return TStatus::Ok;
}

}

// Factory for the execution transformer; see TExecutionTransformer's constructor for the
// meaning of the arguments.
TAutoPtr<IGraphTransformer> CreateExecutionTransformer(
    TTypeAnnotationContext& types,
    TOperationProgressWriter writer,
    bool withFinalize)
{
    TAutoPtr<IGraphTransformer> transformer = new TExecutionTransformer(types, writer, withFinalize);
    return transformer;
}

// Builds a functor transformer that inspects an expression graph before execution.
//
// The transformer never rewrites the graph (output is always set to input). It:
//   * optionally (checkWorld) runs ValidateExecution over the root and returns
//     its status immediately when that status is not Ok;
//   * reports the errors carried by "ErrorType" callables;
//   * emits coded warnings for "TablePath"/"TableRecord" and "UnsafeTimestampCast";
//   * rejects callables from a deny list (a shorter list is used for the subtrees
//     of CalcOverWindow-like callables, which are visited in a separate pass);
//   * rejects Stream/Flow nodes that are used as a child more than once
//     (uses by "DependsOn" parents are not counted, and nodes carrying
//     TEmptyConstraintNode are exempt).
//
// Identical issues are added to the context only once per run.
// NOTE: `types` is captured by reference; presumably the returned transformer keeps
// the functor, so `types` has to outlive it - confirm against CreateFunctorTransformer.
TAutoPtr<IGraphTransformer> CreateCheckExecutionTransformer(const TTypeAnnotationContext& types, bool checkWorld) {
    return CreateFunctorTransformer([&types, checkWorld](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx)
        -> IGraphTransformer::TStatus {
        using TStatus = IGraphTransformer::TStatus;

        output = input;
        if (checkWorld) {
            TNodeSet visited;
            const auto worldStatus = ValidateExecution(input, ctx, types, visited);
            if (worldStatus.Level != TStatus::Ok) {
                return worldStatus;
            }
        }

        TParentsMap parentsMap;
        THashSet<TExprNode*> overWinNodes;
        GatherParents(*input, parentsMap);

        bool failed = false;
        THashSet<TIssue> reported;

        // Marks the run as failed and adds the issue to the context unless an equal one was added already.
        const auto reportUniqueError = [&](const TIssue& issue) {
            failed = true;
            if (reported.insert(issue).second) {
                ctx.AddError(issue);
            }
        };

        // Tags the issue with the given code and adds it as a warning;
        // the run fails when the context refuses the warning.
        const auto emitWarning = [&](TIssue warning, EYqlIssueCode code) {
            SetIssueCode(code, warning);
            if (!ctx.AddWarning(warning)) {
                failed = true;
            }
        };

        // Per-node check. The return value is handed back to VisitExpr as-is:
        // false is returned only for a collected CalcOverWindow-like node, true otherwise.
        // The order of the checks below is significant: the first matching one wins.
        const auto checkNode = [&](const THashSet<TStringBuf>& notAllowList, bool collectCalcOverWindow, const TExprNode::TPtr& node) {
            if (node->IsCallable("ErrorType")) {
                reportUniqueError(node->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TErrorExprType>()->GetError());
                return true;
            }

            if (node->IsCallable({"TablePath", "TableRecord"})) {
                emitWarning(
                    TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << node->Content() << " will be empty"),
                    EYqlIssueCode::TIssuesIds_EIssueCode_CORE_FREE_TABLE_PATH_RECORD);
                return true;
            }

            if (node->IsCallable("UnsafeTimestampCast")) {
                emitWarning(
                    TIssue(ctx.GetPosition(node->Pos()), "Unsafe conversion integral value to Timestamp, consider using date types"),
                    EYqlIssueCode::TIssuesIds_EIssueCode_CORE_CAST_INTEGRAL_TO_TIMESTAMP_UNSAFE);
                return true;
            }

            if (collectCalcOverWindow && node->IsCallable({"CalcOverWindow", "CalcOverSessionWindow", "CalcOverWindowGroup"})) {
                // Remembered for the second pass, which applies the shorter deny list.
                overWinNodes.emplace(node.Get());
                return false;
            }

            if (node->IsCallable(notAllowList)) {
                reportUniqueError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Can't execute " << node->Content()));
                return true;
            }

            // The type annotation is consulted only for non-lambda nodes.
            if (node->Type() == TExprNode::Lambda) {
                return true;
            }

            const auto kind = node->GetTypeAnn()->GetKind();
            if (kind != ETypeAnnotationKind::Stream && kind != ETypeAnnotationKind::Flow) {
                return true;
            }

            const auto parents = parentsMap.find(node.Get());
            if (parents == parentsMap.end()) {
                return true;
            }

            // Count every child slot (over all parents except DependsOn) that refers to this node.
            ui32 uses = 0;
            for (const auto& parent : parents->second) {
                if (parent->IsCallable("DependsOn")) {
                    continue;
                }

                for (const auto& child : parent->Children()) {
                    if (child.Get() == node.Get()) {
                        ++uses;
                    }
                }
            }

            if (uses > 1 && !node->GetConstraint<TEmptyConstraintNode>()) {
                reportUniqueError(TIssue(ctx.GetPosition(node->Pos()), "Multiple stream clients"));
            }

            return true;
        };

        static const THashSet<TStringBuf> deniedEverywhere = {"InstanceOf", "Lag", "Lead", "RowNumber", "Rank", "DenseRank", "PercentRank", "CumeDist", "NTile"};
        static const THashSet<TStringBuf> deniedUnderCalcOverWindow = {"InstanceOf"};

        // First pass: whole graph, collecting CalcOverWindow-like nodes along the way.
        VisitExpr(input, [checkNode](const TExprNode::TPtr& node) {
            return checkNode(deniedEverywhere, true, node);
        });

        // Second pass: each collected node is visited with the shorter deny list and without collecting.
        for (auto* windowNode : overWinNodes) {
            VisitExpr(windowNode, [checkNode](const TExprNode::TPtr& node) {
                return checkNode(deniedUnderCalcOverWindow, false, node);
            });
        }

        if (failed) {
            return TStatus::Error;
        }

        return TStatus::Ok;
    });
}

// Requests execution of the child of `node` at position `index`.
//
// Returns Ok when the child is already in the Error or ExecutionComplete state.
// Returns Repeat in every other case; unless the child is already
// ExecutionInProgress or ExecutionPending, its state is first switched to
// ExecutionRequired.
IGraphTransformer::TStatus RequireChild(const TExprNode& node, ui32 index) {
    using TStatus = IGraphTransformer::TStatus;
    using EState = TExprNode::EState;

    const auto child = node.Child(index);
    const auto state = child->GetState();

    if (state == EState::Error || state == EState::ExecutionComplete) {
        return TStatus::Ok;
    }

    const bool alreadyScheduled =
        state == EState::ExecutionInProgress || state == EState::ExecutionPending;
    if (!alreadyScheduled) {
        child->SetState(EState::ExecutionRequired);
    }

    return TStatus::Repeat;
}

}

// Stream-output specialization for NYql::TOperationProgress::EState.
//
// For every state enumerated by YQL_OPERATION_PROGRESS_STATE_MAP the enumerator's
// identifier is written as text; any other value is written as its integer value.
template<>
void Out<NYql::TOperationProgress::EState>(class IOutputStream &o, NYql::TOperationProgress::EState x) {
// Helper handed to the state map below: expands to one `case` label per state that
// prints the stringized enumerator name and returns. Arguments after `name` are ignored.
// NOTE(review): the macro is not #undef'ed inside this block, so it stays defined afterwards.
#define YQL_OPERATION_PROGRESS_STATE_MAP_TO_STRING_IMPL(name, ...) \
    case NYql::TOperationProgress::EState::name: \
        o << #name; \
        return;

    switch (x) {
        YQL_OPERATION_PROGRESS_STATE_MAP(YQL_OPERATION_PROGRESS_STATE_MAP_TO_STRING_IMPL)
    default:
        // Value not covered by the state map: fall back to the numeric representation.
        o << static_cast<int>(x);
        return;
    }
}