diff options
author | lucius <lucius@yandex-team.com> | 2025-02-27 20:10:56 +0300 |
---|---|---|
committer | lucius <lucius@yandex-team.com> | 2025-02-27 20:47:25 +0300 |
commit | 75d63f434cc0f0361068d1b880ec9a8d0e251317 (patch) | |
tree | 2045e096c502afced11c821c29378a2c0de653d7 /yt/yql | |
parent | 9ae9b76b16407d250878aafff784174478f1b769 (diff) | |
download | ydb-75d63f434cc0f0361068d1b880ec9a8d0e251317.tar.gz |
YQL-19382 enable ql filter with skiff
Тесты временно пометил как "yt can not", потому что еще ждем обновления yt_local в тестах.
commit_hash:e956b00a7c9b3c3c11ab6c6ed756eeac89c582ea
Diffstat (limited to 'yt/yql')
8 files changed, 88 insertions, 54 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index 196ff9a1c3..fb021d149c 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -361,69 +361,79 @@ void GenerateInputQueryWhereExpression(const TExprNode::TPtr& node, TStringBuild } } -TString GenerateInputQuery(const TExprNode& settings, const TVector<TInputInfo>& inputs) { +TString GenerateInputQueryWhereExpression(const TExprNode& settings) { auto qlFilterNode = NYql::GetSetting(settings, EYtSettingType::QLFilter); if (!qlFilterNode) { return ""; } - // TODO: how to handle multiple inputs? - YQL_ENSURE(inputs.size() == 1, "YtQLFilter: multiple inputs are not supported"); - qlFilterNode = qlFilterNode->ChildPtr(1); YQL_ENSURE(qlFilterNode && qlFilterNode->IsCallable("YtQLFilter")); const TYtQLFilter qlFilter(qlFilterNode); const auto predicate = qlFilter.Predicate().Body(); TStringBuilder result; - bool foundSomeColumns = false; - for (const auto& table: inputs) { - YQL_ENSURE(table.Path.Columns_ && table.Path.Columns_->Parts_, "YtQLFilter: TRichYPath should have columns"); - for (const auto& column: table.Path.Columns_->Parts_) { - if (foundSomeColumns) { + GenerateInputQueryWhereExpression(predicate.Ptr(), result); + YQL_CLOG(INFO, ProviderYt) << __FUNCTION__ << ": " << result; + return result; +} + +TString GenerateInputQuery(const TRichYPath& path, const TString& whereExpression, bool useSystemColumns) { + YQL_ENSURE(whereExpression); + TStringBuilder result; + if (!path.Columns_ || !path.Columns_->Parts_) { + result << "*"; + } else { + bool first = true; + for (const auto& column: path.Columns_->Parts_) { + if (!first) { result << ", "; } QuoteColumnForQL(column, result); - foundSomeColumns = true; + first = false; + } + if (useSystemColumns) { + result << ", `$row_index`, `$range_index`"; } } - if (!foundSomeColumns) { - result << "*"; - } - result << " WHERE "; 
- GenerateInputQueryWhereExpression(predicate.Ptr(), result); + result << " WHERE " << whereExpression; YQL_CLOG(INFO, ProviderYt) << __FUNCTION__ << ": " << result; return result; } -void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery) { +void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery, bool useSystemColumns) { spec["input_query"] = inputQuery; spec["input_query_filter_options"]["enable_chunk_filter"] = true; spec["input_query_filter_options"]["enable_row_filter"] = true; + if (useSystemColumns) { + spec["input_query_options"]["use_system_columns"] = true; + } } -void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& inputQuery) { +void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& whereExpression) { // YQL-19382 - if (inputQuery) { - for (auto& path : paths) { - path.Columns_.Clear(); - } - SetInputQuerySpec(spec, inputQuery); + if (whereExpression) { + YQL_ENSURE(paths.size() == 1, "YtQLFilter: multiple inputs are not supported"); + auto& path = paths[0]; + const TString inputQuery = GenerateInputQuery(path, whereExpression, /*useSystemColumns*/ false); + path.Columns_.Clear(); + SetInputQuerySpec(spec, inputQuery, /*useSystemColumns*/ false); } } template <typename T> -void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& inputQuery, const bool useSkiff) { +void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& whereExpression, bool useSystemColumns) { // YQL-19382 - if (inputQuery) { - YQL_ENSURE(!useSkiff, "QLFilter can't work with skiff on map/mapreduce right now, try with PRAGMA yt.UseSkiff='false'"); + if (whereExpression) { const auto& inputs = specWithPaths.GetInputs(); - for (size_t i = 0; i < inputs.size(); ++i) { - auto path = inputs[i]; + YQL_ENSURE(inputs.size() == 1, "YtQLFilter: multiple inputs are not supported"); + auto path = inputs[0]; + const TString inputQuery = 
GenerateInputQuery(path, whereExpression, useSystemColumns); + if (path.Columns_) { path.Columns_.Clear(); - specWithPaths.SetInput(i, path); + specWithPaths.SetInput(0, path); } - SetInputQuerySpec(spec, inputQuery); + SetInputQuerySpec(spec, inputQuery, useSystemColumns); } } @@ -3594,10 +3604,10 @@ private: bool forceTransform = NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform); bool combineChunks = NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::CombineChunks); TMaybe<ui64> limit = GetLimit(merge.Settings().Ref()); - const TString inputQuery = GenerateInputQuery(merge.Settings().Ref(), execCtx->InputTables_); + const TString inputQueryExpr = GenerateInputQueryWhereExpression(merge.Settings().Ref()); - return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, inputQuery, execCtx]() { - return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, inputQuery, execCtx] (const auto& f) { + return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, inputQueryExpr, execCtx]() { + return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, inputQueryExpr, execCtx] (const auto& f) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); auto entry = execCtx->GetEntry(); bool cacheHit = f.GetValue(); @@ -3643,7 +3653,7 @@ private: spec["schema_inference_mode"] = "from_output"; // YTADMINREQ-17692 } - PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQuery); + PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQueryExpr); return execCtx->RunOperation([entry, mergeOpSpec = std::move(mergeOpSpec), spec = std::move(spec)](){ return entry->Tx->Merge(mergeOpSpec, TOperationOptions().StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec)); @@ -3662,13 +3672,13 @@ private: TString mapLambda, const TString& inputType, const TExpressionResorceUsage& extraUsage, - const TString& 
inputQuery, + const TString& inputQueryExpr, const TExecContext<TRunOptions>::TPtr& execCtx ) { const bool testRun = execCtx->Config_->GetLocalChainTest(); TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync(); return ret.Apply([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, - inputType, extraUsage, inputQuery, execCtx, testRun] (const auto& f) mutable + inputType, extraUsage, inputQueryExpr, execCtx, testRun] (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); TTransactionCache::TEntry::TPtr entry; @@ -3831,7 +3841,7 @@ private: spec["job_count"] = static_cast<i64>(*jobCount); } - PrepareInputQueryForMap(spec, mapOpSpec, inputQuery, useSkiff); + PrepareInputQueryForMap(spec, mapOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff); TOperationOptions opOpts; FillOperationOptions(opOpts, execCtx, entry); @@ -3870,12 +3880,12 @@ private: } auto extraUsage = execCtx->ScanExtraResourceUsage(map.Mapper().Body().Ref(), true); TString inputType = NCommon::WriteTypeToYson(GetSequenceItemType(map.Input().Size() == 1U ? 
TExprBase(map.Input().Item(0)) : TExprBase(map.Mapper().Args().Arg(0)), true)); - const TString inputQuery = GenerateInputQuery(map.Settings().Ref(), execCtx->InputTables_); + const TString inputQueryExpr = GenerateInputQueryWhereExpression(map.Settings().Ref()); - return execCtx->Session_->Queue_->Async([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQuery, execCtx]() { + return execCtx->Session_->Queue_->Async([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQueryExpr, execCtx]() { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); execCtx->MakeUserFiles(); - return ExecMap(ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQuery, execCtx); + return ExecMap(ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQueryExpr, execCtx); }); } @@ -4103,14 +4113,14 @@ private: NYT::TNode intermediateMeta, const NYT::TNode& intermediateSchema, const NYT::TNode& intermediateStreams, - const TString& inputQuery, + const TString& inputQueryExpr, const TExecContext<TRunOptions>::TPtr& execCtx ) { const bool testRun = execCtx->Config_->GetLocalChainTest(); TFuture<bool> ret = testRun ? 
MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync(); return ret.Apply([reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage, reduceLambda, reduceInputType, reduceExtraUsage, - intermediateMeta, intermediateSchema, intermediateStreams, inputQuery, execCtx, testRun] + intermediateMeta, intermediateSchema, intermediateStreams, inputQueryExpr, execCtx, testRun] (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); @@ -4328,7 +4338,7 @@ private: spec["mapper"]["output_streams"] = intermediateStreams; } - PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQuery, useSkiff); + PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff); TOperationOptions opOpts; FillOperationOptions(opOpts, execCtx, entry); @@ -4351,13 +4361,13 @@ private: const TExpressionResorceUsage& reduceExtraUsage, const NYT::TNode& intermediateSchema, bool useIntermediateStreams, - const TString& inputQuery, + const TString& inputQueryExpr, const TExecContext<TRunOptions>::TPtr& execCtx ) { const bool testRun = execCtx->Config_->GetLocalChainTest(); TFuture<bool> ret = testRun ? 
MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync(); return ret.Apply([reduceBy, sortBy, limit, sortLimitBy, reduceLambda, reduceInputType, - reduceExtraUsage, intermediateSchema, useIntermediateStreams, inputQuery, execCtx, testRun] + reduceExtraUsage, intermediateSchema, useIntermediateStreams, inputQueryExpr, execCtx, testRun] (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); @@ -4475,7 +4485,7 @@ private: spec["reducer"]["enable_input_table_index"] = true; } - PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQuery, useSkiff); + PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff); TOperationOptions opOpts; FillOperationOptions(opOpts, execCtx, entry); @@ -4598,19 +4608,19 @@ private: limit.Clear(); } - const TString inputQuery = GenerateInputQuery(mapReduce.Settings().Ref(), execCtx->InputTables_); + const TString inputQueryExpr = GenerateInputQueryWhereExpression(mapReduce.Settings().Ref()); return execCtx->Session_->Queue_->Async([reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage, - reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, useIntermediateStreams, inputQuery, execCtx]() + reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, useIntermediateStreams, inputQueryExpr, execCtx]() { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); execCtx->MakeUserFiles(); if (mapLambda) { return ExecMapReduce(reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage, - reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, inputQuery, execCtx); + reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, inputQueryExpr, execCtx); } else { return ExecMapReduce(reduceBy, sortBy, limit, sortLimitBy, 
reduceLambda, reduceInputType, reduceExtraUsage, intermediateSchema, - useIntermediateStreams, inputQuery, execCtx); + useIntermediateStreams, inputQueryExpr, execCtx); } }); } diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql index 1f763a6b41..0c2df74c4d 100644 --- a/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql +++ b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql @@ -1,5 +1,7 @@ +/* yt can not */ +/* waiting for update YT-24048 */ + pragma yt.UseQLFilter; -PRAGMA yt.UseSkiff='false'; select c from plato.Input diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql index 3e074e24e6..a96d74390f 100644 --- a/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql +++ b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql @@ -1,5 +1,7 @@ +/* yt can not */ +/* waiting for update YT-24048 */ + pragma yt.UseQLFilter; -PRAGMA yt.UseSkiff='false'; select `escaping []\``, diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql index fa2d584077..27eda8b58c 100644 --- a/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql +++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql @@ -1,5 +1,7 @@ +/* yt can not */ +/* waiting for update YT-24048 */ + pragma yt.UseQLFilter; -pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644 select a, c, d, e from plato.Input diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.cfg new file mode 100644 index 0000000000..d0ce4581d7 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.sql new file mode 100644 index 
0000000000..e721fdbca2 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.sql @@ -0,0 +1,13 @@ +pragma yt.UseQLFilter; +pragma yt.UseSkiff='false'; + +select a, c, d, e +from plato.Input +where + a > 5 + and + c > 5 + and + d > 5 + and + e > 5;
\ No newline at end of file diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql index 56051ba52b..2b95cbe376 100644 --- a/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql +++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql @@ -1,5 +1,7 @@ +/* yt can not */ +/* waiting for update YT-24048 */ + pragma yt.UseQLFilter; -pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644 select a, c, d, e from plato.Input diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql index bdf2ebbcd2..88d93eba98 100644 --- a/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql +++ b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql @@ -1,5 +1,7 @@ +/* yt can not */ +/* waiting for update YT-24048 */ + pragma yt.UseQLFilter; -pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644 select b from plato.Input |