diff options
| author | lucius <[email protected]> | 2025-01-20 15:08:43 +0300 |
|---|---|---|
| committer | lucius <[email protected]> | 2025-01-20 15:37:42 +0300 |
| commit | f7afbbd9f19b8800552a7b50e6e9d0f7629ebfff (patch) | |
| tree | 02615b59a5506886b00f4f58068f37e00dabc4a6 | |
| parent | c098dc75e35577bb93598b39c550f53387919ef6 (diff) | |
YQL-19382 pushdown filters to YT QL
commit_hash:ce6e236740c1a960e02ea431aa677fe89e91b7a8
39 files changed, 717 insertions, 18 deletions
diff --git a/yt/yql/providers/yt/common/yql_configuration.h b/yt/yql/providers/yt/common/yql_configuration.h index 9c6897c0665..f84ca7ef60b 100644 --- a/yt/yql/providers/yt/common/yql_configuration.h +++ b/yt/yql/providers/yt/common/yql_configuration.h @@ -112,4 +112,7 @@ constexpr bool DEFAULT_DISABLE_FUSE_OPERATIONS = false; constexpr bool DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS = false; +constexpr bool DEFAULT_USE_QL_FILTER = false; +constexpr bool DEFAULT_PRUNE_QL_FILTER_LAMBDA = true; + } // NYql diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp index edf95828850..21f23aa697c 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.cpp +++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp @@ -444,6 +444,8 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx) REGISTER_SETTING(*this, UseNewPredicateExtraction); REGISTER_SETTING(*this, PruneKeyFilterLambda); REGISTER_SETTING(*this, DqPruneKeyFilterLambda); + REGISTER_SETTING(*this, UseQLFilter); + REGISTER_SETTING(*this, PruneQLFilterLambda); REGISTER_SETTING(*this, MergeAdjacentPointRanges); REGISTER_SETTING(*this, KeyFilterForStartsWith); REGISTER_SETTING(*this, MaxKeyRangeCount).Upper(10000); diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h index 1163f9ccbb7..2667dfa36f4 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.h +++ b/yt/yql/providers/yt/common/yql_yt_settings.h @@ -274,6 +274,8 @@ struct TYtSettings { NCommon::TConfSetting<bool, false> UseNewPredicateExtraction; NCommon::TConfSetting<bool, false> PruneKeyFilterLambda; NCommon::TConfSetting<bool, false> DqPruneKeyFilterLambda; + NCommon::TConfSetting<bool, false> UseQLFilter; + NCommon::TConfSetting<bool, false> PruneQLFilterLambda; NCommon::TConfSetting<bool, false> MergeAdjacentPointRanges; NCommon::TConfSetting<bool, false> KeyFilterForStartsWith; NCommon::TConfSetting<ui64, false> MaxKeyRangeCount; diff --git a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json index d6508aef6c8..d7ff47f5441 100644 --- a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json +++ b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json @@ -492,6 +492,15 @@ {"Index": 1, "Name": "UserStateType", "Type": "TExprBase"}, {"Index": 2, "Name": "ProcessStateKey", "Type": "TCoAtom"} ] + }, + { + "Name": "TYtQLFilter", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "YtQLFilter"}, + "Children": [ + {"Index": 0, "Name": "RowType", "Type": "TExprBase"}, + {"Index": 1, "Name": "Predicate", "Type": "TCoLambda"} + ] } ] } diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp index beb330b74be..f2043767d11 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp @@ -357,6 +357,29 @@ TRuntimeNode ApplyPathRangesAndSampling(TRuntimeNode inputList, TType* itemType, return inputList; } +TRuntimeNode ApplyQLFilter(TRuntimeNode inputList, const TYtTransientOpBase& ytOp, NCommon::TMkqlBuildContext& ctx) { + if (!ytOp.Maybe<TYtMap>() && !ytOp.Maybe<TYtMapReduce>() && !ytOp.Maybe<TYtMerge>()) { + return inputList; + } + + const auto qlFilterSetting = NYql::GetSetting(ytOp.Settings().Ref(), EYtSettingType::QLFilter); + if (!qlFilterSetting) { + return inputList; + } + + const auto qlFilterNode = qlFilterSetting->Child(1); + YQL_ENSURE(qlFilterNode && qlFilterNode->IsCallable("YtQLFilter")); + const TYtQLFilter qlFilter(qlFilterNode); + const auto arg = qlFilter.Predicate().Args().Arg(0).Raw(); + const auto body = qlFilter.Predicate().Body().Raw(); + const auto lambdaId = qlFilter.Predicate().Ref().UniqueId(); + + return ctx.ProgramBuilder.OrderedFilter(inputList, [&] (TRuntimeNode item) -> TRuntimeNode { + NCommon::TMkqlBuildContext innerCtx(ctx, {{arg, item}}, lambdaId); + return NCommon::MkqlBuildExpr(*body, innerCtx); + }); +} + TRuntimeNode ToList(TRuntimeNode list, NCommon::TMkqlBuildContext& ctx) { const auto listType = list.GetStaticType(); if (listType->IsOptional()) { @@ -598,6 +621,7 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { ytOp.DataSink().Cluster().Value(), ytOp.Input().Ref(), ctx, THashSet<TString>{"num", "index"}, forceKeyColumns); values = ApplyPathRangesAndSampling(values, mkqlInputType, ytOp.Input().Ref(), ctx); + values = ApplyQLFilter(values, ytOp, ctx); if ((ytOp.Maybe<TYtMerge>() && outTableInfo.RowSpec->IsSorted() && ytOp.Input().Item(0).Paths().Size() > 1) || ytOp.Maybe<TYtSort>()) @@ -627,6 +651,7 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { values = arg->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Flow ? ctx.ProgramBuilder.ToFlow(values) : ctx.ProgramBuilder.Iterator(values, {}); values = ApplyPathRangesAndSampling(values, itemType, ytMap.Input().Ref(), ctx); + values = ApplyQLFilter(values, ytMap, ctx); auto& lambdaInputType = GetSeqItemType(*ytMap.Mapper().Args().Arg(0).Ref().GetTypeAnn()); auto& lambdaOutputType = GetSeqItemType(*ytMap.Mapper().Body().Ref().GetTypeAnn()); @@ -826,6 +851,7 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { THashSet<TString>{"num", "index"}, forceKeyColumns); values = ApplyPathRangesAndSampling(values, itemType, ytMapReduce.Input().Ref(), ctx); + values = ApplyQLFilter(values, ytMapReduce, ctx); const auto outputItemType = BuildOutputType(ytMapReduce.Output(), ctx); const size_t outputsCount = ytMapReduce.Output().Ref().ChildrenSize(); diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index d8038fb20b8..16dac196f4e 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -213,6 +213,220 @@ TString DebugPath(NYT::TRichYPath path) { return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text) + " (" + std::to_string(numColumns) + " columns)"; } +void GetIntegerConstraints(const TExprNode::TPtr& column, bool& isSigned, ui64& minValueAbs, ui64& maxValueAbs) { + EDataSlot toType = column->GetTypeAnn()->Cast<TDataExprType>()->GetSlot(); + + // AllowIntegralConversion (may consider some refactoring) + if (toType == EDataSlot::Uint8) { + isSigned = false; + minValueAbs = 0; + maxValueAbs = Max<ui8>(); + } + else if (toType == EDataSlot::Uint16) { + isSigned = false; + minValueAbs = 0; + maxValueAbs = Max<ui16>(); + } + else if (toType == EDataSlot::Uint32) { + isSigned = false; + minValueAbs = 0; + maxValueAbs = Max<ui32>(); + } + else if (toType == EDataSlot::Uint64) { + isSigned = false; + minValueAbs = 0; + maxValueAbs = Max<ui64>(); + } + else if (toType == EDataSlot::Int8) { + isSigned = true; + minValueAbs = (ui64)Max<i8>() + 1; + maxValueAbs = (ui64)Max<i8>(); + } + else if (toType == EDataSlot::Int16) { + isSigned = true; + minValueAbs = (ui64)Max<i16>() + 1; + maxValueAbs = (ui64)Max<i16>(); + } + else if (toType == EDataSlot::Int32) { + isSigned = true; + minValueAbs = (ui64)Max<i32>() + 1; + maxValueAbs = (ui64)Max<i32>(); + } + else if (toType == EDataSlot::Int64) { + isSigned = true; + minValueAbs = (ui64)Max<i64>() + 1; + maxValueAbs = (ui64)Max<i64>(); + } else { + YQL_ENSURE(false, "unexpected integer node type"); + } +} + +void QuoteColumnForQL(const TStringBuf columnName, TStringBuilder& result) { + result << '`'; + if (!columnName.Contains('`')) { + result << columnName; + } else { + for (const auto c : columnName) { + if (c == '`') { + result << "\\`"; + } else { + result << c; + } + } + } + result << '`'; +} + +void GenerateInputQueryIntegerComparison(const TStringBuf& opName, const TExprNode::TPtr& intColumn, const TExprNode::TPtr& intValue, TStringBuilder& result) { + bool columnsIsSigned; + ui64 minValueAbs; + ui64 maxValueAbs; + GetIntegerConstraints(intColumn, columnsIsSigned, minValueAbs, maxValueAbs); + + const auto maybeInt = TMaybeNode<TCoIntegralCtor>(intValue); + YQL_ENSURE(maybeInt); + bool hasSign; + bool isSigned; + ui64 valueAbs; + ExtractIntegralValue(maybeInt.Ref(), false, hasSign, isSigned, valueAbs); + + if (!hasSign && valueAbs > maxValueAbs) { + // value is greater than maximum + if (opName == ">" || opName == ">=" || opName == "==") { + result << "FALSE"; + } else { + result << "TRUE"; + } + } else if (hasSign && valueAbs > minValueAbs) { + // value is less than minimum + if (opName == "<" || opName == "<=" || opName == "==") { + result << "FALSE"; + } else { + result << "TRUE"; + } + } else { + // value is in the range + const auto columnName = intColumn->ChildPtr(1)->Content(); + const auto valueStr = maybeInt.Cast().Literal().Value(); + QuoteColumnForQL(columnName, result); + result << " " << opName << " " << valueStr; + } +} + +void GenerateInputQueryComparison(const TCoCompare& op, TStringBuilder& result) { + YQL_ENSURE(op.Ref().IsCallable({"<", "<=", ">", ">=", "==", "!="})); + const auto left = op.Left().Ptr(); + const auto right = op.Right().Ptr(); + + if (left->IsCallable("Member")) { + GenerateInputQueryIntegerComparison(op.CallableName(), left, right, result); + } else { + YQL_ENSURE(right->IsCallable("Member")); + auto invertedOp = op.CallableName(); + if (invertedOp == "<") { + invertedOp = ">"; + } else if (invertedOp == "<=") { + invertedOp = ">="; + } else if (invertedOp == ">") { + invertedOp = "<"; + } else if (invertedOp == ">=") { + invertedOp = "<="; + } + GenerateInputQueryIntegerComparison(invertedOp, right, left, result); + } +} + +void GenerateInputQueryWhereExpression(const TExprNode::TPtr& node, TStringBuilder& result) { + if (const auto maybeCompare = TMaybeNode<TCoCompare>(node)) { + GenerateInputQueryComparison(maybeCompare.Cast(), result); + } else if (node->IsCallable("Not")) { + result << "NOT ("; + GenerateInputQueryWhereExpression(node->ChildPtr(0), result); + result << ")"; + } else if (node->IsCallable({"And", "Or"})) { + const TStringBuf op = node->IsCallable("And") ? "AND" : "OR"; + + result << "("; + GenerateInputQueryWhereExpression(node->Child(0), result); + result << ")"; + + const auto size = node->ChildrenSize(); + for (TExprNode::TListType::size_type i = 1U; i < size; ++i) { + result << " " << op << " ("; + GenerateInputQueryWhereExpression(node->Child(i), result); + result << ")"; + }; + } else { + YQL_ENSURE(false, "unexpected node type"); + } +} + +TString GenerateInputQuery(const TExprNode& settings, const TVector<TInputInfo>& inputs) { + auto qlFilterNode = NYql::GetSetting(settings, EYtSettingType::QLFilter); + if (!qlFilterNode) { + return ""; + } + + // TODO: how to handle multiple inputs? + YQL_ENSURE(inputs.size() == 1, "YtQLFilter: multiple inputs are not supported"); + + qlFilterNode = qlFilterNode->ChildPtr(1); + YQL_ENSURE(qlFilterNode && qlFilterNode->IsCallable("YtQLFilter")); + const TYtQLFilter qlFilter(qlFilterNode); + const auto predicate = qlFilter.Predicate().Body(); + + TStringBuilder result; + bool foundSomeColumns = false; + for (const auto& table: inputs) { + YQL_ENSURE(table.Path.Columns_ && table.Path.Columns_->Parts_, "YtQLFilter: TRichYPath should have columns"); + for (const auto& column: table.Path.Columns_->Parts_) { + if (foundSomeColumns) { + result << ", "; + } + QuoteColumnForQL(column, result); + foundSomeColumns = true; + } + } + if (!foundSomeColumns) { + result << "*"; + } + result << " WHERE "; + GenerateInputQueryWhereExpression(predicate.Ptr(), result); + YQL_CLOG(INFO, ProviderYt) << __FUNCTION__ << ": " << result; + return result; +} + +void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery) { + spec["input_query"] = inputQuery; + spec["input_query_filter_options"]["enable_chunk_filter"] = true; + spec["input_query_filter_options"]["enable_row_filter"] = true; +} + +void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& inputQuery) { + // YQL-19382 + if (inputQuery) { + for (auto& path : paths) { + path.Columns_.Clear(); + } + SetInputQuerySpec(spec, inputQuery); + } +} + +template <typename T> +void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& inputQuery, const bool useSkiff) { + // YQL-19382 + if (inputQuery) { + YQL_ENSURE(!useSkiff, "QLFilter can't work with skiff on map/mapreduce right now, try with PRAGMA yt.UseSkiff='false'"); + const auto& inputs = specWithPaths.GetInputs(); + for (size_t i = 0; i < inputs.size(); ++i) { + auto path = inputs[i]; + path.Columns_.Clear(); + specWithPaths.SetInput(i, path); + } + SetInputQuerySpec(spec, inputQuery); + } +} + } // unnamed /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -3378,9 +3592,10 @@ private: bool forceTransform = NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform); bool combineChunks = NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::CombineChunks); TMaybe<ui64> limit = GetLimit(merge.Settings().Ref()); + const TString inputQuery = GenerateInputQuery(merge.Settings().Ref(), execCtx->InputTables_); - return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, execCtx]() { - return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, execCtx] (const auto& f) { + return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, inputQuery, execCtx]() { + return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, inputQuery, execCtx] (const auto& f) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); auto entry = execCtx->GetEntry(); bool cacheHit = f.GetValue(); @@ -3425,6 +3640,8 @@ private: spec["schema_inference_mode"] = "from_output"; // YTADMINREQ-17692 } + PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQuery); + return execCtx->RunOperation([entry, mergeOpSpec = std::move(mergeOpSpec), spec = std::move(spec)](){ return entry->Tx->Merge(mergeOpSpec, TOperationOptions().StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec)); }); @@ -3442,12 +3659,13 @@ private: TString mapLambda, const TString& inputType, const TExpressionResorceUsage& extraUsage, + const TString& inputQuery, const TExecContext<TRunOptions>::TPtr& execCtx ) { const bool testRun = execCtx->Config_->GetLocalChainTest(); TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync(); return ret.Apply([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, - inputType, extraUsage, execCtx, testRun] (const auto& f) mutable + inputType, extraUsage, inputQuery, execCtx, testRun] (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); TTransactionCache::TEntry::TPtr entry; @@ -3610,6 +3828,8 @@ private: spec["job_count"] = static_cast<i64>(*jobCount); } + PrepareInputQueryForMap(spec, mapOpSpec, inputQuery, useSkiff); + TOperationOptions opOpts; FillOperationOptions(opOpts, execCtx, entry); opOpts.StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec); @@ -3647,11 +3867,12 @@ private: } auto extraUsage = execCtx->ScanExtraResourceUsage(map.Mapper().Body().Ref(), true); TString inputType = NCommon::WriteTypeToYson(GetSequenceItemType(map.Input().Size() == 1U ? TExprBase(map.Input().Item(0)) : TExprBase(map.Mapper().Args().Arg(0)), true)); + const TString inputQuery = GenerateInputQuery(map.Settings().Ref(), execCtx->InputTables_); - return execCtx->Session_->Queue_->Async([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, execCtx]() { + return execCtx->Session_->Queue_->Async([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQuery, execCtx]() { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); execCtx->MakeUserFiles(); - return ExecMap(ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, execCtx); + return ExecMap(ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQuery, execCtx); }); } @@ -3879,13 +4100,14 @@ private: NYT::TNode intermediateMeta, const NYT::TNode& intermediateSchema, const NYT::TNode& intermediateStreams, + const TString& inputQuery, const TExecContext<TRunOptions>::TPtr& execCtx ) { const bool testRun = execCtx->Config_->GetLocalChainTest(); TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync(); return ret.Apply([reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage, reduceLambda, reduceInputType, reduceExtraUsage, - intermediateMeta, intermediateSchema, intermediateStreams, execCtx, testRun] + intermediateMeta, intermediateSchema, intermediateStreams, inputQuery, execCtx, testRun] (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); @@ -4103,6 +4325,8 @@ private: spec["mapper"]["output_streams"] = intermediateStreams; } + PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQuery, useSkiff); + TOperationOptions opOpts; FillOperationOptions(opOpts, execCtx, entry); opOpts.StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec); @@ -4124,12 +4348,13 @@ private: const TExpressionResorceUsage& reduceExtraUsage, const NYT::TNode& intermediateSchema, bool useIntermediateStreams, + const TString& inputQuery, const TExecContext<TRunOptions>::TPtr& execCtx ) { const bool testRun = execCtx->Config_->GetLocalChainTest(); TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync(); return ret.Apply([reduceBy, sortBy, limit, sortLimitBy, reduceLambda, reduceInputType, - reduceExtraUsage, intermediateSchema, useIntermediateStreams, execCtx, testRun] + reduceExtraUsage, intermediateSchema, useIntermediateStreams, inputQuery, execCtx, testRun] (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); @@ -4247,6 +4472,8 @@ private: spec["reducer"]["enable_input_table_index"] = true; } + PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQuery, useSkiff); + TOperationOptions opOpts; FillOperationOptions(opOpts, execCtx, entry); opOpts.StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec); @@ -4368,17 +4595,19 @@ private: limit.Clear(); } + const TString inputQuery = GenerateInputQuery(mapReduce.Settings().Ref(), execCtx->InputTables_); + return execCtx->Session_->Queue_->Async([reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage, - reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, useIntermediateStreams, execCtx]() + reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, useIntermediateStreams, inputQuery, execCtx]() { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); execCtx->MakeUserFiles(); if (mapLambda) { return ExecMapReduce(reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage, - reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, execCtx); + reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, inputQuery, execCtx); } else { return ExecMapReduce(reduceBy, sortBy, limit, sortLimitBy, reduceLambda, reduceInputType, reduceExtraUsage, intermediateSchema, - useIntermediateStreams, execCtx); + useIntermediateStreams, inputQuery, execCtx); } }); } diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp index 1107e0210dc..8db9ef743b3 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp @@ -81,6 +81,11 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T AddHandler(1, &TYtReduce::Match, HNDL(FuseReduceWithTrivialMap)); } + if (State_->Configuration->UseQLFilter.Get().GetOrElse(DEFAULT_USE_QL_FILTER)) { + // best to run after Fuse*Map and before MapToMerge + AddHandler(2, Names({TYtMap::CallableName()}), HNDL(ExtractQLFilters)); + AddHandler(2, Names({TYtQLFilter::CallableName()}), HNDL(OptimizeQLFilterType)); + } AddHandler(2, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin)); AddHandler(2, &TStatWriteTable::Match, HNDL(ReplaceStatWriteTable)); AddHandler(2, &TYtMap::Match, HNDL(MapToMerge)); diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h index 67e68ea9790..48765ee2c4a 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h @@ -74,6 +74,10 @@ private: // <cmpItem> := '(<cmpOp> <value>) NNodes::TMaybeNode<NNodes::TExprBase> ExtractKeyRangeLegacy(NNodes::TExprBase node, TExprContext& ctx) const; + NNodes::TMaybeNode<NNodes::TExprBase> ExtractQLFilters(NNodes::TExprBase node, TExprContext& ctx) const; + + NNodes::TMaybeNode<NNodes::TExprBase> OptimizeQLFilterType(NNodes::TExprBase node, TExprContext& ctx) const; + NNodes::TMaybeNode<NNodes::TExprBase> FuseReduce(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const; NNodes::TMaybeNode<NNodes::TExprBase> FuseReduceWithTrivialMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const; diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_field_subset.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_field_subset.cpp index e70d54fffa8..a2f5fd41000 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_field_subset.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_field_subset.cpp @@ -8,6 +8,20 @@ namespace NYql { using namespace NNodes; +namespace { + void InsertQLFilterColumns(const TYtWithUserJobsOpBase& op, TSet<TStringBuf>& memberSet) { + const auto qlFilterSetting = GetSetting(op.Settings().Ref(), EYtSettingType::QLFilter); + if (qlFilterSetting) { + const auto qlFilter = qlFilterSetting->Child(1); + YQL_ENSURE(qlFilter->IsCallable("YtQLFilter")); + const TStructExprType* qlFilterType = qlFilter->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); + for (const auto& item : qlFilterType->GetItems()) { + memberSet.insert(item->GetName()); + } + } + } +} // empty namespace + TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::LambdaFieldsSubset(TYtWithUserJobsOpBase op, size_t lambdaIdx, TExprContext& ctx, const TGetParents& getParents) const { auto lambda = TCoLambda(op.Ref().ChildPtr(lambdaIdx)); @@ -28,6 +42,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::LambdaFieldsSubset(TYtW memberSet.insert(reduceBy.cbegin(), reduceBy.cend()); auto sortBy = NYql::GetSettingAsColumnList(op.Settings().Ref(), EYtSettingType::SortBy); memberSet.insert(sortBy.cbegin(), sortBy.cend()); + InsertQLFilterColumns(op, memberSet); auto itemType = GetSeqItemType(lambda.Args().Arg(0).Ref().GetTypeAnn())->Cast<TStructExprType>(); if (memberSet.size() < itemType->GetSize()) { diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp index 31c685f4438..821666afe40 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp @@ -641,7 +641,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseInnerMap(TExprBase if (NYql::HasSetting(innerMap.Settings().Ref(), EYtSettingType::Flow) != NYql::HasSetting(outerMap.Settings().Ref(), EYtSettingType::Flow)) { return node; } - if (NYql::HasAnySetting(outerMap.Settings().Ref(), EYtSettingType::JobCount)) { + if (NYql::HasAnySetting(outerMap.Settings().Ref(), EYtSettingType::JobCount | EYtSettingType::QLFilter)) { return node; } if (!path.Ranges().Maybe<TCoVoid>()) { @@ -779,7 +779,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseOuterMap(TExprBase if (NYql::HasAnySetting(inner.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::SortLimitBy | EYtSettingType::JobCount)) { return node; } - if (NYql::HasAnySetting(outerMap.Settings().Ref(), EYtSettingType::JobCount | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputApplied)) { + if (NYql::HasAnySetting(outerMap.Settings().Ref(), EYtSettingType::JobCount | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputApplied | EYtSettingType::QLFilter)) { return node; } if (outerMap.Input().Item(0).Settings().Size() != 0) { diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp index e3230f6f329..1caead064f9 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp @@ -394,7 +394,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::MapToMerge(TExprBase no .Input() .Add(section) .Build() - .Settings(NYql::KeepOnlySettings(map.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::KeepSorted, ctx)) + .Settings(NYql::KeepOnlySettings(map.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::QLFilter, ctx)) .Done(); } @@ -409,7 +409,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::MergeToCopy(TExprBase n return node; } - if (NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks)) { + if (NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks | EYtSettingType::QLFilter)) { return node; } diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp new file mode 100644 index 00000000000..73cacf723d8 --- /dev/null +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp @@ -0,0 +1,233 @@ +#include "yql_yt_phy_opt.h" + +#include <yt/yql/providers/yt/provider/yql_yt_helpers.h> + +#include <yql/essentials/core/dq_expr_nodes/dq_expr_nodes.h> +#include <yql/essentials/core/yql_opt_utils.h> +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/minikql/mkql_node.h> + +namespace NYql { + +using namespace NNodes; +using namespace NNodes::NDq; +using namespace NPrivate; + +namespace { + +bool NodeHasQLCompatibleType(const TExprNode::TPtr& node) { + bool isOptional = false; + const TDataExprType* dataType = nullptr; + if (!IsDataOrOptionalOfData(node->GetTypeAnn(), isOptional, dataType)) { + return false; + } + if (isOptional) { + return false; + } + if (!dataType) { + return false; + } + if (!IsDataTypeIntegral(dataType->GetSlot())) { + return false; + } + return true; +} + +TExprNode::TPtr CheckQLConst(const TExprNode::TPtr& node, const TExprNode::TPtr& rowArg) { + if (!NodeHasQLCompatibleType(node)) { + return nullptr; + } + if (IsDepended(*node, *rowArg)) { + return nullptr; + } + return node; +} + +TExprNode::TPtr ConvertQLMember(const TExprNode::TPtr& node, const TExprNode::TPtr& rowArg, const TExprNode::TPtr& newRowArg, TExprContext& ctx) { + if (!node->IsCallable("Member")) { + return nullptr; + } + YQL_ENSURE(node->ChildrenSize() == 2); + if (node->ChildPtr(0) != rowArg) { + return nullptr; + } + const auto memberName = node->Child(1)->Content(); + if (memberName.StartsWith("_yql_sys_")) { + return nullptr; + } + if (!NodeHasQLCompatibleType(node)) { + return nullptr; + } + auto arg = newRowArg; + return ctx.ChangeChild(*node, 0, std::move(arg)); +} + +TExprNode::TPtr ConvertQLComparison(const TExprNode::TPtr& node, const TExprNode::TPtr& rowArg, const TExprNode::TPtr& newRowArg, TExprContext& ctx) { + YQL_ENSURE(node->ChildrenSize() == 2); + TExprNode::TPtr childLeft; + TExprNode::TPtr childRight; + if (childLeft = ConvertQLMember(node->ChildPtr(0), rowArg, newRowArg, ctx)) { + childRight = CheckQLConst(node->ChildPtr(1), rowArg); + } + else if (childRight = ConvertQLMember(node->ChildPtr(1), rowArg, newRowArg, ctx)) { + childLeft = CheckQLConst(node->ChildPtr(0), rowArg); + } + if (!childLeft || !childRight) { + return nullptr; + } + return ctx.ChangeChildren(*node, {childLeft, childRight}); +} + +TExprNode::TPtr ConvertQLSubTree(const TExprNode::TPtr& node, const TExprNode::TPtr& rowArg, const TExprNode::TPtr& newRowArg, TExprContext& ctx) { + if (node->IsCallable({"And", "Or", "Not"})) { + TExprNode::TListType convertedChildren; + for (const auto& child : node->ChildrenList()) { + const auto converted = ConvertQLSubTree(child, rowArg, newRowArg, ctx); + if (!converted) { + return nullptr; + } + convertedChildren.push_back(converted); + }; + return ctx.ChangeChildren(*node, std::move(convertedChildren)); + } + if (node->IsCallable({"<", "<=", ">", ">=", "==", "!="})) { + return ConvertQLComparison(node, rowArg, newRowArg, ctx); + } + return nullptr; +} + +} // empty namespace + +TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::ExtractQLFilters(TExprBase node, TExprContext& ctx) const { + const auto opMap = node.Cast<TYtMap>(); + if (opMap.Input().Size() > 1) { + return node; + } + + const auto section = opMap.Input().Item(0); + if (NYql::HasAnySetting(opMap.Settings().Ref(), EYtSettingType::QLFilter | EYtSettingType::WeakFields)) { + return node; + } + + const auto flatMap = GetFlatMapOverInputStream(opMap.Mapper()); + if (!flatMap) { + return node; + } + + const auto flatMapLambda = flatMap.Lambda(); + if (!flatMapLambda) { + return node; + } + + const auto optionalIf = flatMapLambda.Body().Maybe<TCoOptionalIf>(); + if (!optionalIf) { + return node; + } + + const auto rowArg = flatMapLambda.Cast().Args().Arg(0).Ptr(); + const auto newRowArg = ctx.NewArgument(rowArg->Pos(), rowArg->Content()); + + TExprNode::TListType qlCompatibleParts; + TExprNode::TListType otherParts; + TExprNode::TPtr predicate = optionalIf.Cast().Predicate().Ptr(); + + if (!predicate->IsCallable("And")) { + const auto converted = ConvertQLSubTree(predicate, rowArg, newRowArg, ctx); + if (converted) { + qlCompatibleParts.push_back(converted); + } else { + otherParts.push_back(predicate); + } + } else { + for (const auto& child : predicate->ChildrenList()) { + const auto converted = ConvertQLSubTree(child, rowArg, newRowArg, ctx); + if (converted) { + qlCompatibleParts.push_back(converted); + } else { + otherParts.push_back(child); + } + }; + } + + if (qlCompatibleParts.empty()) { + return node; + } + + TExprNode::TPtr qlCompatiblePredicate; + if (qlCompatibleParts.size() == 1) { + qlCompatiblePredicate = qlCompatibleParts.front(); + } else { + qlCompatiblePredicate = ctx.NewCallable(flatMap.Cast().Pos(), "And", std::move(qlCompatibleParts)); + } + YQL_ENSURE(qlCompatiblePredicate); + + TExprNode::TPtr prunedPredicate; + if (otherParts.empty()) { + prunedPredicate = MakeBool<true>(predicate->Pos(), ctx); + } else if (otherParts.size() == 1) { + prunedPredicate = otherParts.front(); + } else { + prunedPredicate = ctx.NewCallable(predicate->Pos(), "And", std::move(otherParts)); + } + YQL_ENSURE(prunedPredicate); + + const auto typeNode = ExpandType(rowArg->Pos(), *rowArg->GetTypeAnn(), ctx); + const auto lambdaNode = ctx.NewLambda(qlCompatiblePredicate->Pos(), ctx.NewArguments(qlCompatiblePredicate->Pos(), {newRowArg}), std::move(qlCompatiblePredicate)); + const auto qlFilter = ctx.NewCallable(flatMap.Cast().Pos(), "YtQLFilter", {typeNode, lambdaNode}); + + auto newOpMap = ctx.ChangeChild(opMap.Ref(), TYtMap::idx_Settings, NYql::AddSetting(opMap.Settings().Ref(), EYtSettingType::QLFilter, qlFilter, ctx)); + const bool pruneLambda = State_->Configuration->PruneQLFilterLambda.Get().GetOrElse(DEFAULT_PRUNE_QL_FILTER_LAMBDA); + if (pruneLambda) { + const auto newFlatMap = Build<TCoFlatMapBase>(ctx, flatMap.Cast().Pos()) + .InitFrom(flatMap.Cast()) + .Lambda() + .InitFrom(flatMapLambda.Cast()) + .Body<TCoOptionalIf>() + .InitFrom(optionalIf.Cast()) + .Predicate(prunedPredicate) + .Build() + .Build() + .Done().Ptr(); + + const TOptimizeExprSettings settings{State_->Types}; + const TNodeOnNodeOwnedMap remaps{{flatMap.Cast().Raw(), newFlatMap}}; + const auto status = RemapExpr(newOpMap, newOpMap, remaps, ctx, settings); + YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Error); + } + return newOpMap; +} + +TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::OptimizeQLFilterType(TExprBase node, TExprContext& ctx) const { + const auto qlFilter = node.Cast<TYtQLFilter>(); + const auto rowType = qlFilter.RowType(); + + const auto arg = qlFilter.Predicate().Args().Arg(0).Ptr(); + TSet<TStringBuf> memberSet; + VisitExpr(qlFilter.Predicate().Body().Ptr(), [&arg, &memberSet](const TExprNode::TPtr& n) { + if (n->IsCallable("Member")) { + if (n->ChildPtr(0) == arg) { + const auto member = n->Child(1); + YQL_ENSURE(member->IsAtom()); + memberSet.insert(member->Content()); + } + return false; + } + return true; + }); + + TExprNode::TListType newRowTypeChildren; + for (const auto& child : rowType.Ptr()->ChildrenList()) { + const auto name = child->Child(0); + if (memberSet.contains(name->Content())) { + newRowTypeChildren.push_back(child); + } + } + if (newRowTypeChildren.size() == rowType.Ptr()->ChildrenSize()) { + return node; + } + + auto newRowType = ctx.ChangeChildren(rowType.Ref(), std::move(newRowTypeChildren)); + return ctx.ChangeChild(qlFilter.Ref(), TYtQLFilter::idx_RowType, std::move(newRowType)); +} + +} // namespace NYql diff --git a/yt/yql/providers/yt/provider/ya.make b/yt/yql/providers/yt/provider/ya.make index f8b57a915eb..719850bb8b0 100644 --- a/yt/yql/providers/yt/provider/ya.make +++ b/yt/yql/providers/yt/provider/ya.make @@ -61,6 +61,7 @@ SRCS( phy_opt/yql_yt_phy_opt_field_subset.cpp phy_opt/yql_yt_phy_opt_helper.cpp phy_opt/yql_yt_phy_opt_key_range.cpp + phy_opt/yql_yt_phy_opt_ytql.cpp phy_opt/yql_yt_phy_opt_merge.cpp phy_opt/yql_yt_phy_opt_push.cpp phy_opt/yql_yt_phy_opt_write.cpp diff --git a/yt/yql/providers/yt/provider/yql_yt_datasink_constraints.cpp b/yt/yql/providers/yt/provider/yql_yt_datasink_constraints.cpp index c9978f95aa1..1ad5173187e 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasink_constraints.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasink_constraints.cpp @@ -43,6 +43,7 @@ public: AddHandler({TYtDqProcessWrite ::CallableName()}, Hndl(&TYtDataSinkConstraintTransformer::HandleDqProcessWrite)); AddHandler({TYtTryFirst ::CallableName()}, Hndl(&TYtDataSinkConstraintTransformer::HandleTryFirst)); AddHandler({TYtMaterialize ::CallableName()}, Hndl(&TYtDataSinkConstraintTransformer::HandleMaterialize)); + AddHandler({TYtQLFilter::CallableName()}, Hndl(&TYtDataSinkConstraintTransformer::HandleDefault)); } private: static void CopyExcept(TExprNode* dst, const TExprNode& from, const TStringBuf& except) { diff --git a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp index 56b1d6866ff..9bfc7af690a 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp @@ -115,6 +115,7 @@ public: AddHandler({TYtDqWideWrite::CallableName()}, Hndl(&TYtDataSinkTypeAnnotationTransformer::HandleDqWrite<true>)); AddHandler({TYtTryFirst::CallableName()}, Hndl(&TYtDataSinkTypeAnnotationTransformer::HandleTryFirst)); AddHandler({TYtMaterialize::CallableName()}, Hndl(&TYtDataSinkTypeAnnotationTransformer::HandleMaterialize)); + AddHandler({TYtQLFilter::CallableName()}, Hndl(&TYtDataSinkTypeAnnotationTransformer::HandleQLFilter)); } private: @@ -1013,7 +1014,7 @@ private: auto merge = TYtMerge(input); - if (!ValidateSettings(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks | EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::NoDq, ctx)) { + if (!ValidateSettings(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks | EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::NoDq | EYtSettingType::QLFilter, ctx)) { return TStatus::Error; } @@ -1097,7 +1098,8 @@ private: | EYtSettingType::BlockInputReady | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputReady - | EYtSettingType::BlockOutputApplied; + | EYtSettingType::BlockOutputApplied + | EYtSettingType::QLFilter; if (!ValidateSettings(map.Settings().Ref(), accpeted, ctx)) { return TStatus::Error; } @@ -1297,7 +1299,8 @@ private: | EYtSettingType::KeySwitch | EYtSettingType::MapOutputType | EYtSettingType::ReduceInputType - | EYtSettingType::NoDq; + | EYtSettingType::NoDq + | EYtSettingType::QLFilter; if (!ValidateSettings(mapReduce.Settings().Ref(), acceptedSettings, ctx)) { return TStatus::Error; } @@ -2142,6 +2145,38 @@ private: return TStatus::Ok; } + TStatus HandleQLFilter(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureArgsCount(*input, 2, ctx)) { + return TStatus::Error; + } + + const auto& type = input->Child(0); + if (!EnsureTypeWithStructType(*type, ctx)) { + return TStatus::Error; + } + + auto& lambda = input->ChildRef(1); + const auto status = ConvertToLambda(lambda, ctx, 1); + if (status.Level != IGraphTransformer::TStatus::Ok) { + return status; + } + + if (!UpdateLambdaAllArgumentsTypes(lambda, {type->GetTypeAnn()->Cast<TTypeExprType>()->GetType()}, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + if (!lambda->GetTypeAnn()) { + return IGraphTransformer::TStatus::Repeat; + } + + if (!EnsureSpecificDataType(*lambda, EDataSlot::Bool, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + input->SetTypeAnn(ctx.MakeType<TUnitExprType>()); + return TStatus::Ok; + } + private: const TYtState::TPtr State_; }; diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp index a47ae7b7c81..97b9fdeda92 100644 --- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp @@ -573,6 +573,25 @@ bool IsConstExpSortDirections(NNodes::TExprBase sortDirections) { return false; } +void GetNodesToCalculateFromQLFilter(const TExprNode& qlFilter, TExprNode::TListType &needCalc, TNodeSet &uniqNodes) { + YQL_ENSURE(qlFilter.IsCallable("YtQLFilter")); + const auto lambdaBody = qlFilter.Child(1)->Child(1); + VisitExpr(lambdaBody, [&needCalc, &uniqNodes](const TExprNode::TPtr& node) { + if (node->IsCallable({"And", "Or", "Not", "<", "<=", ">", ">=", "==", "!="})) { + return true; + } + if (node->IsCallable("Member")) { + return false; + } + if (uniqNodes.insert(node.Get()).second) { + if (NeedCalc(TExprBase(node.Get()))) { + needCalc.push_back(node); + } + } + return false; + }); +} + TExprNode::TListType GetNodesToCalculate(const TExprNode::TPtr& input) { TExprNode::TListType needCalc; TNodeSet uniqNodes; @@ -592,6 +611,9 @@ TExprNode::TListType GetNodesToCalculate(const TExprNode::TPtr& input) { } } break; + case EYtSettingType::QLFilter: + GetNodesToCalculateFromQLFilter(setting.Value().Cast().Ref(), needCalc, uniqNodes); + break; default: break; } diff --git a/yt/yql/providers/yt/provider/yql_yt_horizontal_join.cpp b/yt/yql/providers/yt/provider/yql_yt_horizontal_join.cpp index fdc3b118269..13d426e13dd 100644 --- a/yt/yql/providers/yt/provider/yql_yt_horizontal_join.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_horizontal_join.cpp @@ -54,7 +54,7 @@ bool THorizontalJoinBase::IsGoodForHorizontalJoin(TYtMap map) const { } // Map has output limit or is sharded MapJoin - if (NYql::HasAnySetting(map.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::SortLimitBy | EYtSettingType::Sharded | EYtSettingType::JobCount | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputApplied)) { + if (NYql::HasAnySetting(map.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::SortLimitBy | EYtSettingType::Sharded | EYtSettingType::JobCount | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputApplied | EYtSettingType::QLFilter)) { return false; } diff --git a/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp b/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp index d54ec130778..2d1ebbc5025 100644 --- a/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp @@ -917,6 +917,17 @@ bool ValidateSettings(const TExprNode& settingsNode, EYtSettingTypes accepted, T } return true; } + case EYtSettingType::QLFilter: { + if (!EnsureTupleSize(*setting, 2, ctx)) { + return false; + } + const auto qlFilter = setting->Child(1); + if (!qlFilter->IsCallable("YtQLFilter")) { + ctx.AddError(TIssue(ctx.GetPosition(qlFilter->Pos()), TStringBuilder() + << "Expected YtQLFilter node, got: " << qlFilter->Content())); + } + break; + } case EYtSettingType::LAST: { YQL_ENSURE(false, "Unexpected EYtSettingType"); } diff --git a/yt/yql/providers/yt/provider/yql_yt_op_settings.h b/yt/yql/providers/yt/provider/yql_yt_op_settings.h index f249a51ddfb..e99450e1081 100644 --- a/yt/yql/providers/yt/provider/yql_yt_op_settings.h +++ b/yt/yql/providers/yt/provider/yql_yt_op_settings.h @@ -101,6 +101,7 @@ enum class EYtSettingType: ui64 { BlockInputApplied /* "blockInputApplied" */, // hybrid supported BlockOutputReady /* "blockOutputReady" */, // hybrid supported BlockOutputApplied /* "blockOutputApplied" */, // hybrid supported + QLFilter /* "qlFilter" */, // Out tables UniqueBy /* "uniqueBy" */, OpHash /* "opHash" */, diff --git a/yt/yql/tests/sql/suites/ql_filter/integer.txt b/yt/yql/tests/sql/suites/ql_filter/integer.txt new file mode 100644 index 00000000000..e4a8e247c95 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer.txt @@ -0,0 +1,10 @@ +{"a"=1;"b"=1;"c"=1u;"d"=1;"e"=1u;"escaping []`"=1}; +{"a"=2;"b"=-2;"c"=2u;"d"=10;"e"=10u;"escaping []`"=2}; +{"a"=-3;"b"=3;"c"=3u;"d"=-100;"e"=100u;"escaping []`"=3}; +{"a"=4;"b"=4;"c"=4u;"d"=1000;"e"=1000u;"escaping []`"=4}; +{"a"=5;"b"=-5;"c"=5u;"d"=10000;"e"=10000u;"escaping []`"=5}; +{"a"=-6;"b"=6;"c"=6u;"d"=-100000;"e"=100000u;"escaping []`"=6}; +{"a"=7;"b"=7;"c"=7u;"d"=1000000;"e"=1000000u;"escaping []`"=7}; +{"a"=8;"b"=-8;"c"=8u;"d"=10000000;"e"=10000000u;"escaping []`"=8}; +{"a"=-9;"b"=9;"c"=9u;"d"=-100000000;"e"=100000000u;"escaping []`"=9}; +{"a"=10;"b"=10;"c"=10u;"d"=1000000000;"e"=1000000000u;"escaping []`"=10}; diff --git a/yt/yql/tests/sql/suites/ql_filter/integer.txt.attr b/yt/yql/tests/sql/suites/ql_filter/integer.txt.attr new file mode 100644 index 00000000000..eb0ef07a6ef --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer.txt.attr @@ -0,0 +1,12 @@ +{ + "_yql_row_spec"={ + "Type"=["StructType";[ + ["a";["DataType";"Int32"]]; + ["b";["DataType";"Int32"]]; + ["c";["DataType";"Uint32"]]; + ["d";["DataType";"Int64"]]; + ["e";["DataType";"Uint64"]]; + ["escaping []`";["DataType";"Int32"]] + ]] + } +} diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_bounds.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.cfg new file mode 100644 index 00000000000..d0ce4581d7d --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql new file mode 100644 index 00000000000..1f763a6b417 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql @@ -0,0 +1,8 @@ +pragma yt.UseQLFilter; +PRAGMA yt.UseSkiff='false'; + +select c +from plato.Input +where + c > -1 + AND a < 18446744073709551615; diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_escaping.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.cfg new file mode 100644 index 00000000000..d0ce4581d7d --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql new file mode 100644 index 00000000000..3e074e24e60 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql @@ -0,0 +1,8 @@ +pragma yt.UseQLFilter; +PRAGMA yt.UseSkiff='false'; + +select + `escaping []\``, + `escaping []\`` as renamed +from plato.Input +where `escaping []\`` > 5; diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_eval.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_eval.cfg new file mode 100644 index 00000000000..d0ce4581d7d --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_eval.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_eval.sql b/yt/yql/tests/sql/suites/ql_filter/integer_eval.sql new file mode 100644 index 00000000000..a7772686f7b --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_eval.sql @@ -0,0 +1,5 @@ +pragma yt.UseQLFilter; + +select a +from plato.Input +where a > 2+3;
\ No newline at end of file diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_left.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.cfg new file mode 100644 index 00000000000..d0ce4581d7d --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql new file mode 100644 index 00000000000..fa2d5840773 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql @@ -0,0 +1,13 @@ +pragma yt.UseQLFilter; +pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644 + +select a, c, d, e +from plato.Input +where + a > 5 + and + c > 5 + and + d > 5 + and + e > 5;
\ No newline at end of file diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_right.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.cfg new file mode 100644 index 00000000000..d0ce4581d7d --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql new file mode 100644 index 00000000000..56051ba52b5 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql @@ -0,0 +1,13 @@ +pragma yt.UseQLFilter; +pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644 + +select a, c, d, e +from plato.Input +where + 5 < a + and + 5 < c + and + 5 < d + and + 5 < e;
\ No newline at end of file diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_members.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_members.cfg new file mode 100644 index 00000000000..d0ce4581d7d --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_members.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_members.sql b/yt/yql/tests/sql/suites/ql_filter/integer_members.sql new file mode 100644 index 00000000000..a9640db73c1 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_members.sql @@ -0,0 +1,5 @@ +pragma yt.UseQLFilter; + +select a, b +from plato.Input +where a > b;
\ No newline at end of file diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.cfg new file mode 100644 index 00000000000..d0ce4581d7d --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.sql b/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.sql new file mode 100644 index 00000000000..a2f57ebbffc --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.sql @@ -0,0 +1,5 @@ +pragma yt.UseQLFilter; + +select a, b +from plato.Input +where a - b + 5 > 0;
\ No newline at end of file diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_select_other.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.cfg new file mode 100644 index 00000000000..d0ce4581d7d --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql new file mode 100644 index 00000000000..bdf2ebbcd26 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql @@ -0,0 +1,7 @@ +pragma yt.UseQLFilter; +pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644 + +select b +from plato.Input +where + a > 5;
\ No newline at end of file diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_single.cfg new file mode 100644 index 00000000000..d0ce4581d7d --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_single.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single.sql b/yt/yql/tests/sql/suites/ql_filter/integer_single.sql new file mode 100644 index 00000000000..73d6933bd55 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_single.sql @@ -0,0 +1,5 @@ +pragma yt.UseQLFilter; + +select a +from plato.Input +where a > 5; |
