about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: udovichenko-r <udovichenko-r@yandex-team.ru> 2022-06-10 20:30:44 +0300
committer: udovichenko-r <udovichenko-r@yandex-team.ru> 2022-06-10 20:30:44 +0300
commit: 15ca532406441566b680acabb27207b68add52f1 (patch)
tree: 24f7c59fd4b7dfca8810c7e3506cc0a934c83cc9
parent: 4e59df6e51cde36d20e6710e9f1ff6d684737f47 (diff)
download: ydb-15ca532406441566b680acabb27207b68add52f1.tar.gz
[yql] Handle DqPrecompute in Result
YQL-12393 ref:a3040fde9907bf06ff96f46f6add98cf1c574b09
-rw-r--r-- ydb/library/yql/providers/dq/opt/dqs_opt.cpp 63
-rw-r--r-- ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp 76
2 files changed, 93 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
index 850d24944f..c171924714 100644
--- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
+++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
@@ -34,8 +34,7 @@
do { \
if (auto result = func(__VA_ARGS__); result.Raw() != node.Raw()) { \
YQL_CLOG(DEBUG, ProviderDq) << #func; \
- node = result; \
- return node.Ptr(); \
+ return result.Ptr(); \
} \
} while (0)
@@ -65,18 +64,12 @@ namespace NYql::NDqs {
}
THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext* typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry) {
- return CreateFunctorTransformer([typesCtx, funcRegistry](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
- TProcessedNodesSet ignoreNodes;
- VisitExpr(input, [&](const TExprNode::TPtr& node) {
- if (node != input && (TDqReadWrapBase::Match(node.Get()) || TDqPhyPrecompute::Match(node.Get()))) {
- ignoreNodes.insert(node->UniqueId());
- return false;
- }
- return true;
- });
-
+ return CreateFunctorTransformer([typesCtx, funcRegistry](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
TOptimizeExprSettings settings(typesCtx);
- settings.ProcessedNodes = &ignoreNodes;
+ settings.VisitChecker = [&](const TExprNode& node) {
+ return input.Get() == &node || (!TDqReadWrapBase::Match(&node) && !TDqPhyPrecompute::Match(&node));
+ };
+ settings.VisitStarted = true;
NKikimr::NMiniKQL::TScopedAlloc alloc;
NKikimr::NMiniKQL::TTypeEnvironment env(alloc);
@@ -84,7 +77,7 @@ namespace NYql::NDqs {
NKikimr::NMiniKQL::TMemoryUsageInfo memInfo("Precompute");
NKikimr::NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo);
- return OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
+ auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
if (TDqStageBase::Match(node.Get())) {
auto stage = TDqStageBase(node);
TNodeOnNodeOwnedMap replaces;
@@ -130,6 +123,48 @@ namespace NYql::NDqs {
}
return node;
}, ctx, settings);
+
+ if (status.Level != TStatus::Ok) {
+ return status;
+ }
+
+ auto precomputes = FindNodes(output,
+ [](const TExprNode::TPtr& node) {
+ return !TDqReadWrapBase::Match(node.Get());
+ },
+ [] (const TExprNode::TPtr& node) {
+ return TDqPhyPrecompute::Match(node.Get()) && node->HasResult();
+ }
+ );
+
+ if (!precomputes.empty()) {
+ TNodeOnNodeOwnedMap replaces;
+ for (auto node: precomputes) {
+ auto yson = node->GetResult().Content();
+ auto dataNode = NYT::NodeFromYsonString(yson);
+ YQL_ENSURE(dataNode.IsList() && !dataNode.AsList().empty());
+ dataNode = dataNode[0];
+ TStringStream err;
+ NKikimr::NMiniKQL::TType* mkqlType = NCommon::BuildType(*node->GetTypeAnn(), pgmBuilder, err);
+ if (!mkqlType) {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to process " << TDqPhyPrecompute::CallableName() << " type: " << err.Str()));
+ return TStatus::Error;
+ }
+
+ auto value = NCommon::ParseYsonNodeInResultFormat(holderFactory, dataNode, mkqlType, &err);
+ if (!value) {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to parse " << TDqPhyPrecompute::CallableName() << " value: " << err.Str()));
+ return TStatus::Error;
+ }
+ replaces[node.Get()] = NCommon::ValueToExprLiteral(node->GetTypeAnn(), *value, ctx, node->Pos());
+ }
+ TOptimizeExprSettings settings(typesCtx);
+ settings.VisitStarted = true;
+ YQL_CLOG(DEBUG, ProviderDq) << "DqsReplacePrecomputes";
+ return RemapExpr(output, output, replaces, ctx, settings);
+ }
+
+ return TStatus::Ok;
});
}
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index a703de40cc..63b633b648 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -285,7 +285,7 @@ private:
}
}
- TExprNode::TPtr GetLambdaBody(int& level, TExprNode::TPtr&& node, TExprContext& ctx) const {
+ TExprNode::TPtr WrapLambdaBody(int& level, TExprNode::TPtr node, TExprContext& ctx) const {
const auto kind = node->GetTypeAnn()->GetKind();
const bool data = kind != ETypeAnnotationKind::Flow && kind != ETypeAnnotationKind::List && kind != ETypeAnnotationKind::Stream && kind != ETypeAnnotationKind::Optional;
level = data ? 1 : 0;
@@ -517,31 +517,21 @@ private:
TStatusCallbackPair GetLambda(
TString* lambda,
bool* untrustedUdfFlag,
- int* level,
TUploadList* uploadList,
- const TResult& result, TExprContext& ctx,
+ const TExprNode::TPtr& resInput, TExprContext& ctx,
bool hasGraphParams,
bool enableLocalRun) const
{
- auto input = Build<TDqPhyStage>(ctx, result.Pos())
+ auto input = Build<TDqPhyStage>(ctx, resInput->Pos())
.Inputs()
.Build()
.Program<TCoLambda>()
.Args({})
- .Body(GetLambdaBody(*level, result.Input().Ptr(), ctx))
+ .Body(resInput)
.Build()
.Settings().Build()
.Done().Ptr();
- {
- auto block = MeasureBlock("PeepHole");
-
- bool hasNonDeterministicFunctions = false;
- if (const auto status = PeepHoleOptimizeNode<true>(input, input, ctx, *State->TypeCtx, nullptr, hasNonDeterministicFunctions); status.Level != TStatus::Ok) {
- return SyncStatus(status);
- }
- }
-
// copy-paste {
TUserDataTable crutches = State->TypeCtx->UserDataStorageCrutches;
TUserDataTable files;
@@ -675,6 +665,22 @@ private:
try {
auto result = TMaybeNode<TResult>(input).Cast();
+
+ auto precomputes = FindIndependentPrecomputes(result.Input().Ptr());
+ if (!precomputes.empty()) {
+ auto status = HandlePrecomputes(precomputes, ctx);
+ if (status.Level != TStatus::Ok) {
+ if (status == TStatus::Async) {
+ return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
+ completedFuture.GetValue();
+ return HandlePrecomputeAsyncComplete(execState);
+ }));
+ } else {
+ return SyncStatus(status);
+ }
+ }
+ }
+
IDataProvider::TFillSettings fillSettings = NCommon::GetFillSettings(result.Ref());
auto settings = State->Settings->WithFillSettings(fillSettings);
if (!settings->_RowsLimitPerWrite.Get() && !settings->_AllResultsBytesLimit.Get()) {
@@ -690,12 +696,6 @@ private:
TString type;
TVector<TString> columns;
GetResultType(&type, &columns, result.Ref(), result.Input().Ref());
- TString lambda;
- bool untrustedUdfFlag;
- int level;
- TUploadList uploadList;
-
- bool enableLocalRun = true;
TPublicIds::TPtr publicIds = std::make_shared<TPublicIds>();
VisitExpr(result.Ptr(), [&](const TExprNode::TPtr& node) {
@@ -720,9 +720,24 @@ private:
&& integration->PrepareFullResultTableParams(result.Ref(), ctx, graphParams, secureParams);
settings->EnableFullResultWrite = enableFullResultWrite;
}
+
+ int level;
+ TExprNode::TPtr resInput = WrapLambdaBody(level, result.Input().Ptr(), ctx);
+ {
+ auto block = MeasureBlock("PeepHole");
+ if (const auto status = PeepHole(resInput, resInput, ctx); status.Level != TStatus::Ok) {
+ return SyncStatus(status);
+ }
+ }
+
+ TString lambda;
+ bool untrustedUdfFlag;
+ TUploadList uploadList;
+
+ bool enableLocalRun = true;
+
NThreading::TFuture<IDqGateway::TResult> future;
bool localRun = false;
-
// try to prepare lambda with localRun 'on' and 'off'
for (int i = 0; i < 2 && !future.Initialized(); i++) {
uploadList.clear();
@@ -730,9 +745,8 @@ private:
auto lambdaResult = GetLambda(
&lambda,
&untrustedUdfFlag,
- &level,
&uploadList,
- result,
+ resInput,
ctx,
hasGraphParams,
enableLocalRun);
@@ -1361,15 +1375,13 @@ private:
if (TDqStageBase::Match(node.Get())) {
auto stage = TDqStageBase(node);
for (const auto& input : stage.Inputs()) {
- if (auto maybePrecompute = input.Maybe<TDqPhyPrecompute>()) {
- if (!input.Ref().HasResult() && input.Ref().GetState() != TExprNode::EState::Error) {
- hasPrecompute = true;
- if (input.Ref().StartsExecution() || !FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes)) {
- precomputes[input.Raw()] = input.Ptr();
- }
- }
- } else {
- hasPrecompute = FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes) || hasPrecompute;
+ hasPrecompute = FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes) || hasPrecompute;
+ }
+ } else if (TDqPhyPrecompute::Match(node.Get())) {
+ if (!node->HasResult() && node->GetState() != TExprNode::EState::Error) {
+ hasPrecompute = true;
+ if (node->StartsExecution() || !FindIndependentPrecomputesImpl(node->HeadPtr(), precomputes, visitedNodes)) {
+ precomputes[node.Get()] = node;
}
}
} else {