diff options
author | vvvv <vvvv@yandex-team.com> | 2024-11-07 12:29:36 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.com> | 2024-11-07 13:49:47 +0300 |
commit | d4c258e9431675bab6745c8638df6e3dfd4dca6b (patch) | |
tree | b5efcfa11351152a4c872fccaea35749141c0b11 /yql/essentials/core/yql_execution.cpp | |
parent | 13a4f274caef5cfdaf0263b24e4d6bdd5521472b (diff) | |
download | ydb-d4c258e9431675bab6745c8638df6e3dfd4dca6b.tar.gz |
Moved other yql/essentials libs YQL-19206
init
commit_hash:7d4c435602078407bbf20dd3c32f9c90d2bbcbc0
Diffstat (limited to 'yql/essentials/core/yql_execution.cpp')
-rw-r--r-- | yql/essentials/core/yql_execution.cpp | 963 |
1 files changed, 963 insertions, 0 deletions
diff --git a/yql/essentials/core/yql_execution.cpp b/yql/essentials/core/yql_execution.cpp new file mode 100644 index 0000000000..2fe21e1535 --- /dev/null +++ b/yql/essentials/core/yql_execution.cpp @@ -0,0 +1,963 @@ +#include "yql_execution.h" +#include "yql_expr_optimize.h" +#include "yql_opt_proposed_by_data.h" + +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/utils/yql_panic.h> + +#include <util/string/builder.h> +#include <util/string/join.h> +#include <util/system/env.h> +#include <util/generic/queue.h> + + +namespace NYql { + +namespace { + +const bool RewriteSanityCheck = false; + +class TExecutionTransformer : public TGraphTransformerBase { +public: + struct TState : public TThrRefBase { + TAdaptiveLock Lock; + + struct TItem : public TIntrusiveListItem<TItem> { + TExprNode* Node = nullptr; + IDataProvider* DataProvider = nullptr; + NThreading::TFuture<void> Future; + }; + + using TQueueType = TIntrusiveListWithAutoDelete<TState::TItem, TDelete>; + TQueueType Completed; + TQueueType Inflight; + NThreading::TPromise<void> Promise; + bool HasResult = false; + }; + + using TStatePtr = TIntrusivePtr<TState>; + + TExecutionTransformer(TTypeAnnotationContext& types, + TOperationProgressWriter writer, + bool withFinalize) + : Types(types) + , Writer(writer) + , WithFinalize(withFinalize) + , DeterministicMode(GetEnv("YQL_DETERMINISTIC_MODE")) + { + Rewind(); + } + + TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { + if (FinalizingTransformer) { + YQL_CLOG(INFO, CoreExecution) << "FinalizingTransformer, root #" << input->UniqueId(); + auto status = FinalizingTransformer->Transform(input, output, ctx); + YQL_CLOG(INFO, CoreExecution) << "FinalizingTransformer done, output #" << output->UniqueId() << ", status: " << status; + return status; + } + + YQL_CLOG(INFO, CoreExecution) << "Begin, root #" << input->UniqueId(); + output = input; + if (RewriteSanityCheck) { + VisitExpr(input, [&](const TExprNode::TPtr& localInput) { + if (NewNodes.cend() != NewNodes.find(localInput.Get())) { + Cerr << "found old node: #" << localInput->UniqueId() << "\n" << input->Dump(); + YQL_ENSURE(false); + } + return true; + }); + } + + auto status = CollectUnusedNodes(*input, ctx); + YQL_CLOG(INFO, CoreExecution) << "Collect unused nodes for root #" << input->UniqueId() << ", status: " << status; + if (status != TStatus::Ok) { + return status; + } + + status = status.Combine(ExecuteNode(input, output, ctx, 0)); + for (auto node: FreshPendingNodes) { + if (TExprNode::EState::ExecutionPending == node->GetState()) { + node->SetState(TExprNode::EState::ConstrComplete); + } + } + FreshPendingNodes.clear(); + if (!ReplaceNewNodes(output, ctx)) { + return TStatus::Error; + } + YQL_CLOG(INFO, CoreExecution) << "Finish, output #" << output->UniqueId() << ", status: " << status; + + if (status != TStatus::Ok || !WithFinalize) { + return status; + } + + YQL_CLOG(INFO, CoreExecution) << "Creating finalizing transformer, output #" << output->UniqueId(); + FinalizingTransformer = CreateCompositeFinalizingTransformer(Types); + return FinalizingTransformer->Transform(input, output, ctx); + } + + NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { + return FinalizingTransformer ? + FinalizingTransformer->GetAsyncFuture(input) : + State->Promise.GetFuture(); + } + + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { + if (FinalizingTransformer) { + return FinalizingTransformer->ApplyAsyncChanges(input, output, ctx); + } + + output = input; + + TStatus combinedStatus = TStatus::Ok; + + TState::TQueueType completed; + auto newPromise = NThreading::NewPromise(); + { + TGuard<TAdaptiveLock> guard(State->Lock); + completed.Swap(State->Completed); + State->Promise.Swap(newPromise); + State->HasResult = false; + } + + for (auto& item : completed) { + auto collectedIt = CollectingNodes.find(item.Node); + if (collectedIt != CollectingNodes.end()) { + YQL_CLOG(INFO, CoreExecution) << "Completed async cleanup for node #" << item.Node->UniqueId(); + TExprNode::TPtr callableOutput; + auto status = item.DataProvider->GetTrackableNodeProcessor().GetCleanupTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx); + combinedStatus = combinedStatus.Combine(status); + CollectingNodes.erase(collectedIt); + continue; + } + + YQL_CLOG(INFO, CoreExecution) << "Completed async execution for node #" << item.Node->UniqueId(); + auto asyncIt = AsyncNodes.find(item.Node); + YQL_ENSURE(asyncIt != AsyncNodes.end()); + TExprNode::TPtr callableOutput; + auto status = item.DataProvider->GetCallableExecutionTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx); + Y_ABORT_UNLESS(callableOutput); + YQL_ENSURE(status != TStatus::Async); + combinedStatus = combinedStatus.Combine(status); + if (status.Level == TStatus::Error) { + item.Node->SetState(TExprNode::EState::Error); + } else if (status.Level == TStatus::Repeat) { + if (callableOutput != item.Node) { + YQL_CLOG(INFO, CoreExecution) << "Rewrite node #" << item.Node->UniqueId() << " to #" << callableOutput->UniqueId() + << " in ApplyAsyncChanges()"; + NewNodes[item.Node] = callableOutput; + combinedStatus = combinedStatus.Combine(TStatus(TStatus::Repeat, true)); + FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput); + } + } + if (callableOutput == item.Node) { + YQL_CLOG(INFO, CoreExecution) << "State is " << item.Node->GetState() + << " after apply async changes for node #" << item.Node->UniqueId(); + } + + if (item.Node->GetState() == TExprNode::EState::ExecutionComplete || + item.Node->GetState() == TExprNode::EState::Error) + { + FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput); + } + + AsyncNodes.erase(asyncIt); + } + + if (!ReplaceNewNodes(output, ctx)) { + return TStatus::Error; + } + + if (!completed.Empty() && combinedStatus.Level == TStatus::Ok) { + combinedStatus = TStatus::Repeat; + } + + return combinedStatus; + } + + bool ReplaceNewNodes(TExprNode::TPtr& output, TExprContext& ctx) { + if (!NewNodes.empty()) { + TOptimizeExprSettings settings(&Types); + settings.VisitChanges = true; + settings.VisitStarted = true; + auto replaceStatus = OptimizeExpr(output, output, [&](const TExprNode::TPtr& input, TExprContext& ctx) -> TExprNode::TPtr { + Y_UNUSED(ctx); + const auto replace = NewNodes.find(input.Get()); + if (NewNodes.cend() != replace) { + return replace->second; + } + + return input; + }, ctx, settings); + + if (!RewriteSanityCheck) { + NewNodes.clear(); + } + + if (replaceStatus.Level == TStatus::Error) { + return false; + } + } + + if (RewriteSanityCheck) { + VisitExpr(output, [&](const TExprNode::TPtr& localInput) { + if (NewNodes.cend() != NewNodes.find(localInput.Get())) { + Cerr << "found old node: #" << localInput->UniqueId() << "\n" << output->Dump(); + YQL_ENSURE(false); + } + return true; + }); + } + return true; + } + + void Rewind() override { + State = MakeIntrusive<TState>(); + State->Promise = NThreading::NewPromise(); + State->HasResult = false; + NewNodes.clear(); + FinalizingTransformer.Reset(); + + TrackableNodes.clear(); + CollectingNodes.clear(); + ProvidersCache.clear(); + AsyncNodes.clear(); + } + + TStatus ExecuteNode(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) { + output = node; + bool changed = false; + const auto knownNode = NewNodes.find(node.Get()); + if (NewNodes.cend() != knownNode) { + output = knownNode->second; + changed = true; + } + + switch (output->GetState()) { + case TExprNode::EState::Initial: + case TExprNode::EState::TypeInProgress: + case TExprNode::EState::TypePending: + case TExprNode::EState::TypeComplete: + case TExprNode::EState::ConstrInProgress: + case TExprNode::EState::ConstrPending: + return TStatus(TStatus::Repeat, true); + case TExprNode::EState::ExecutionInProgress: + return TStatus::Async; + case TExprNode::EState::ExecutionPending: + return ExecuteChildren(output, output, ctx, depth + 1); + case TExprNode::EState::ConstrComplete: + case TExprNode::EState::ExecutionRequired: + break; + case TExprNode::EState::ExecutionComplete: + YQL_ENSURE(output->HasResult()); + OnNodeExecutionComplete(output, ctx); + return changed ? TStatus(TStatus::Repeat, true) : TStatus(TStatus::Ok); + case TExprNode::EState::Error: + return TStatus::Error; + default: + YQL_ENSURE(false, "Unknown state"); + } + + switch (output->Type()) { + case TExprNode::Atom: + case TExprNode::Argument: + case TExprNode::Arguments: + case TExprNode::Lambda: + ctx.AddError(TIssue(ctx.GetPosition(output->Pos()), TStringBuilder() << "Failed to execute node with type: " << output->Type())); + output->SetState(TExprNode::EState::Error); + return TStatus::Error; + + case TExprNode::List: + case TExprNode::Callable: + { + auto prevOutput = output; + auto status = output->Type() == TExprNode::Callable + ? ExecuteCallable(output, output, ctx, depth) + : ExecuteList(output, ctx); + if (status.Level == TStatus::Error) { + output->SetState(TExprNode::EState::Error); + } else if (status.Level == TStatus::Ok) { + output->SetState(TExprNode::EState::ExecutionComplete); + OnNodeExecutionComplete(output, ctx); + YQL_ENSURE(output->HasResult()); + } else if (status.Level == TStatus::Repeat) { + if (!status.HasRestart) { + output->SetState(TExprNode::EState::ExecutionPending); + status = ExecuteChildren(output, output, ctx, depth + 1); + if (TExprNode::EState::ExecutionPending == output->GetState()) { + FreshPendingNodes.push_back(output.Get()); + } + if (status.Level != TStatus::Repeat) { + return status; + } + } + if (output != prevOutput) { + YQL_CLOG(INFO, CoreExecution) << "Rewrite node #" << node->UniqueId() << " to #" << output->UniqueId(); + NewNodes[node.Get()] = output; + } + return TStatus(TStatus::Repeat, output != prevOutput); + } else if (status.Level == TStatus::Async) { + output->SetState(TExprNode::EState::ExecutionInProgress); + } + + return status; + } + + case TExprNode::World: + output->SetState(TExprNode::EState::ExecutionComplete); + return TStatus::Ok; + + default: + YQL_ENSURE(false, "Unknown type"); + } + } + + TStatus ExecuteChildren(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) { + TStatus combinedStatus = TStatus::Ok; + TExprNode::TListType newChildren; + bool newNode = false; + for (auto& child : node->Children()) { + auto newChild = child; + if (child->GetState() == TExprNode::EState::ExecutionRequired) { + auto childStatus = ExecuteNode(child, newChild, ctx, depth); + if (childStatus.Level == TStatus::Error) + return childStatus; + + combinedStatus = combinedStatus.Combine(childStatus); + } else if (child->GetState() == TExprNode::EState::ExecutionInProgress) { + combinedStatus = combinedStatus.Combine(TStatus::Async); + } else if (child->GetState() == TExprNode::EState::ExecutionPending) { + combinedStatus = combinedStatus.Combine(TStatus::Repeat); + } + newChildren.push_back(newChild); + if (newChild != child) { + newNode = true; + } + } + + if (combinedStatus.Level == TStatus::Ok) { + Y_DEBUG_ABORT_UNLESS(!newNode); + node->SetState(TExprNode::EState::ConstrComplete); + return ExecuteNode(node, output, ctx, depth - 1); + } else { + if (combinedStatus.Level == TStatus::Error) { + node->SetState(TExprNode::EState::Error); + } + if (newNode) { + output = ctx.ChangeChildren(*node, std::move(newChildren)); + } + + return combinedStatus; + } + } + + TStatus ExecuteList(const TExprNode::TPtr& node, TExprContext& ctx) { + IGraphTransformer::TStatus combinedStatus = IGraphTransformer::TStatus::Ok; + for (ui32 i = 0; i < node->ChildrenSize(); ++i) { + combinedStatus = combinedStatus.Combine(RequireChild(*node, i)); + } + + if (combinedStatus.Level != IGraphTransformer::TStatus::Ok) { + return combinedStatus; + } + + node->SetResult(ctx.NewWorld(node->Pos())); + return TStatus::Ok; + } + + IDataProvider* GetDataProvider(const TExprNode& node) const { + IDataProvider* dataProvider = nullptr; + for (const auto& x : Types.DataSources) { + if (x->CanExecute(node)) { + dataProvider = x.Get(); + break; + } + } + + if (!dataProvider) { + for (const auto& x : Types.DataSinks) { + if (x->CanExecute(node)) { + dataProvider = x.Get(); + } + } + } + return dataProvider; + } + + TStatus ExecuteCallable(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) { + YQL_CLOG(TRACE, CoreExecution) << '{' << depth << "}, callable #" + << node->UniqueId() << " <" << node->Content() << '>'; + if (node->Content() == CommitName) { + auto requireStatus = RequireChild(*node, 0); + if (requireStatus.Level != TStatus::Ok) { + return requireStatus; + } + + auto category = node->Child(1)->Child(0)->Content(); + auto datasink = Types.DataSinkMap.FindPtr(category); + YQL_ENSURE(datasink); + output = node; + auto status = (*datasink)->GetCallableExecutionTransformer().Transform(node, output, ctx); + if (status.Level == TStatus::Async) { + Y_DEBUG_ABORT_UNLESS(output == node); + StartNode(category, *node); + AddCallable(node, (*datasink).Get(), ctx); + } else { + if (output->GetState() == TExprNode::EState::ExecutionComplete || + output->GetState() == TExprNode::EState::Error) + { + StartNode(category, *node); + FinishNode(category, *node, *output); + } + } + + return status; + } + + if (node->Content() == SyncName) { + return ExecuteList(node, ctx); + } + + if (node->Content() == LeftName) { + auto requireStatus = RequireChild(*node, 0); + if (requireStatus.Level != TStatus::Ok) { + return requireStatus; + } + + node->SetResult(ctx.NewWorld(node->Pos())); + return TStatus::Ok; + } + + if (node->Content() == RightName) { + auto requireStatus = RequireChild(*node, 0); + if (requireStatus.Level != TStatus::Ok) { + return requireStatus; + } + + node->SetResult(ctx.NewWorld(node->Pos())); + return TStatus::Ok; + } + + IDataProvider* dataProvider = GetDataProvider(*node); + if (!dataProvider) { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute callable with name: " << node->Content())); + return TStatus::Error; + } + + output = node; + TStatus status = dataProvider->GetCallableExecutionTransformer().Transform(node, output, ctx); + if (status.Level == TStatus::Async) { + Y_DEBUG_ABORT_UNLESS(output == node); + StartNode(dataProvider->GetName(), *node); + AddCallable(node, dataProvider, ctx); + } else { + if (output->GetState() == TExprNode::EState::ExecutionComplete || + output->GetState() == TExprNode::EState::Error) + { + StartNode(dataProvider->GetName(), *node); + FinishNode(dataProvider->GetName(), *node, *output); + } + } + + return status; + } + + void AddCallable(const TExprNode::TPtr& node, IDataProvider* dataProvider, TExprContext& ctx) { + Y_UNUSED(ctx); + YQL_CLOG(INFO, CoreExecution) << "Register async execution for node #" << node->UniqueId(); + auto future = dataProvider->GetCallableExecutionTransformer().GetAsyncFuture(*node); + AsyncNodes[node.Get()] = node; + SubscribeAsyncFuture(node, dataProvider, future); + } + + static void ProcessFutureResultQueue(TStatePtr state) { + NThreading::TPromise<void> promiseToSet; + bool hasResult = false; + + TGuard<TAdaptiveLock> guard(state->Lock); + while (!state->Inflight.Empty()) { + auto* first = state->Inflight.Front(); + if (first->Future.HasValue()) { + state->Inflight.PopFront(); + state->Completed.PushBack(first); + hasResult = true; + } else { + break; + } + } + guard.Release(); + + if (hasResult && !state->HasResult) { + state->HasResult = true; + promiseToSet = state->Promise; + } + + if (promiseToSet.Initialized()) { + promiseToSet.SetValue(); + } + } + + static void ProcessAsyncFutureResult(TStatePtr state, TAutoPtr<TState::TItem> item) { + NThreading::TPromise<void> promiseToSet; + { + TGuard<TAdaptiveLock> guard(state->Lock); + state->Completed.PushBack(item.Release()); + if (!state->HasResult) { + state->HasResult = true; + promiseToSet = state->Promise; + } + } + + if (promiseToSet.Initialized()) { + promiseToSet.SetValue(); + } + } + + void SubscribeAsyncFuture(const TExprNode::TPtr& node, IDataProvider* dataProvider, const NThreading::TFuture<void>& future) + { + auto state = State; + if (DeterministicMode) { + TAutoPtr<TState::TItem> item = new TState::TItem; + item->Node = node.Get(); item->DataProvider = dataProvider; item->Future = future; + + TGuard<TAdaptiveLock> guard(state->Lock); + state->Inflight.PushBack(item.Release()); + } + + if (DeterministicMode) { + future.Subscribe([state](const NThreading::TFuture<void>& future) { + YQL_ENSURE(!future.HasException()); + ProcessFutureResultQueue(state); + }); + } else { + future.Subscribe([state, node=node.Get(), dataProvider](const NThreading::TFuture<void>& future) { + YQL_ENSURE(!future.HasException()); + + TAutoPtr<TState::TItem> item = new TState::TItem; + item->Node = node; item->DataProvider = dataProvider; + ProcessAsyncFutureResult(state, item.Release()); + }); + } + } + + void StartNode(TStringBuf category, const TExprNode& node) { + auto publicId = Types.TranslateOperationId(node.UniqueId()); + if (publicId) { + auto x = Progresses.insert({ *publicId, + TOperationProgress(TString(category), *publicId, TOperationProgress::EState::Started) }); + if (x.second) { + Writer(x.first->second); + } + } + } + + void FinishNode(TStringBuf category, const TExprNode& node, const TExprNode& newNode) { + auto publicId = Types.TranslateOperationId(node.UniqueId()); + if (publicId) { + if (newNode.UniqueId() != node.UniqueId()) { + Types.NodeToOperationId[newNode.UniqueId()] = *publicId; + } + + auto progIt = Progresses.find(*publicId); + YQL_ENSURE(progIt != Progresses.end()); + + auto newState = (node.GetState() == TExprNode::EState::ExecutionComplete) + ? TOperationProgress::EState::Finished + : TOperationProgress::EState::Failed; + + if (progIt->second.State != newState) { + TString stage = progIt->second.Stage.first; + progIt->second = TOperationProgress(TString(category), *publicId, newState, stage); + Writer(progIt->second); + } + } + } + + void OnNodeExecutionComplete(const TExprNode::TPtr& node, TExprContext& ctx) { + auto nodeId = node->UniqueId(); + YQL_CLOG(INFO, CoreExecution) << "Node #" << nodeId << "<" << node->Content() << "> finished execution"; + + auto dataProvider = GetDataProvider(*node); + if (!dataProvider) { + return; + } + + TVector<ITrackableNodeProcessor::TExprNodeAndId> createdNodes; + dataProvider->GetTrackableNodeProcessor().GetCreatedNodes(*node, createdNodes, ctx); + + TVector<TString> ids; + for (const auto& c : createdNodes) { + auto& info = TrackableNodes[c.Id]; + info.Provider = dataProvider; + info.Node = c.Node; + ids.push_back(c.Id); + } + YQL_CLOG(INFO, CoreExecution) << "Node #" << nodeId << "<" << node->Content() << "> created " << ids.size() + << " trackable nodes: " << JoinSeq(", ", ids); + } + + TStatus CollectUnusedNodes(const TExprNode& root, TExprContext& ctx) { + if (TrackableNodes.empty()) { + return TStatus::Ok; + } + + YQL_CLOG(TRACE, CoreExecution) << "Collecting unused nodes on root #" << root.UniqueId(); + + THashSet<ui64> visited; + THashSet<TString> usedIds; + VisitExpr(root, [&](const TExprNode& node) { + if (node.GetState() == TExprNode::EState::ExecutionComplete) { + return false; + } + + auto nodeId = node.UniqueId(); + visited.insert(nodeId); + + TIntrusivePtr<IDataProvider> dataProvider; + auto providerIt = ProvidersCache.find(nodeId); + if (providerIt != ProvidersCache.end()) { + YQL_ENSURE(providerIt->second); + dataProvider = providerIt->second; + } else { + dataProvider = GetDataProvider(node); + if (dataProvider) { + ProvidersCache[nodeId] = dataProvider; + } + } + + if (dataProvider) { + TVector<TString> usedNodes; + dataProvider->GetTrackableNodeProcessor().GetUsedNodes(node, usedNodes); + usedIds.insert(usedNodes.begin(), usedNodes.end()); + } + + return true; + }); + + for (auto i = ProvidersCache.begin(); i != ProvidersCache.end();) { + if (visited.count(i->first) == 0) { + ProvidersCache.erase(i++); + } else { + ++i; + } + } + + THashMap<TIntrusivePtr<IDataProvider>, TExprNode::TListType> toCollect; + for (auto i = TrackableNodes.begin(); i != TrackableNodes.end();) { + TString id = i->first; + TTrackableNodeInfo info = i->second; + if (!usedIds.contains(id)) { + YQL_ENSURE(info.Node); + YQL_ENSURE(info.Provider); + YQL_CLOG(INFO, CoreExecution) << "Marking node " << id << " for collection"; + toCollect[info.Provider].push_back(info.Node); + TrackableNodes.erase(i++); + } else { + ++i; + } + } + + TStatus collectStatus = TStatus::Ok; + for (auto& i : toCollect) { + const auto& provider = i.first; + YQL_ENSURE(!i.second.empty()); + auto pos = i.second.front()->Pos(); + TExprNode::TPtr collectNode = ctx.NewList(pos, std::move(i.second)); + TExprNode::TPtr output; + TStatus status = provider->GetTrackableNodeProcessor().GetCleanupTransformer().Transform(collectNode, output, ctx); + YQL_ENSURE(status != TStatus::Repeat); + + collectStatus = collectStatus.Combine(status); + if (status == TStatus::Error) { + break; + } + + if (status == TStatus::Async) { + CollectingNodes[collectNode.Get()] = collectNode; + auto future = provider->GetTrackableNodeProcessor().GetCleanupTransformer().GetAsyncFuture(*collectNode); + SubscribeAsyncFuture(collectNode, provider.Get(), future); + } + } + + return collectStatus; + } + +private: + TTypeAnnotationContext& Types; + TOperationProgressWriter Writer; + const bool WithFinalize; + TStatePtr State; + TNodeOnNodeOwnedMap NewNodes; + TAutoPtr<IGraphTransformer> FinalizingTransformer; + THashMap<ui32, TOperationProgress> Progresses; + + struct TTrackableNodeInfo + { + TIntrusivePtr<IDataProvider> Provider; + TExprNode::TPtr Node; + }; + + THashMap<TString, TTrackableNodeInfo> TrackableNodes; + TNodeOnNodeOwnedMap CollectingNodes; + THashMap<ui64, TIntrusivePtr<IDataProvider>> ProvidersCache; + TExprNode::TListType FreshPendingNodes; + + bool DeterministicMode; + TNodeOnNodeOwnedMap AsyncNodes; +}; + +IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited); + +IGraphTransformer::TStatus ValidateList(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited) { + IGraphTransformer::TStatus combinedStatus = IGraphTransformer::TStatus::Ok; + for (ui32 i = 0; i < node->ChildrenSize(); ++i) { + combinedStatus = combinedStatus.Combine(ValidateExecution(node->ChildPtr(i), ctx, types, visited)); + } + + return combinedStatus; +} + +IGraphTransformer::TStatus ValidateCallable(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited) { + using TStatus = IGraphTransformer::TStatus; + + if (node->Content() == CommitName) { + auto datasink = types.DataSinkMap.FindPtr(node->Child(1)->Child(0)->Content()); + if (!datasink) { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown datasink: " << node->Child(1)->Child(0)->Content())); + return TStatus::Error; + } + + return ValidateExecution(node->ChildPtr(0), ctx, types, visited); + } + + if (node->Content() == SyncName) { + return ValidateList(node, ctx, types, visited); + } + + if (node->Content() == LeftName) { + return ValidateExecution(node->ChildPtr(0), ctx, types, visited); + } + + if (node->Content() == RightName) { + return ValidateExecution(node->ChildPtr(0), ctx, types, visited); + } + + IDataProvider* dataProvider = nullptr; + for (auto& x : types.DataSources) { + if (x->CanExecute(*node)) { + dataProvider = x.Get(); + break; + } + } + + if (!dataProvider) { + for (auto& x : types.DataSinks) { + if (x->CanExecute(*node)) { + dataProvider = x.Get(); + break; + } + } + } + + if (!dataProvider) { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute callable with name: " << node->Content() + << ", you possibly used cross provider/cluster operations or pulled not materialized result in refselect mode")); + return TStatus::Error; + } + + if (node->ChildrenSize() < 2) { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Executable callable should have at least 2 children")); + return TStatus::Error; + } + + if (!dataProvider->ValidateExecution(*node, ctx)) { + return TStatus::Error; + } + + TExprNode::TListType childrenToCheck; + dataProvider->GetRequiredChildren(*node, childrenToCheck); + IGraphTransformer::TStatus combinedStatus = IGraphTransformer::TStatus::Ok; + for (ui32 i = 0; i < childrenToCheck.size(); ++i) { + combinedStatus = combinedStatus.Combine(ValidateExecution(childrenToCheck[i], ctx, types, visited)); + } + + return combinedStatus; +} + +IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx, + const TTypeAnnotationContext& types, TNodeSet& visited) { + using TStatus = IGraphTransformer::TStatus; + if (node->GetState() == TExprNode::EState::ExecutionComplete) { + return TStatus::Ok; + } + + if (!node->GetTypeAnn()) { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Node has no type annotation")); + return TStatus::Error; + } + + TStatus status = TStatus::Ok; + switch (node->Type()) { + case TExprNode::Atom: + case TExprNode::Argument: + case TExprNode::Arguments: + case TExprNode::Lambda: + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute node with type: " << node->Type())); + return TStatus::Error; + + case TExprNode::List: + return ValidateList(node, ctx, types, visited); + + case TExprNode::Callable: + if (visited.cend() != visited.find(node.Get())) { + return TStatus::Ok; + } + + status = ValidateCallable(node, ctx, types, visited); + if (status.Level == TStatus::Ok) { + visited.insert(node.Get()); + } + + break; + + case TExprNode::World: + break; + + default: + YQL_ENSURE(false, "Unknown type"); + } + + return status; +} + +} + +TAutoPtr<IGraphTransformer> CreateExecutionTransformer( + TTypeAnnotationContext& types, + TOperationProgressWriter writer, + bool withFinalize) { + return new TExecutionTransformer(types, writer, withFinalize); +} + +TAutoPtr<IGraphTransformer> CreateCheckExecutionTransformer(const TTypeAnnotationContext& types, bool checkWorld) { + return CreateFunctorTransformer([&types, checkWorld](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) + -> IGraphTransformer::TStatus { + output = input; + if (checkWorld) { + TNodeSet visited; + auto status = ValidateExecution(input, ctx, types, visited); + if (status.Level != IGraphTransformer::TStatus::Ok) { + return status; + } + } + + TParentsMap parentsMap; + THashSet<TExprNode*> overWinNodes; + GatherParents(*input, parentsMap); + bool hasErrors = false; + THashSet<TIssue> added; + auto funcCheckExecution = [&](const THashSet<TStringBuf>& notAllowList, bool collectCalcOverWindow, const TExprNode::TPtr& node) { + if (node->IsCallable("ErrorType")) { + hasErrors = true; + const auto err = node->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TErrorExprType>()->GetError(); + if (added.insert(err).second) { + ctx.AddError(err); + } + } else if (node->IsCallable("TablePath") || node->IsCallable("TableRecord")) { + auto issue = TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << node->Content() << " will be empty"); + SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_FREE_TABLE_PATH_RECORD, issue); + if (!ctx.AddWarning(issue)) { + hasErrors = true; + } + } else if (node->IsCallable("UnsafeTimestampCast")) { + auto issue = TIssue(ctx.GetPosition(node->Pos()), "Unsafe conversion integral value to Timestamp, consider using date types"); + SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_CAST_INTEGRAL_TO_TIMESTAMP_UNSAFE, issue); + if (!ctx.AddWarning(issue)) { + hasErrors = true; + } + } else if (collectCalcOverWindow && node->IsCallable({"CalcOverWindow", "CalcOverSessionWindow", "CalcOverWindowGroup"})) { + overWinNodes.emplace(node.Get()); + return false; + } else if (node->IsCallable(notAllowList)) { + hasErrors = true; + const auto err = TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Can't execute " << node->Content()); + if (added.insert(err).second) { + ctx.AddError(err); + } + } else if (node->Type() != TExprNode::Lambda && + (node->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream || node->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Flow)) { + auto parentsIt = parentsMap.find(node.Get()); + if (parentsIt != parentsMap.end()) { + ui32 usageCount = 0; + for (auto& x : parentsIt->second) { + if (x->IsCallable("DependsOn")) { + continue; + } + + for (auto& y : x->Children()) { + if (y.Get() == node.Get()) { + ++usageCount; + } + } + } + + if (usageCount > 1 && !node->GetConstraint<TEmptyConstraintNode>()) { + hasErrors = true; + const auto err = TIssue(ctx.GetPosition(node->Pos()), "Multiple stream clients"); + if (added.insert(err).second) { + ctx.AddError(err); + } + } + } + } + + return true; + }; + static const THashSet<TStringBuf> noExecutionList = {"InstanceOf", "Lag", "Lead", "RowNumber", "Rank", "DenseRank", "PercentRank", "CumeDist", "NTile"}; + static const THashSet<TStringBuf> noExecutionListForCalcOverWindow = {"InstanceOf"}; + VisitExpr(input, [funcCheckExecution](const TExprNode::TPtr& node) { + bool collectCalcOverWindow = true; + return funcCheckExecution(noExecutionList, collectCalcOverWindow, node); + }); + for (auto overWin: overWinNodes) { + VisitExpr(overWin, [funcCheckExecution](const TExprNode::TPtr& node) { + bool collectCalcOverWindow = false; + return funcCheckExecution(noExecutionListForCalcOverWindow, collectCalcOverWindow, node); + }); + } + + return hasErrors ? IGraphTransformer::TStatus::Error : IGraphTransformer::TStatus::Ok; + }); +}; + +IGraphTransformer::TStatus RequireChild(const TExprNode& node, ui32 index) { + switch (node.Child(index)->GetState()) { + case TExprNode::EState::Error: + case TExprNode::EState::ExecutionComplete: + return IGraphTransformer::TStatus::Ok; + case TExprNode::EState::ExecutionInProgress: + case TExprNode::EState::ExecutionPending: + return IGraphTransformer::TStatus::Repeat; + default: + break; + } + + node.Child(index)->SetState(TExprNode::EState::ExecutionRequired); + return IGraphTransformer::TStatus::Repeat; +} + +} + +template<> +void Out<NYql::TOperationProgress::EState>(class IOutputStream &o, NYql::TOperationProgress::EState x) { +#define YQL_OPERATION_PROGRESS_STATE_MAP_TO_STRING_IMPL(name, ...) \ + case NYql::TOperationProgress::EState::name: \ + o << #name; \ + return; + + switch (x) { + YQL_OPERATION_PROGRESS_STATE_MAP(YQL_OPERATION_PROGRESS_STATE_MAP_TO_STRING_IMPL) + default: + o << static_cast<int>(x); + return; + } +} |