about | summary | refs | log | tree | commit | diff | stats
path: root/yt
diff options
context:
space:
mode:
author: lucius <lucius@yandex-team.com> 2025-02-27 20:10:56 +0300
committer: lucius <lucius@yandex-team.com> 2025-02-27 20:47:25 +0300
commit: 75d63f434cc0f0361068d1b880ec9a8d0e251317 (patch)
tree: 2045e096c502afced11c821c29378a2c0de653d7 /yt
parent: 9ae9b76b16407d250878aafff784174478f1b769 (diff)
download: ydb-75d63f434cc0f0361068d1b880ec9a8d0e251317.tar.gz
YQL-19382 enable ql filter with skiff
Тесты временно пометил как "yt can not", потому что еще ждем обновления yt_local в тестах
commit_hash:e956b00a7c9b3c3c11ab6c6ed756eeac89c582ea
Diffstat (limited to 'yt')
-rw-r--r-- yt/yql/providers/yt/gateway/native/yql_yt_native.cpp 108
-rw-r--r-- yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql 4
-rw-r--r-- yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql 4
-rw-r--r-- yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql 4
-rw-r--r-- yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.cfg 1
-rw-r--r-- yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.sql 13
-rw-r--r-- yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql 4
-rw-r--r-- yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql 4
8 files changed, 88 insertions, 54 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index 196ff9a1c3..fb021d149c 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -361,69 +361,79 @@ void GenerateInputQueryWhereExpression(const TExprNode::TPtr& node, TStringBuild
}
}
-TString GenerateInputQuery(const TExprNode& settings, const TVector<TInputInfo>& inputs) {
+TString GenerateInputQueryWhereExpression(const TExprNode& settings) {
auto qlFilterNode = NYql::GetSetting(settings, EYtSettingType::QLFilter);
if (!qlFilterNode) {
return "";
}
- // TODO: how to handle multiple inputs?
- YQL_ENSURE(inputs.size() == 1, "YtQLFilter: multiple inputs are not supported");
-
qlFilterNode = qlFilterNode->ChildPtr(1);
YQL_ENSURE(qlFilterNode && qlFilterNode->IsCallable("YtQLFilter"));
const TYtQLFilter qlFilter(qlFilterNode);
const auto predicate = qlFilter.Predicate().Body();
TStringBuilder result;
- bool foundSomeColumns = false;
- for (const auto& table: inputs) {
- YQL_ENSURE(table.Path.Columns_ && table.Path.Columns_->Parts_, "YtQLFilter: TRichYPath should have columns");
- for (const auto& column: table.Path.Columns_->Parts_) {
- if (foundSomeColumns) {
+ GenerateInputQueryWhereExpression(predicate.Ptr(), result);
+ YQL_CLOG(INFO, ProviderYt) << __FUNCTION__ << ": " << result;
+ return result;
+}
+
+TString GenerateInputQuery(const TRichYPath& path, const TString& whereExpression, bool useSystemColumns) {
+ YQL_ENSURE(whereExpression);
+ TStringBuilder result;
+ if (!path.Columns_ || !path.Columns_->Parts_) {
+ result << "*";
+ } else {
+ bool first = true;
+ for (const auto& column: path.Columns_->Parts_) {
+ if (!first) {
result << ", ";
}
QuoteColumnForQL(column, result);
- foundSomeColumns = true;
+ first = false;
+ }
+ if (useSystemColumns) {
+ result << ", `$row_index`, `$range_index`";
}
}
- if (!foundSomeColumns) {
- result << "*";
- }
- result << " WHERE ";
- GenerateInputQueryWhereExpression(predicate.Ptr(), result);
+ result << " WHERE " << whereExpression;
YQL_CLOG(INFO, ProviderYt) << __FUNCTION__ << ": " << result;
return result;
}
-void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery) {
+void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery, bool useSystemColumns) {
spec["input_query"] = inputQuery;
spec["input_query_filter_options"]["enable_chunk_filter"] = true;
spec["input_query_filter_options"]["enable_row_filter"] = true;
+ if (useSystemColumns) {
+ spec["input_query_options"]["use_system_columns"] = true;
+ }
}
-void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& inputQuery) {
+void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& whereExpression) {
// YQL-19382
- if (inputQuery) {
- for (auto& path : paths) {
- path.Columns_.Clear();
- }
- SetInputQuerySpec(spec, inputQuery);
+ if (whereExpression) {
+ YQL_ENSURE(paths.size() == 1, "YtQLFilter: multiple inputs are not supported");
+ auto& path = paths[0];
+ const TString inputQuery = GenerateInputQuery(path, whereExpression, /*useSystemColumns*/ false);
+ path.Columns_.Clear();
+ SetInputQuerySpec(spec, inputQuery, /*useSystemColumns*/ false);
}
}
template <typename T>
-void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& inputQuery, const bool useSkiff) {
+void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& whereExpression, bool useSystemColumns) {
// YQL-19382
- if (inputQuery) {
- YQL_ENSURE(!useSkiff, "QLFilter can't work with skiff on map/mapreduce right now, try with PRAGMA yt.UseSkiff='false'");
+ if (whereExpression) {
const auto& inputs = specWithPaths.GetInputs();
- for (size_t i = 0; i < inputs.size(); ++i) {
- auto path = inputs[i];
+ YQL_ENSURE(inputs.size() == 1, "YtQLFilter: multiple inputs are not supported");
+ auto path = inputs[0];
+ const TString inputQuery = GenerateInputQuery(path, whereExpression, useSystemColumns);
+ if (path.Columns_) {
path.Columns_.Clear();
- specWithPaths.SetInput(i, path);
+ specWithPaths.SetInput(0, path);
}
- SetInputQuerySpec(spec, inputQuery);
+ SetInputQuerySpec(spec, inputQuery, useSystemColumns);
}
}
@@ -3594,10 +3604,10 @@ private:
bool forceTransform = NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform);
bool combineChunks = NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::CombineChunks);
TMaybe<ui64> limit = GetLimit(merge.Settings().Ref());
- const TString inputQuery = GenerateInputQuery(merge.Settings().Ref(), execCtx->InputTables_);
+ const TString inputQueryExpr = GenerateInputQueryWhereExpression(merge.Settings().Ref());
- return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, inputQuery, execCtx]() {
- return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, inputQuery, execCtx] (const auto& f) {
+ return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, inputQueryExpr, execCtx]() {
+ return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, inputQueryExpr, execCtx] (const auto& f) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
auto entry = execCtx->GetEntry();
bool cacheHit = f.GetValue();
@@ -3643,7 +3653,7 @@ private:
spec["schema_inference_mode"] = "from_output"; // YTADMINREQ-17692
}
- PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQuery);
+ PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQueryExpr);
return execCtx->RunOperation([entry, mergeOpSpec = std::move(mergeOpSpec), spec = std::move(spec)](){
return entry->Tx->Merge(mergeOpSpec, TOperationOptions().StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec));
@@ -3662,13 +3672,13 @@ private:
TString mapLambda,
const TString& inputType,
const TExpressionResorceUsage& extraUsage,
- const TString& inputQuery,
+ const TString& inputQueryExpr,
const TExecContext<TRunOptions>::TPtr& execCtx
) {
const bool testRun = execCtx->Config_->GetLocalChainTest();
TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync();
return ret.Apply([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda,
- inputType, extraUsage, inputQuery, execCtx, testRun] (const auto& f) mutable
+ inputType, extraUsage, inputQueryExpr, execCtx, testRun] (const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
TTransactionCache::TEntry::TPtr entry;
@@ -3831,7 +3841,7 @@ private:
spec["job_count"] = static_cast<i64>(*jobCount);
}
- PrepareInputQueryForMap(spec, mapOpSpec, inputQuery, useSkiff);
+ PrepareInputQueryForMap(spec, mapOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff);
TOperationOptions opOpts;
FillOperationOptions(opOpts, execCtx, entry);
@@ -3870,12 +3880,12 @@ private:
}
auto extraUsage = execCtx->ScanExtraResourceUsage(map.Mapper().Body().Ref(), true);
TString inputType = NCommon::WriteTypeToYson(GetSequenceItemType(map.Input().Size() == 1U ? TExprBase(map.Input().Item(0)) : TExprBase(map.Mapper().Args().Arg(0)), true));
- const TString inputQuery = GenerateInputQuery(map.Settings().Ref(), execCtx->InputTables_);
+ const TString inputQueryExpr = GenerateInputQueryWhereExpression(map.Settings().Ref());
- return execCtx->Session_->Queue_->Async([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQuery, execCtx]() {
+ return execCtx->Session_->Queue_->Async([ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQueryExpr, execCtx]() {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
execCtx->MakeUserFiles();
- return ExecMap(ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQuery, execCtx);
+ return ExecMap(ordered, blockInput, blockOutput, jobCount, limit, sortLimitBy, mapLambda, inputType, extraUsage, inputQueryExpr, execCtx);
});
}
@@ -4103,14 +4113,14 @@ private:
NYT::TNode intermediateMeta,
const NYT::TNode& intermediateSchema,
const NYT::TNode& intermediateStreams,
- const TString& inputQuery,
+ const TString& inputQueryExpr,
const TExecContext<TRunOptions>::TPtr& execCtx
) {
const bool testRun = execCtx->Config_->GetLocalChainTest();
TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync();
return ret.Apply([reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs,
mapExtraUsage, reduceLambda, reduceInputType, reduceExtraUsage,
- intermediateMeta, intermediateSchema, intermediateStreams, inputQuery, execCtx, testRun]
+ intermediateMeta, intermediateSchema, intermediateStreams, inputQueryExpr, execCtx, testRun]
(const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
@@ -4328,7 +4338,7 @@ private:
spec["mapper"]["output_streams"] = intermediateStreams;
}
- PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQuery, useSkiff);
+ PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff);
TOperationOptions opOpts;
FillOperationOptions(opOpts, execCtx, entry);
@@ -4351,13 +4361,13 @@ private:
const TExpressionResorceUsage& reduceExtraUsage,
const NYT::TNode& intermediateSchema,
bool useIntermediateStreams,
- const TString& inputQuery,
+ const TString& inputQueryExpr,
const TExecContext<TRunOptions>::TPtr& execCtx
) {
const bool testRun = execCtx->Config_->GetLocalChainTest();
TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync();
return ret.Apply([reduceBy, sortBy, limit, sortLimitBy, reduceLambda, reduceInputType,
- reduceExtraUsage, intermediateSchema, useIntermediateStreams, inputQuery, execCtx, testRun]
+ reduceExtraUsage, intermediateSchema, useIntermediateStreams, inputQueryExpr, execCtx, testRun]
(const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
@@ -4475,7 +4485,7 @@ private:
spec["reducer"]["enable_input_table_index"] = true;
}
- PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQuery, useSkiff);
+ PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff);
TOperationOptions opOpts;
FillOperationOptions(opOpts, execCtx, entry);
@@ -4598,19 +4608,19 @@ private:
limit.Clear();
}
- const TString inputQuery = GenerateInputQuery(mapReduce.Settings().Ref(), execCtx->InputTables_);
+ const TString inputQueryExpr = GenerateInputQueryWhereExpression(mapReduce.Settings().Ref());
return execCtx->Session_->Queue_->Async([reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage,
- reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, useIntermediateStreams, inputQuery, execCtx]()
+ reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, useIntermediateStreams, inputQueryExpr, execCtx]()
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
execCtx->MakeUserFiles();
if (mapLambda) {
return ExecMapReduce(reduceBy, sortBy, limit, sortLimitBy, mapLambda, mapInputType, mapDirectOutputs, mapExtraUsage,
- reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, inputQuery, execCtx);
+ reduceLambda, reduceInputType, reduceExtraUsage, intermediateMeta, intermediateSchema, intermediateStreams, inputQueryExpr, execCtx);
} else {
return ExecMapReduce(reduceBy, sortBy, limit, sortLimitBy, reduceLambda, reduceInputType, reduceExtraUsage, intermediateSchema,
- useIntermediateStreams, inputQuery, execCtx);
+ useIntermediateStreams, inputQueryExpr, execCtx);
}
});
}
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql
index 1f763a6b41..0c2df74c4d 100644
--- a/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_bounds.sql
@@ -1,5 +1,7 @@
+/* yt can not */
+/* waiting for update YT-24048 */
+
pragma yt.UseQLFilter;
-PRAGMA yt.UseSkiff='false';
select c
from plato.Input
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql
index 3e074e24e6..a96d74390f 100644
--- a/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_escaping.sql
@@ -1,5 +1,7 @@
+/* yt can not */
+/* waiting for update YT-24048 */
+
pragma yt.UseQLFilter;
-PRAGMA yt.UseSkiff='false';
select
`escaping []\``,
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql
index fa2d584077..27eda8b58c 100644
--- a/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_left.sql
@@ -1,5 +1,7 @@
+/* yt can not */
+/* waiting for update YT-24048 */
+
pragma yt.UseQLFilter;
-pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644
select a, c, d, e
from plato.Input
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.cfg
new file mode 100644
index 0000000000..d0ce4581d7
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.sql
new file mode 100644
index 0000000000..e721fdbca2
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_noskiff.sql
@@ -0,0 +1,13 @@
+pragma yt.UseQLFilter;
+pragma yt.UseSkiff='false';
+
+select a, c, d, e
+from plato.Input
+where
+ a > 5
+ and
+ c > 5
+ and
+ d > 5
+ and
+ e > 5;
\ No newline at end of file
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql
index 56051ba52b..2b95cbe376 100644
--- a/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_many_right.sql
@@ -1,5 +1,7 @@
+/* yt can not */
+/* waiting for update YT-24048 */
+
pragma yt.UseQLFilter;
-pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644
select a, c, d, e
from plato.Input
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql
index bdf2ebbcd2..88d93eba98 100644
--- a/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_select_other.sql
@@ -1,5 +1,7 @@
+/* yt can not */
+/* waiting for update YT-24048 */
+
pragma yt.UseQLFilter;
-pragma yt.UseSkiff='false'; -- temporary disable skiff https://st.yandex-team.ru/YT-14644
select b
from plato.Input