diff options
author | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-06-10 20:30:44 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-06-10 20:30:44 +0300 |
commit | 15ca532406441566b680acabb27207b68add52f1 (patch) | |
tree | 24f7c59fd4b7dfca8810c7e3506cc0a934c83cc9 | |
parent | 4e59df6e51cde36d20e6710e9f1ff6d684737f47 (diff) | |
download | ydb-15ca532406441566b680acabb27207b68add52f1.tar.gz |
[yql] Handle DqPrecompute in Result
YQL-12393
ref:a3040fde9907bf06ff96f46f6add98cf1c574b09
-rw-r--r-- | ydb/library/yql/providers/dq/opt/dqs_opt.cpp | 63 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 76 |
2 files changed, 93 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp index 850d24944f..c171924714 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp @@ -34,8 +34,7 @@ do { \ if (auto result = func(__VA_ARGS__); result.Raw() != node.Raw()) { \ YQL_CLOG(DEBUG, ProviderDq) << #func; \ - node = result; \ - return node.Ptr(); \ + return result.Ptr(); \ } \ } while (0) @@ -65,18 +64,12 @@ namespace NYql::NDqs { } THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext* typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry) { - return CreateFunctorTransformer([typesCtx, funcRegistry](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - TProcessedNodesSet ignoreNodes; - VisitExpr(input, [&](const TExprNode::TPtr& node) { - if (node != input && (TDqReadWrapBase::Match(node.Get()) || TDqPhyPrecompute::Match(node.Get()))) { - ignoreNodes.insert(node->UniqueId()); - return false; - } - return true; - }); - + return CreateFunctorTransformer([typesCtx, funcRegistry](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus { TOptimizeExprSettings settings(typesCtx); - settings.ProcessedNodes = &ignoreNodes; + settings.VisitChecker = [&](const TExprNode& node) { + return input.Get() == &node || (!TDqReadWrapBase::Match(&node) && !TDqPhyPrecompute::Match(&node)); + }; + settings.VisitStarted = true; NKikimr::NMiniKQL::TScopedAlloc alloc; NKikimr::NMiniKQL::TTypeEnvironment env(alloc); @@ -84,7 +77,7 @@ namespace NYql::NDqs { NKikimr::NMiniKQL::TMemoryUsageInfo memInfo("Precompute"); NKikimr::NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo); - return OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { + auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { if (TDqStageBase::Match(node.Get())) { auto stage = TDqStageBase(node); TNodeOnNodeOwnedMap replaces; @@ -130,6 +123,48 @@ namespace NYql::NDqs { } return node; }, ctx, settings); + + if (status.Level != TStatus::Ok) { + return status; + } + + auto precomputes = FindNodes(output, + [](const TExprNode::TPtr& node) { + return !TDqReadWrapBase::Match(node.Get()); + }, + [] (const TExprNode::TPtr& node) { + return TDqPhyPrecompute::Match(node.Get()) && node->HasResult(); + } + ); + + if (!precomputes.empty()) { + TNodeOnNodeOwnedMap replaces; + for (auto node: precomputes) { + auto yson = node->GetResult().Content(); + auto dataNode = NYT::NodeFromYsonString(yson); + YQL_ENSURE(dataNode.IsList() && !dataNode.AsList().empty()); + dataNode = dataNode[0]; + TStringStream err; + NKikimr::NMiniKQL::TType* mkqlType = NCommon::BuildType(*node->GetTypeAnn(), pgmBuilder, err); + if (!mkqlType) { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to process " << TDqPhyPrecompute::CallableName() << " type: " << err.Str())); + return TStatus::Error; + } + + auto value = NCommon::ParseYsonNodeInResultFormat(holderFactory, dataNode, mkqlType, &err); + if (!value) { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to parse " << TDqPhyPrecompute::CallableName() << " value: " << err.Str())); + return TStatus::Error; + } + replaces[node.Get()] = NCommon::ValueToExprLiteral(node->GetTypeAnn(), *value, ctx, node->Pos()); + } + TOptimizeExprSettings settings(typesCtx); + settings.VisitStarted = true; + YQL_CLOG(DEBUG, ProviderDq) << "DqsReplacePrecomputes"; + return RemapExpr(output, output, replaces, ctx, settings); + } + + return TStatus::Ok; }); } diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index a703de40cc..63b633b648 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -285,7 +285,7 @@ private: } } - TExprNode::TPtr GetLambdaBody(int& level, TExprNode::TPtr&& node, TExprContext& ctx) const { + TExprNode::TPtr WrapLambdaBody(int& level, TExprNode::TPtr node, TExprContext& ctx) const { const auto kind = node->GetTypeAnn()->GetKind(); const bool data = kind != ETypeAnnotationKind::Flow && kind != ETypeAnnotationKind::List && kind != ETypeAnnotationKind::Stream && kind != ETypeAnnotationKind::Optional; level = data ? 1 : 0; @@ -517,31 +517,21 @@ private: TStatusCallbackPair GetLambda( TString* lambda, bool* untrustedUdfFlag, - int* level, TUploadList* uploadList, - const TResult& result, TExprContext& ctx, + const TExprNode::TPtr& resInput, TExprContext& ctx, bool hasGraphParams, bool enableLocalRun) const { - auto input = Build<TDqPhyStage>(ctx, result.Pos()) + auto input = Build<TDqPhyStage>(ctx, resInput->Pos()) .Inputs() .Build() .Program<TCoLambda>() .Args({}) - .Body(GetLambdaBody(*level, result.Input().Ptr(), ctx)) + .Body(resInput) .Build() .Settings().Build() .Done().Ptr(); - { - auto block = MeasureBlock("PeepHole"); - - bool hasNonDeterministicFunctions = false; - if (const auto status = PeepHoleOptimizeNode<true>(input, input, ctx, *State->TypeCtx, nullptr, hasNonDeterministicFunctions); status.Level != TStatus::Ok) { - return SyncStatus(status); - } - } - // copy-paste { TUserDataTable crutches = State->TypeCtx->UserDataStorageCrutches; TUserDataTable files; @@ -675,6 +665,22 @@ private: try { auto result = TMaybeNode<TResult>(input).Cast(); + + auto precomputes = FindIndependentPrecomputes(result.Input().Ptr()); + if (!precomputes.empty()) { + auto status = HandlePrecomputes(precomputes, ctx); + if (status.Level != TStatus::Ok) { + if (status == TStatus::Async) { + return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) { + completedFuture.GetValue(); + return HandlePrecomputeAsyncComplete(execState); + })); + } else { + return SyncStatus(status); + } + } + } + IDataProvider::TFillSettings fillSettings = NCommon::GetFillSettings(result.Ref()); auto settings = State->Settings->WithFillSettings(fillSettings); if (!settings->_RowsLimitPerWrite.Get() && !settings->_AllResultsBytesLimit.Get()) { @@ -690,12 +696,6 @@ private: TString type; TVector<TString> columns; GetResultType(&type, &columns, result.Ref(), result.Input().Ref()); - TString lambda; - bool untrustedUdfFlag; - int level; - TUploadList uploadList; - - bool enableLocalRun = true; TPublicIds::TPtr publicIds = std::make_shared<TPublicIds>(); VisitExpr(result.Ptr(), [&](const TExprNode::TPtr& node) { @@ -720,9 +720,24 @@ private: && integration->PrepareFullResultTableParams(result.Ref(), ctx, graphParams, secureParams); settings->EnableFullResultWrite = enableFullResultWrite; } + + int level; + TExprNode::TPtr resInput = WrapLambdaBody(level, result.Input().Ptr(), ctx); + { + auto block = MeasureBlock("PeepHole"); + if (const auto status = PeepHole(resInput, resInput, ctx); status.Level != TStatus::Ok) { + return SyncStatus(status); + } + } + + TString lambda; + bool untrustedUdfFlag; + TUploadList uploadList; + + bool enableLocalRun = true; + NThreading::TFuture<IDqGateway::TResult> future; bool localRun = false; - // try to prepare lambda with localRun 'on' and 'off' for (int i = 0; i < 2 && !future.Initialized(); i++) { uploadList.clear(); @@ -730,9 +745,8 @@ private: auto lambdaResult = GetLambda( &lambda, &untrustedUdfFlag, - &level, &uploadList, - result, + resInput, ctx, hasGraphParams, enableLocalRun); @@ -1361,15 +1375,13 @@ private: if (TDqStageBase::Match(node.Get())) { auto stage = TDqStageBase(node); for (const auto& input : stage.Inputs()) { - if (auto maybePrecompute = input.Maybe<TDqPhyPrecompute>()) { - if (!input.Ref().HasResult() && input.Ref().GetState() != TExprNode::EState::Error) { - hasPrecompute = true; - if (input.Ref().StartsExecution() || !FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes)) { - precomputes[input.Raw()] = input.Ptr(); - } - } - } else { - hasPrecompute = FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes) || hasPrecompute; + hasPrecompute = FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes) || hasPrecompute; + } + } else if (TDqPhyPrecompute::Match(node.Get())) { + if (!node->HasResult() && node->GetState() != TExprNode::EState::Error) { + hasPrecompute = true; + if (node->StartsExecution() || !FindIndependentPrecomputesImpl(node->HeadPtr(), precomputes, visitedNodes)) { + precomputes[node.Get()] = node; } } } else { |