summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorlucius <[email protected]>2025-01-20 15:08:43 +0300
committerlucius <[email protected]>2025-01-20 15:37:42 +0300
commitf7afbbd9f19b8800552a7b50e6e9d0f7629ebfff (patch)
tree02615b59a5506886b00f4f58068f37e00dabc4a6
parentc098dc75e35577bb93598b39c550f53387919ef6 (diff)
YQL-19382 pushdown filters to YT QL
commit_hash:ce6e236740c1a960e02ea431aa677fe89e91b7a8
-rw-r--r--yt/yql/providers/yt/common/yql_configuration.h3
-rw-r--r--yt/yql/providers/yt/common/yql_yt_settings.cpp2
-rw-r--r--yt/yql/providers/yt/common/yql_yt_settings.h2
-rw-r--r--yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json9
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp26
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp249
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp5
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h4
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_field_subset.cpp15
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp4
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp4
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp233
-rw-r--r--yt/yql/providers/yt/provider/ya.make1
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_datasink_constraints.cpp1
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp41
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_helpers.cpp22
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_horizontal_join.cpp2
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_op_settings.cpp11
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_op_settings.h1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer.txt10
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer.txt.attr12
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_bounds.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql8
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_escaping.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql8
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_eval.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_eval.sql5
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_many_left.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql13
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_many_right.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql13
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_members.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_members.sql5
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_members_eval.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_members_eval.sql5
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_select_other.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql7
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_single.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_single.sql5
39 files changed, 717 insertions, 18 deletions
diff --git a/yt/yql/providers/yt/common/yql_configuration.h b/yt/yql/providers/yt/common/yql_configuration.h
index 9c6897c0665..f84ca7ef60b 100644
--- a/yt/yql/providers/yt/common/yql_configuration.h
+++ b/yt/yql/providers/yt/common/yql_configuration.h
@@ -112,4 +112,7 @@ constexpr bool DEFAULT_DISABLE_FUSE_OPERATIONS = false;
constexpr bool DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS = false;
+constexpr bool DEFAULT_USE_QL_FILTER = false;
+constexpr bool DEFAULT_PRUNE_QL_FILTER_LAMBDA = true;
+
} // NYql
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp
index edf95828850..21f23aa697c 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.cpp
+++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp
@@ -444,6 +444,8 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
REGISTER_SETTING(*this, UseNewPredicateExtraction);
REGISTER_SETTING(*this, PruneKeyFilterLambda);
REGISTER_SETTING(*this, DqPruneKeyFilterLambda);
+ REGISTER_SETTING(*this, UseQLFilter);
+ REGISTER_SETTING(*this, PruneQLFilterLambda);
REGISTER_SETTING(*this, MergeAdjacentPointRanges);
REGISTER_SETTING(*this, KeyFilterForStartsWith);
REGISTER_SETTING(*this, MaxKeyRangeCount).Upper(10000);
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h
index 1163f9ccbb7..2667dfa36f4 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.h
+++ b/yt/yql/providers/yt/common/yql_yt_settings.h
@@ -274,6 +274,8 @@ struct TYtSettings {
NCommon::TConfSetting<bool, false> UseNewPredicateExtraction;
NCommon::TConfSetting<bool, false> PruneKeyFilterLambda;
NCommon::TConfSetting<bool, false> DqPruneKeyFilterLambda;
+ NCommon::TConfSetting<bool, false> UseQLFilter;
+ NCommon::TConfSetting<bool, false> PruneQLFilterLambda;
NCommon::TConfSetting<bool, false> MergeAdjacentPointRanges;
NCommon::TConfSetting<bool, false> KeyFilterForStartsWith;
NCommon::TConfSetting<ui64, false> MaxKeyRangeCount;
diff --git a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json
index d6508aef6c8..d7ff47f5441 100644
--- a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json
+++ b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json
@@ -492,6 +492,15 @@
{"Index": 1, "Name": "UserStateType", "Type": "TExprBase"},
{"Index": 2, "Name": "ProcessStateKey", "Type": "TCoAtom"}
]
+ },
+ {
+ "Name": "TYtQLFilter",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "YtQLFilter"},
+ "Children": [
+ {"Index": 0, "Name": "RowType", "Type": "TExprBase"},
+ {"Index": 1, "Name": "Predicate", "Type": "TCoLambda"}
+ ]
}
]
}
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
index beb330b74be..f2043767d11 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
@@ -357,6 +357,29 @@ TRuntimeNode ApplyPathRangesAndSampling(TRuntimeNode inputList, TType* itemType,
return inputList;
}
+TRuntimeNode ApplyQLFilter(TRuntimeNode inputList, const TYtTransientOpBase& ytOp, NCommon::TMkqlBuildContext& ctx) {
+ if (!ytOp.Maybe<TYtMap>() && !ytOp.Maybe<TYtMapReduce>() && !ytOp.Maybe<TYtMerge>()) {
+ return inputList;
+ }
+
+ const auto qlFilterSetting = NYql::GetSetting(ytOp.Settings().Ref(), EYtSettingType::QLFilter);
+ if (!qlFilterSetting) {
+ return inputList;
+ }
+
+ const auto qlFilterNode = qlFilterSetting->Child(1);
+ YQL_ENSURE(qlFilterNode && qlFilterNode->IsCallable("YtQLFilter"));
+ const TYtQLFilter qlFilter(qlFilterNode);
+ const auto arg = qlFilter.Predicate().Args().Arg(0).Raw();
+ const auto body = qlFilter.Predicate().Body().Raw();
+ const auto lambdaId = qlFilter.Predicate().Ref().UniqueId();
+
+ return ctx.ProgramBuilder.OrderedFilter(inputList, [&] (TRuntimeNode item) -> TRuntimeNode {
+ NCommon::TMkqlBuildContext innerCtx(ctx, {{arg, item}}, lambdaId);
+ return NCommon::MkqlBuildExpr(*body, innerCtx);
+ });
+}
+
TRuntimeNode ToList(TRuntimeNode list, NCommon::TMkqlBuildContext& ctx) {
const auto listType = list.GetStaticType();
if (listType->IsOptional()) {
@@ -598,6 +621,7 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
ytOp.DataSink().Cluster().Value(), ytOp.Input().Ref(), ctx, THashSet<TString>{"num", "index"}, forceKeyColumns);
values = ApplyPathRangesAndSampling(values, mkqlInputType, ytOp.Input().Ref(), ctx);
+ values = ApplyQLFilter(values, ytOp, ctx);
if ((ytOp.Maybe<TYtMerge>() && outTableInfo.RowSpec->IsSorted() && ytOp.Input().Item(0).Paths().Size() > 1)
|| ytOp.Maybe<TYtSort>())
@@ -627,6 +651,7 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
values = arg->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Flow ?
ctx.ProgramBuilder.ToFlow(values) : ctx.ProgramBuilder.Iterator(values, {});
values = ApplyPathRangesAndSampling(values, itemType, ytMap.Input().Ref(), ctx);
+ values = ApplyQLFilter(values, ytMap, ctx);
auto& lambdaInputType = GetSeqItemType(*ytMap.Mapper().Args().Arg(0).Ref().GetTypeAnn());
auto& lambdaOutputType = GetSeqItemType(*ytMap.Mapper().Body().Ref().GetTypeAnn());
@@ -826,6 +851,7 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
THashSet<TString>{"num", "index"}, forceKeyColumns);
values = ApplyPathRangesAndSampling(values, itemType, ytMapReduce.Input().Ref(), ctx);
+ values = ApplyQLFilter(values, ytMapReduce, ctx);
const auto outputItemType = BuildOutputType(ytMapReduce.Output(), ctx);
const size_t outputsCount = ytMapReduce.Output().Ref().ChildrenSize();
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index d8038fb20b8..16dac196f4e 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -213,6 +213,220 @@ TString DebugPath(NYT::TRichYPath path) {
return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text) + " (" + std::to_string(numColumns) + " columns)";
}
+void GetIntegerConstraints(const TExprNode::TPtr& column, bool& isSigned, ui64& minValueAbs, ui64& maxValueAbs) {
+ EDataSlot toType = column->GetTypeAnn()->Cast<TDataExprType>()->GetSlot();
+
+ // AllowIntegralConversion (may consider some refactoring)
+ if (toType == EDataSlot::Uint8) {
+ isSigned = false;
+ minValueAbs = 0;
+ maxValueAbs = Max<ui8>();
+ }
+ else if (toType == EDataSlot::Uint16) {
+ isSigned = false;
+ minValueAbs = 0;
+ maxValueAbs = Max<ui16>();
+ }
+ else if (toType == EDataSlot::Uint32) {
+ isSigned = false;
+ minValueAbs = 0;
+ maxValueAbs = Max<ui32>();
+ }
+ else if (toType == EDataSlot::Uint64) {
+ isSigned = false;
+ minValueAbs = 0;
+ maxValueAbs = Max<ui64>();
+ }
+ else if (toType == EDataSlot::Int8) {
+ isSigned = true;
+ minValueAbs = (ui64)Max<i8>() + 1;
+ maxValueAbs = (ui64)Max<i8>();
+ }
+ else if (toType == EDataSlot::Int16) {
+ isSigned = true;
+ minValueAbs = (ui64)Max<i16>() + 1;
+ maxValueAbs = (ui64)Max<i16>();
+ }
+ else if (toType == EDataSlot::Int32) {
+ isSigned = true;
+ minValueAbs = (ui64)Max<i32>() + 1;
+ maxValueAbs = (ui64)Max<i32>();
+ }
+ else if (toType == EDataSlot::Int64) {
+ isSigned = true;
+ minValueAbs = (ui64)Max<i64>() + 1;
+ maxValueAbs = (ui64)Max<i64>();
+ } else {
+ YQL_ENSURE(false, "unexpected integer node type");
+ }
+}
+
+void QuoteColumnForQL(const TStringBuf columnName, TStringBuilder& result) {
+ result << '`';
+ if (!columnName.Contains('`')) {
+ result << columnName;
+ } else {
+ for (const auto c : columnName) {
+ if (c == '`') {
+ result << "\\`";
+ } else {
+ result << c;
+ }
+ }
+ }
+ result << '`';
+}
+
+void GenerateInputQueryIntegerComparison(const TStringBuf& opName, const TExprNode::TPtr& intColumn, const TExprNode::TPtr& intValue, TStringBuilder& result) {
+ bool columnsIsSigned;
+ ui64 minValueAbs;
+ ui64 maxValueAbs;
+ GetIntegerConstraints(intColumn, columnsIsSigned, minValueAbs, maxValueAbs);
+
+ const auto maybeInt = TMaybeNode<TCoIntegralCtor>(intValue);
+ YQL_ENSURE(maybeInt);
+ bool hasSign;
+ bool isSigned;
+ ui64 valueAbs;
+ ExtractIntegralValue(maybeInt.Ref(), false, hasSign, isSigned, valueAbs);
+
+ if (!hasSign && valueAbs > maxValueAbs) {
+ // value is greater than maximum
+ if (opName == ">" || opName == ">=" || opName == "==") {
+ result << "FALSE";
+ } else {
+ result << "TRUE";
+ }
+ } else if (hasSign && valueAbs > minValueAbs) {
+ // value is less than minimum
+ if (opName == "<" || opName == "<=" || opName == "==") {
+ result << "FALSE";
+ } else {
+ result << "TRUE";
+ }
+ } else {
+ // value is in the range
+ const auto columnName = intColumn->ChildPtr(1)->Content();
+ const auto valueStr = maybeInt.Cast().Literal().Value();
+ QuoteColumnForQL(columnName, result);
+ result << " " << opName << " " << valueStr;
+ }
+}
+
+void GenerateInputQueryComparison(const TCoCompare& op, TStringBuilder& result) {
+ YQL_ENSURE(op.Ref().IsCallable({"<", "<=", ">", ">=", "==", "!="}));
+ const auto left = op.Left().Ptr();
+ const auto right = op.Right().Ptr();
+
+ if (left->IsCallable("Member")) {
+ GenerateInputQueryIntegerComparison(op.CallableName(), left, right, result);
+ } else {
+ YQL_ENSURE(right->IsCallable("Member"));
+ auto invertedOp = op.CallableName();
+ if (invertedOp == "<") {
+ invertedOp = ">";
+ } else if (invertedOp == "<=") {
+ invertedOp = ">=";
+ } else if (invertedOp == ">") {
+ invertedOp = "<";
+ } else if (invertedOp == ">=") {
+ invertedOp = "<=";
+ }
+ GenerateInputQueryIntegerComparison(invertedOp, right, left, result);
+ }
+}
+
+void GenerateInputQueryWhereExpression(const TExprNode::TPtr& node, TStringBuilder& result) {
+ if (const auto maybeCompare = TMaybeNode<TCoCompare>(node)) {
+ GenerateInputQueryComparison(maybeCompare.Cast(), result);
+ } else if (node->IsCallable("Not")) {
+ result << "NOT (";
+ GenerateInputQueryWhereExpression(node->ChildPtr(0), result);
+ result << ")";
+ } else if (node->IsCallable({"And", "Or"})) {
+ const TStringBuf op = node->IsCallable("And") ? "AND" : "OR";
+
+ result << "(";
+ GenerateInputQueryWhereExpression(node->Child(0), result);
+ result << ")";
+
+ const auto size = node->ChildrenSize();
+ for (TExprNode::TListType::size_type i = 1U; i < size; ++i) {
+ result << " " << op << " (";
+ GenerateInputQueryWhereExpression(node->Child(i), result);
+ result << ")";
+ };
+ } else {
+ YQL_ENSURE(false, "unexpected node type");
+ }
+}
+
+TString GenerateInputQuery(const TExprNode& settings, const TVector<TInputInfo>& inputs) {
+ auto qlFilterNode = NYql::GetSetting(settings, EYtSettingType::QLFilter);
+ if (!qlFilterNode) {
+ return "";
+ }
+
+ // TODO: how to handle multiple inputs?
+ YQL_ENSURE(inputs.size() == 1, "YtQLFilter: multiple inputs are not supported");
+
+ qlFilterNode = qlFilterNode->ChildPtr(1);
+ YQL_ENSURE(qlFilterNode && qlFilterNode->IsCallable("YtQLFilter"));
+ const TYtQLFilter qlFilter(qlFilterNode);
+ const auto predicate = qlFilter.Predicate().Body();
+
+ TStringBuilder result;
+ bool foundSomeColumns = false;
+ for (const auto& table: inputs) {
+ YQL_ENSURE(table.Path.Columns_ && table.Path.Columns_->Parts_, "YtQLFilter: TRichYPath should have columns");
+ for (const auto& column: table.Path.Columns_->Parts_) {
+ if (foundSomeColumns) {
+ result << ", ";
+ }
+ QuoteColumnForQL(column, result);
+ foundSomeColumns = true;
+ }
+ }
+ if (!foundSomeColumns) {
+ result << "*";
+ }
+ result << " WHERE ";
+ GenerateInputQueryWhereExpression(predicate.Ptr(), result);
+ YQL_CLOG(INFO, ProviderYt) << __FUNCTION__ << ": " << result;
+ return result;
+}
+
+void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery) {
+ spec["input_query"] = inputQuery;
+ spec["input_query_filter_options"]["enable_chunk_filter"] = true;
+ spec["input_query_filter_options"]["enable_row_filter"] = true;
+}
+
+void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& inputQuery) {
+ // YQL-19382
+ if (inputQuery) {
+ for (auto& path : paths) {
+ path.Columns_.Clear();
+ }
+ SetInputQuerySpec(spec, inputQuery);
+ }
+}
+
+template <typename T>
+void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& inputQuery, const bool useSkiff) {
+ // YQL-19382
+ if (inputQuery) {
+ YQL_ENSURE(!useSkiff, "QLFilter can't work with skiff on map/mapreduce right now, try with PRAGMA yt.UseSkiff='false'");
+ const auto& inputs = specWithPaths.GetInputs();
+ for (size_t i = 0; i < inputs.size(); ++i) {
+ auto path = inputs[i];
+ path.Columns_.Clear();
+ specWithPaths.SetInput(i, path);
+ }
+ SetInputQuerySpec(spec, inputQuery);
+ }
+}
+
} // unnamed
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -3378,9 +3592,10 @@ private:
bool forceTransform = NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform);
bool combineChunks = NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::CombineChunks);
TMaybe<ui64> limit = GetLimit(merge.Settings().Ref());
+ const TString inputQuery = GenerateInputQuery(merge.Settings().Ref(), execCtx->InputTables_);
- return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, execCtx]() {
- return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, execCtx] (const auto& f) {
+ return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, inputQuery, execCtx]() {
+ return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, inputQuery, execCtx] (const auto& f) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
auto entry = execCtx->GetEntry();
bool cacheHit = f.GetValue();
@@ -3425,6 +3640,8 @@ private:
spec["schema_inference_mode"] = "from_output"; // YTADMINREQ-17692
}
+ PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQuery);
+
return execCtx->RunOperation([entry, mergeOpSpec = std::move(mergeOpSpec), spec = std::move(spec)](){
return entry->Tx->Merge(mergeOpSpec, TOperationOptions().StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec));
});
@@ -3442,12 +3659,13 @@ private:
TString mapLambda,
const TString& inputType,
const TExpressionResorceUsage& extraUsage,
+ const TString& inputQuery,
const TExecContext<TRunOptions>::TPtr& execCtx
) {
const bool testRun = execCtx->Config_->GetLocalChainTest();
TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync();
return ret.Apply([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda,
- inputType, extraUsage, execCtx, testRun] (const auto& f) mutable
+ inputType, extraUsage, inputQuery, execCtx, testRun] (const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
TTransactionCache::TEntry::TPtr entry;
@@ -3610,6 +3828,8 @@ private:
spec["job_count"] = static_cast<i64>(*jobCount);
}
+ PrepareInputQueryForMap(spec, mapOpSpec, inputQuery, useSkiff);
+
TOperationOptions opOpts;
FillOperationOptions(opOpts, execCtx, entry);
opOpts.StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec);
@@ -3647,11 +3867,12 @@ private:
}
auto extraUsage = execCtx->ScanExtraResourceUsage(map.Mapper().Body().Ref(), true);
TString inputType = NCommon::WriteTypeToYson(GetSequenceItemType(map.Input().Size() == 1U ? TExprBase(map.Input().Item(0)) : TExprBase(map.Mapper().Args().Arg(0)), true));
+ const TString inputQuery = GenerateInputQuery(map.Settings().Ref(), execCtx->InputTables_);
- return execCtx->Session_->Queue_->Async([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, execCtx]() {
+ return execCtx->Session_->Queue_->Async([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQuery, execCtx]() {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
execCtx->MakeUserFiles();
- return ExecMap(ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, execCtx);
+ return ExecMap(ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQuery, execCtx);
});
}
@@ -3879,13 +4100,14 @@ private:
NYT::TNode intermediateMeta,
const NYT::TNode& intermediateSchema,
const NYT::TNode& intermediateStreams,
+ const TString& inputQuery,
const TExecContext<TRunOptions>::TPtr& execCtx
) {
const bool testRun = execCtx->Config_->GetLocalChainTest();
TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync();
return ret.Apply([reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs,
mapExtraUsage, reduceLambda, reduceInputType, reduceExtraUsage,
- intermediateMeta, intermediateSchema, intermediateStreams, execCtx, testRun]
+ intermediateMeta, intermediateSchema, intermediateStreams, inputQuery, execCtx, testRun]
(const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
@@ -4103,6 +4325,8 @@ private:
spec["mapper"]["output_streams"] = intermediateStreams;
}
+ PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQuery, useSkiff);
+
TOperationOptions opOpts;
FillOperationOptions(opOpts, execCtx, entry);
opOpts.StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec);
@@ -4124,12 +4348,13 @@ private:
const TExpressionResorceUsage& reduceExtraUsage,
const NYT::TNode& intermediateSchema,
bool useIntermediateStreams,
+ const TString& inputQuery,
const TExecContext<TRunOptions>::TPtr& execCtx
) {
const bool testRun = execCtx->Config_->GetLocalChainTest();
TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync();
return ret.Apply([reduceBy, sortBy, limit, sortLimitBy, reduceLambda, reduceInputType,
- reduceExtraUsage, intermediateSchema, useIntermediateStreams, execCtx, testRun]
+ reduceExtraUsage, intermediateSchema, useIntermediateStreams, inputQuery, execCtx, testRun]
(const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
@@ -4247,6 +4472,8 @@ private:
spec["reducer"]["enable_input_table_index"] = true;
}
+ PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQuery, useSkiff);
+
TOperationOptions opOpts;
FillOperationOptions(opOpts, execCtx, entry);
opOpts.StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec);
@@ -4368,17 +4595,19 @@ private:
limit.Clear();
}
+ const TString inputQuery = GenerateInputQuery(mapReduce.Settings().Ref(), execCtx->InputTables_);
+
return execCtx->Session_->Queue_->Async([reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage,
- reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, useIntermediateStreams, execCtx]()
+ reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, useIntermediateStreams, inputQuery, execCtx]()
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
execCtx->MakeUserFiles();
if (mapLambda) {
return ExecMapReduce(reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage,
- reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, execCtx);
+ reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, inputQuery, execCtx);
} else {
return ExecMapReduce(reduceBy, sortBy, limit, sortLimitBy, reduceLambda, reduceInputType, reduceExtraUsage, intermediateSchema,
- useIntermediateStreams, execCtx);
+ useIntermediateStreams, inputQuery, execCtx);
}
});
}
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
index 1107e0210dc..8db9ef743b3 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
@@ -81,6 +81,11 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
AddHandler(1, &TYtReduce::Match, HNDL(FuseReduceWithTrivialMap));
}
+ if (State_->Configuration->UseQLFilter.Get().GetOrElse(DEFAULT_USE_QL_FILTER)) {
+ // best to run after Fuse*Map and before MapToMerge
+ AddHandler(2, Names({TYtMap::CallableName()}), HNDL(ExtractQLFilters));
+ AddHandler(2, Names({TYtQLFilter::CallableName()}), HNDL(OptimizeQLFilterType));
+ }
AddHandler(2, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin));
AddHandler(2, &TStatWriteTable::Match, HNDL(ReplaceStatWriteTable));
AddHandler(2, &TYtMap::Match, HNDL(MapToMerge));
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
index 67e68ea9790..48765ee2c4a 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
@@ -74,6 +74,10 @@ private:
// <cmpItem> := '(<cmpOp> <value>)
NNodes::TMaybeNode<NNodes::TExprBase> ExtractKeyRangeLegacy(NNodes::TExprBase node, TExprContext& ctx) const;
+ NNodes::TMaybeNode<NNodes::TExprBase> ExtractQLFilters(NNodes::TExprBase node, TExprContext& ctx) const;
+
+ NNodes::TMaybeNode<NNodes::TExprBase> OptimizeQLFilterType(NNodes::TExprBase node, TExprContext& ctx) const;
+
NNodes::TMaybeNode<NNodes::TExprBase> FuseReduce(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
NNodes::TMaybeNode<NNodes::TExprBase> FuseReduceWithTrivialMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_field_subset.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_field_subset.cpp
index e70d54fffa8..a2f5fd41000 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_field_subset.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_field_subset.cpp
@@ -8,6 +8,20 @@ namespace NYql {
using namespace NNodes;
+namespace {
+ void InsertQLFilterColumns(const TYtWithUserJobsOpBase& op, TSet<TStringBuf>& memberSet) {
+ const auto qlFilterSetting = GetSetting(op.Settings().Ref(), EYtSettingType::QLFilter);
+ if (qlFilterSetting) {
+ const auto qlFilter = qlFilterSetting->Child(1);
+ YQL_ENSURE(qlFilter->IsCallable("YtQLFilter"));
+ const TStructExprType* qlFilterType = qlFilter->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
+ for (const auto& item : qlFilterType->GetItems()) {
+ memberSet.insert(item->GetName());
+ }
+ }
+ }
+} // empty namespace
+
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::LambdaFieldsSubset(TYtWithUserJobsOpBase op, size_t lambdaIdx, TExprContext& ctx, const TGetParents& getParents) const {
auto lambda = TCoLambda(op.Ref().ChildPtr(lambdaIdx));
@@ -28,6 +42,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::LambdaFieldsSubset(TYtW
memberSet.insert(reduceBy.cbegin(), reduceBy.cend());
auto sortBy = NYql::GetSettingAsColumnList(op.Settings().Ref(), EYtSettingType::SortBy);
memberSet.insert(sortBy.cbegin(), sortBy.cend());
+ InsertQLFilterColumns(op, memberSet);
auto itemType = GetSeqItemType(lambda.Args().Arg(0).Ref().GetTypeAnn())->Cast<TStructExprType>();
if (memberSet.size() < itemType->GetSize()) {
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp
index 31c685f4438..821666afe40 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp
@@ -641,7 +641,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseInnerMap(TExprBase
if (NYql::HasSetting(innerMap.Settings().Ref(), EYtSettingType::Flow) != NYql::HasSetting(outerMap.Settings().Ref(), EYtSettingType::Flow)) {
return node;
}
- if (NYql::HasAnySetting(outerMap.Settings().Ref(), EYtSettingType::JobCount)) {
+ if (NYql::HasAnySetting(outerMap.Settings().Ref(), EYtSettingType::JobCount | EYtSettingType::QLFilter)) {
return node;
}
if (!path.Ranges().Maybe<TCoVoid>()) {
@@ -779,7 +779,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseOuterMap(TExprBase
if (NYql::HasAnySetting(inner.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::SortLimitBy | EYtSettingType::JobCount)) {
return node;
}
- if (NYql::HasAnySetting(outerMap.Settings().Ref(), EYtSettingType::JobCount | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputApplied)) {
+ if (NYql::HasAnySetting(outerMap.Settings().Ref(), EYtSettingType::JobCount | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputApplied | EYtSettingType::QLFilter)) {
return node;
}
if (outerMap.Input().Item(0).Settings().Size() != 0) {
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp
index e3230f6f329..1caead064f9 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp
@@ -394,7 +394,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::MapToMerge(TExprBase no
.Input()
.Add(section)
.Build()
- .Settings(NYql::KeepOnlySettings(map.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::KeepSorted, ctx))
+ .Settings(NYql::KeepOnlySettings(map.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::QLFilter, ctx))
.Done();
}
@@ -409,7 +409,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::MergeToCopy(TExprBase n
return node;
}
- if (NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks)) {
+ if (NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks | EYtSettingType::QLFilter)) {
return node;
}
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp
new file mode 100644
index 00000000000..73cacf723d8
--- /dev/null
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp
@@ -0,0 +1,233 @@
+#include "yql_yt_phy_opt.h"
+
+#include <yt/yql/providers/yt/provider/yql_yt_helpers.h>
+
+#include <yql/essentials/core/dq_expr_nodes/dq_expr_nodes.h>
+#include <yql/essentials/core/yql_opt_utils.h>
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/minikql/mkql_node.h>
+
+namespace NYql {
+
+using namespace NNodes;
+using namespace NNodes::NDq;
+using namespace NPrivate;
+
+namespace {
+
+bool NodeHasQLCompatibleType(const TExprNode::TPtr& node) {
+ bool isOptional = false;
+ const TDataExprType* dataType = nullptr;
+ if (!IsDataOrOptionalOfData(node->GetTypeAnn(), isOptional, dataType)) {
+ return false;
+ }
+ if (isOptional) {
+ return false;
+ }
+ if (!dataType) {
+ return false;
+ }
+ if (!IsDataTypeIntegral(dataType->GetSlot())) {
+ return false;
+ }
+ return true;
+}
+
+TExprNode::TPtr CheckQLConst(const TExprNode::TPtr& node, const TExprNode::TPtr& rowArg) {
+ if (!NodeHasQLCompatibleType(node)) {
+ return nullptr;
+ }
+ if (IsDepended(*node, *rowArg)) {
+ return nullptr;
+ }
+ return node;
+}
+
+TExprNode::TPtr ConvertQLMember(const TExprNode::TPtr& node, const TExprNode::TPtr& rowArg, const TExprNode::TPtr& newRowArg, TExprContext& ctx) {
+ if (!node->IsCallable("Member")) {
+ return nullptr;
+ }
+ YQL_ENSURE(node->ChildrenSize() == 2);
+ if (node->ChildPtr(0) != rowArg) {
+ return nullptr;
+ }
+ const auto memberName = node->Child(1)->Content();
+ if (memberName.StartsWith("_yql_sys_")) {
+ return nullptr;
+ }
+ if (!NodeHasQLCompatibleType(node)) {
+ return nullptr;
+ }
+ auto arg = newRowArg;
+ return ctx.ChangeChild(*node, 0, std::move(arg));
+}
+
+TExprNode::TPtr ConvertQLComparison(const TExprNode::TPtr& node, const TExprNode::TPtr& rowArg, const TExprNode::TPtr& newRowArg, TExprContext& ctx) {
+ YQL_ENSURE(node->ChildrenSize() == 2);
+ TExprNode::TPtr childLeft;
+ TExprNode::TPtr childRight;
+ if (childLeft = ConvertQLMember(node->ChildPtr(0), rowArg, newRowArg, ctx)) {
+ childRight = CheckQLConst(node->ChildPtr(1), rowArg);
+ }
+ else if (childRight = ConvertQLMember(node->ChildPtr(1), rowArg, newRowArg, ctx)) {
+ childLeft = CheckQLConst(node->ChildPtr(0), rowArg);
+ }
+ if (!childLeft || !childRight) {
+ return nullptr;
+ }
+ return ctx.ChangeChildren(*node, {childLeft, childRight});
+}
+
+TExprNode::TPtr ConvertQLSubTree(const TExprNode::TPtr& node, const TExprNode::TPtr& rowArg, const TExprNode::TPtr& newRowArg, TExprContext& ctx) {
+ if (node->IsCallable({"And", "Or", "Not"})) {
+ TExprNode::TListType convertedChildren;
+ for (const auto& child : node->ChildrenList()) {
+ const auto converted = ConvertQLSubTree(child, rowArg, newRowArg, ctx);
+ if (!converted) {
+ return nullptr;
+ }
+ convertedChildren.push_back(converted);
+ };
+ return ctx.ChangeChildren(*node, std::move(convertedChildren));
+ }
+ if (node->IsCallable({"<", "<=", ">", ">=", "==", "!="})) {
+ return ConvertQLComparison(node, rowArg, newRowArg, ctx);
+ }
+ return nullptr;
+}
+
+} // empty namespace
+
+TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::ExtractQLFilters(TExprBase node, TExprContext& ctx) const {
+ const auto opMap = node.Cast<TYtMap>();
+ if (opMap.Input().Size() > 1) {
+ return node;
+ }
+
+ const auto section = opMap.Input().Item(0);
+ if (NYql::HasAnySetting(opMap.Settings().Ref(), EYtSettingType::QLFilter | EYtSettingType::WeakFields)) {
+ return node;
+ }
+
+ const auto flatMap = GetFlatMapOverInputStream(opMap.Mapper());
+ if (!flatMap) {
+ return node;
+ }
+
+ const auto flatMapLambda = flatMap.Lambda();
+ if (!flatMapLambda) {
+ return node;
+ }
+
+ const auto optionalIf = flatMapLambda.Body().Maybe<TCoOptionalIf>();
+ if (!optionalIf) {
+ return node;
+ }
+
+ const auto rowArg = flatMapLambda.Cast().Args().Arg(0).Ptr();
+ const auto newRowArg = ctx.NewArgument(rowArg->Pos(), rowArg->Content());
+
+ TExprNode::TListType qlCompatibleParts;
+ TExprNode::TListType otherParts;
+ TExprNode::TPtr predicate = optionalIf.Cast().Predicate().Ptr();
+
+ if (!predicate->IsCallable("And")) {
+ const auto converted = ConvertQLSubTree(predicate, rowArg, newRowArg, ctx);
+ if (converted) {
+ qlCompatibleParts.push_back(converted);
+ } else {
+ otherParts.push_back(predicate);
+ }
+ } else {
+ for (const auto& child : predicate->ChildrenList()) {
+ const auto converted = ConvertQLSubTree(child, rowArg, newRowArg, ctx);
+ if (converted) {
+ qlCompatibleParts.push_back(converted);
+ } else {
+ otherParts.push_back(child);
+ }
+ };
+ }
+
+ if (qlCompatibleParts.empty()) {
+ return node;
+ }
+
+ TExprNode::TPtr qlCompatiblePredicate;
+ if (qlCompatibleParts.size() == 1) {
+ qlCompatiblePredicate = qlCompatibleParts.front();
+ } else {
+ qlCompatiblePredicate = ctx.NewCallable(flatMap.Cast().Pos(), "And", std::move(qlCompatibleParts));
+ }
+ YQL_ENSURE(qlCompatiblePredicate);
+
+ TExprNode::TPtr prunedPredicate;
+ if (otherParts.empty()) {
+ prunedPredicate = MakeBool<true>(predicate->Pos(), ctx);
+ } else if (otherParts.size() == 1) {
+ prunedPredicate = otherParts.front();
+ } else {
+ prunedPredicate = ctx.NewCallable(predicate->Pos(), "And", std::move(otherParts));
+ }
+ YQL_ENSURE(prunedPredicate);
+
+ const auto typeNode = ExpandType(rowArg->Pos(), *rowArg->GetTypeAnn(), ctx);
+ const auto lambdaNode = ctx.NewLambda(qlCompatiblePredicate->Pos(), ctx.NewArguments(qlCompatiblePredicate->Pos(), {newRowArg}), std::move(qlCompatiblePredicate));
+ const auto qlFilter = ctx.NewCallable(flatMap.Cast().Pos(), "YtQLFilter", {typeNode, lambdaNode});
+
+ auto newOpMap = ctx.ChangeChild(opMap.Ref(), TYtMap::idx_Settings, NYql::AddSetting(opMap.Settings().Ref(), EYtSettingType::QLFilter, qlFilter, ctx));
+ const bool pruneLambda = State_->Configuration->PruneQLFilterLambda.Get().GetOrElse(DEFAULT_PRUNE_QL_FILTER_LAMBDA);
+ if (pruneLambda) {
+ const auto newFlatMap = Build<TCoFlatMapBase>(ctx, flatMap.Cast().Pos())
+ .InitFrom(flatMap.Cast())
+ .Lambda()
+ .InitFrom(flatMapLambda.Cast())
+ .Body<TCoOptionalIf>()
+ .InitFrom(optionalIf.Cast())
+ .Predicate(prunedPredicate)
+ .Build()
+ .Build()
+ .Done().Ptr();
+
+ const TOptimizeExprSettings settings{State_->Types};
+ const TNodeOnNodeOwnedMap remaps{{flatMap.Cast().Raw(), newFlatMap}};
+ const auto status = RemapExpr(newOpMap, newOpMap, remaps, ctx, settings);
+ YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Error);
+ }
+ return newOpMap;
+}
+
+TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::OptimizeQLFilterType(TExprBase node, TExprContext& ctx) const {
+ const auto qlFilter = node.Cast<TYtQLFilter>();
+ const auto rowType = qlFilter.RowType();
+
+ const auto arg = qlFilter.Predicate().Args().Arg(0).Ptr();
+ TSet<TStringBuf> memberSet;
+ VisitExpr(qlFilter.Predicate().Body().Ptr(), [&arg, &memberSet](const TExprNode::TPtr& n) {
+ if (n->IsCallable("Member")) {
+ if (n->ChildPtr(0) == arg) {
+ const auto member = n->Child(1);
+ YQL_ENSURE(member->IsAtom());
+ memberSet.insert(member->Content());
+ }
+ return false;
+ }
+ return true;
+ });
+
+ TExprNode::TListType newRowTypeChildren;
+ for (const auto& child : rowType.Ptr()->ChildrenList()) {
+ const auto name = child->Child(0);
+ if (memberSet.contains(name->Content())) {
+ newRowTypeChildren.push_back(child);
+ }
+ }
+ if (newRowTypeChildren.size() == rowType.Ptr()->ChildrenSize()) {
+ return node;
+ }
+
+ auto newRowType = ctx.ChangeChildren(rowType.Ref(), std::move(newRowTypeChildren));
+ return ctx.ChangeChild(qlFilter.Ref(), TYtQLFilter::idx_RowType, std::move(newRowType));
+}
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/provider/ya.make b/yt/yql/providers/yt/provider/ya.make
index f8b57a915eb..719850bb8b0 100644
--- a/yt/yql/providers/yt/provider/ya.make
+++ b/yt/yql/providers/yt/provider/ya.make
@@ -61,6 +61,7 @@ SRCS(
phy_opt/yql_yt_phy_opt_field_subset.cpp
phy_opt/yql_yt_phy_opt_helper.cpp
phy_opt/yql_yt_phy_opt_key_range.cpp
+ phy_opt/yql_yt_phy_opt_ytql.cpp
phy_opt/yql_yt_phy_opt_merge.cpp
phy_opt/yql_yt_phy_opt_push.cpp
phy_opt/yql_yt_phy_opt_write.cpp
diff --git a/yt/yql/providers/yt/provider/yql_yt_datasink_constraints.cpp b/yt/yql/providers/yt/provider/yql_yt_datasink_constraints.cpp
index c9978f95aa1..1ad5173187e 100644
--- a/yt/yql/providers/yt/provider/yql_yt_datasink_constraints.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_datasink_constraints.cpp
@@ -43,6 +43,7 @@ public:
AddHandler({TYtDqProcessWrite ::CallableName()}, Hndl(&TYtDataSinkConstraintTransformer::HandleDqProcessWrite));
AddHandler({TYtTryFirst ::CallableName()}, Hndl(&TYtDataSinkConstraintTransformer::HandleTryFirst));
AddHandler({TYtMaterialize ::CallableName()}, Hndl(&TYtDataSinkConstraintTransformer::HandleMaterialize));
+ AddHandler({TYtQLFilter::CallableName()}, Hndl(&TYtDataSinkConstraintTransformer::HandleDefault));
}
private:
static void CopyExcept(TExprNode* dst, const TExprNode& from, const TStringBuf& except) {
diff --git a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp
index 56b1d6866ff..9bfc7af690a 100644
--- a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp
@@ -115,6 +115,7 @@ public:
AddHandler({TYtDqWideWrite::CallableName()}, Hndl(&TYtDataSinkTypeAnnotationTransformer::HandleDqWrite<true>));
AddHandler({TYtTryFirst::CallableName()}, Hndl(&TYtDataSinkTypeAnnotationTransformer::HandleTryFirst));
AddHandler({TYtMaterialize::CallableName()}, Hndl(&TYtDataSinkTypeAnnotationTransformer::HandleMaterialize));
+ AddHandler({TYtQLFilter::CallableName()}, Hndl(&TYtDataSinkTypeAnnotationTransformer::HandleQLFilter));
}
private:
@@ -1013,7 +1014,7 @@ private:
auto merge = TYtMerge(input);
- if (!ValidateSettings(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks | EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::NoDq, ctx)) {
+ if (!ValidateSettings(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks | EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::NoDq | EYtSettingType::QLFilter, ctx)) {
return TStatus::Error;
}
@@ -1097,7 +1098,8 @@ private:
| EYtSettingType::BlockInputReady
| EYtSettingType::BlockInputApplied
| EYtSettingType::BlockOutputReady
- | EYtSettingType::BlockOutputApplied;
+ | EYtSettingType::BlockOutputApplied
+ | EYtSettingType::QLFilter;
if (!ValidateSettings(map.Settings().Ref(), accpeted, ctx)) {
return TStatus::Error;
}
@@ -1297,7 +1299,8 @@ private:
| EYtSettingType::KeySwitch
| EYtSettingType::MapOutputType
| EYtSettingType::ReduceInputType
- | EYtSettingType::NoDq;
+ | EYtSettingType::NoDq
+ | EYtSettingType::QLFilter;
if (!ValidateSettings(mapReduce.Settings().Ref(), acceptedSettings, ctx)) {
return TStatus::Error;
}
@@ -2142,6 +2145,38 @@ private:
return TStatus::Ok;
}
+ TStatus HandleQLFilter(const TExprNode::TPtr& input, TExprContext& ctx) {
+ if (!EnsureArgsCount(*input, 2, ctx)) {
+ return TStatus::Error;
+ }
+
+ const auto& type = input->Child(0);
+ if (!EnsureTypeWithStructType(*type, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto& lambda = input->ChildRef(1);
+ const auto status = ConvertToLambda(lambda, ctx, 1);
+ if (status.Level != IGraphTransformer::TStatus::Ok) {
+ return status;
+ }
+
+ if (!UpdateLambdaAllArgumentsTypes(lambda, {type->GetTypeAnn()->Cast<TTypeExprType>()->GetType()}, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!lambda->GetTypeAnn()) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ if (!EnsureSpecificDataType(*lambda, EDataSlot::Bool, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.MakeType<TUnitExprType>());
+ return TStatus::Ok;
+ }
+
private:
const TYtState::TPtr State_;
};
diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
index a47ae7b7c81..97b9fdeda92 100644
--- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
@@ -573,6 +573,25 @@ bool IsConstExpSortDirections(NNodes::TExprBase sortDirections) {
return false;
}
+void GetNodesToCalculateFromQLFilter(const TExprNode& qlFilter, TExprNode::TListType &needCalc, TNodeSet &uniqNodes) {
+ YQL_ENSURE(qlFilter.IsCallable("YtQLFilter"));
+ const auto lambdaBody = qlFilter.Child(1)->Child(1);
+ VisitExpr(lambdaBody, [&needCalc, &uniqNodes](const TExprNode::TPtr& node) {
+ if (node->IsCallable({"And", "Or", "Not", "<", "<=", ">", ">=", "==", "!="})) {
+ return true;
+ }
+ if (node->IsCallable("Member")) {
+ return false;
+ }
+ if (uniqNodes.insert(node.Get()).second) {
+ if (NeedCalc(TExprBase(node.Get()))) {
+ needCalc.push_back(node);
+ }
+ }
+ return false;
+ });
+}
+
TExprNode::TListType GetNodesToCalculate(const TExprNode::TPtr& input) {
TExprNode::TListType needCalc;
TNodeSet uniqNodes;
@@ -592,6 +611,9 @@ TExprNode::TListType GetNodesToCalculate(const TExprNode::TPtr& input) {
}
}
break;
+ case EYtSettingType::QLFilter:
+ GetNodesToCalculateFromQLFilter(setting.Value().Cast().Ref(), needCalc, uniqNodes);
+ break;
default:
break;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_horizontal_join.cpp b/yt/yql/providers/yt/provider/yql_yt_horizontal_join.cpp
index fdc3b118269..13d426e13dd 100644
--- a/yt/yql/providers/yt/provider/yql_yt_horizontal_join.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_horizontal_join.cpp
@@ -54,7 +54,7 @@ bool THorizontalJoinBase::IsGoodForHorizontalJoin(TYtMap map) const {
}
// Map has output limit or is sharded MapJoin
- if (NYql::HasAnySetting(map.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::SortLimitBy | EYtSettingType::Sharded | EYtSettingType::JobCount | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputApplied)) {
+ if (NYql::HasAnySetting(map.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::SortLimitBy | EYtSettingType::Sharded | EYtSettingType::JobCount | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputApplied | EYtSettingType::QLFilter)) {
return false;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp b/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp
index d54ec130778..2d1ebbc5025 100644
--- a/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp
@@ -917,6 +917,17 @@ bool ValidateSettings(const TExprNode& settingsNode, EYtSettingTypes accepted, T
}
return true;
}
+ case EYtSettingType::QLFilter: {
+ if (!EnsureTupleSize(*setting, 2, ctx)) {
+ return false;
+ }
+ const auto qlFilter = setting->Child(1);
+ if (!qlFilter->IsCallable("YtQLFilter")) {
+ ctx.AddError(TIssue(ctx.GetPosition(qlFilter->Pos()), TStringBuilder()
+ << "Expected YtQLFilter node, got: " << qlFilter->Content()));
+ }
+ break;
+ }
case EYtSettingType::LAST: {
YQL_ENSURE(false, "Unexpected EYtSettingType");
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_op_settings.h b/yt/yql/providers/yt/provider/yql_yt_op_settings.h
index f249a51ddfb..e99450e1081 100644
--- a/yt/yql/providers/yt/provider/yql_yt_op_settings.h
+++ b/yt/yql/providers/yt/provider/yql_yt_op_settings.h
@@ -101,6 +101,7 @@ enum class EYtSettingType: ui64 {
BlockInputApplied /* "blockInputApplied" */, // hybrid supported
BlockOutputReady /* "blockOutputReady" */, // hybrid supported
BlockOutputApplied /* "blockOutputApplied" */, // hybrid supported
+ QLFilter /* "qlFilter" */,
// Out tables
UniqueBy /* "uniqueBy" */,
OpHash /* "opHash" */,
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer.txt b/yt/yql/tests/sql/suites/ql_filter/integer.txt
new file mode 100644
index 00000000000..e4a8e247c95
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer.txt
@@ -0,0 +1,10 @@
+{"a"=1;"b"=1;"c"=1u;"d"=1;"e"=1u;"escaping []`"=1};
+{"a"=2;"b"=-2;"c"=2u;"d"=10;"e"=10u;"escaping []`"=2};
+{"a"=-3;"b"=3;"c"=3u;"d"=-100;"e"=100u;"escaping []`"=3};
+{"a"=4;"b"=4;"c"=4u;"d"=1000;"e"=1000u;"escaping []`"=4};
+{"a"=5;"b"=-5;"c"=5u;"d"=10000;"e"=10000u;"escaping []`"=5};
+{"a"=-6;"b"=6;"c"=6u;"d"=-100000;"e"=100000u;"escaping []`"=6};
+{"a"=7;"b"=7;"c"=7u;"d"=1000000;"e"=1000000u;"escaping []`"=7};
+{"a"=8;"b"=-8;"c"=8u;"d"=10000000;"e"=10000000u;"escaping []`"=8};
+{"a"=-9;"b"=9;"c"=9u;"d"=-100000000;"e"=100000000u;"escaping []`"=9};
+{"a"=10;"b"=10;"c"=10u;"d"=1000000000;"e"=1000000000u;"escaping []`"=10};
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer.txt.attr b/yt/yql/tests/sql/suites/ql_filter/integer.txt.attr
new file mode 100644
index 00000000000..eb0ef07a6ef
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer.txt.attr
@@ -0,0 +1,12 @@
+{
+ "_yql_row_spec"={
+ "Type"=["StructType";[
+ ["a";["DataType";"Int32"]];
+ ["b";["DataType";"Int32"]];
+ ["c";["DataType";"Uint32"]];
+ ["d";["DataType";"Int64"]];
+ ["e";["DataType";"Uint64"]];
+ ["escaping []`";["DataType";"Int32"]]
+ ]]
+ }
+}
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_bounds.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.cfg
new file mode 100644
index 00000000000..d0ce4581d7d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql
new file mode 100644
index 00000000000..1f763a6b417
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql
@@ -0,0 +1,8 @@
+pragma yt.UseQLFilter;
+PRAGMA yt.UseSkiff='false';
+
+select c
+from plato.Input
+where
+ c > -1
+ AND a < 18446744073709551615;
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_escaping.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.cfg
new file mode 100644
index 00000000000..d0ce4581d7d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql
new file mode 100644
index 00000000000..3e074e24e60
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql
@@ -0,0 +1,8 @@
+pragma yt.UseQLFilter;
+PRAGMA yt.UseSkiff='false';
+
+select
+ `escaping []\``,
+ `escaping []\`` as renamed
+from plato.Input
+where `escaping []\`` > 5;
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_eval.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_eval.cfg
new file mode 100644
index 00000000000..d0ce4581d7d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_eval.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_eval.sql b/yt/yql/tests/sql/suites/ql_filter/integer_eval.sql
new file mode 100644
index 00000000000..a7772686f7b
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_eval.sql
@@ -0,0 +1,5 @@
+pragma yt.UseQLFilter;
+
+select a
+from plato.Input
+where a > 2+3; \ No newline at end of file
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_left.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.cfg
new file mode 100644
index 00000000000..d0ce4581d7d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql
new file mode 100644
index 00000000000..fa2d5840773
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql
@@ -0,0 +1,13 @@
+pragma yt.UseQLFilter;
+pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644
+
+select a, c, d, e
+from plato.Input
+where
+ a > 5
+ and
+ c > 5
+ and
+ d > 5
+ and
+ e > 5; \ No newline at end of file
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_right.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.cfg
new file mode 100644
index 00000000000..d0ce4581d7d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql
new file mode 100644
index 00000000000..56051ba52b5
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql
@@ -0,0 +1,13 @@
+pragma yt.UseQLFilter;
+pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644
+
+select a, c, d, e
+from plato.Input
+where
+ 5 < a
+ and
+ 5 < c
+ and
+ 5 < d
+ and
+ 5 < e; \ No newline at end of file
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_members.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_members.cfg
new file mode 100644
index 00000000000..d0ce4581d7d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_members.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_members.sql b/yt/yql/tests/sql/suites/ql_filter/integer_members.sql
new file mode 100644
index 00000000000..a9640db73c1
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_members.sql
@@ -0,0 +1,5 @@
+pragma yt.UseQLFilter;
+
+select a, b
+from plato.Input
+where a > b; \ No newline at end of file
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.cfg
new file mode 100644
index 00000000000..d0ce4581d7d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.sql b/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.sql
new file mode 100644
index 00000000000..a2f57ebbffc
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_members_eval.sql
@@ -0,0 +1,5 @@
+pragma yt.UseQLFilter;
+
+select a, b
+from plato.Input
+where a - b + 5 > 0; \ No newline at end of file
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_select_other.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.cfg
new file mode 100644
index 00000000000..d0ce4581d7d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql
new file mode 100644
index 00000000000..bdf2ebbcd26
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql
@@ -0,0 +1,7 @@
+pragma yt.UseQLFilter;
+pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644
+
+select b
+from plato.Input
+where
+ a > 5; \ No newline at end of file
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_single.cfg
new file mode 100644
index 00000000000..d0ce4581d7d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_single.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single.sql b/yt/yql/tests/sql/suites/ql_filter/integer_single.sql
new file mode 100644
index 00000000000..73d6933bd55
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_single.sql
@@ -0,0 +1,5 @@
+pragma yt.UseQLFilter;
+
+select a
+from plato.Input
+where a > 5;