aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIuliia Sidorina <yulia@ydb.tech>2024-11-19 15:21:48 +0100
committerGitHub <noreply@github.com>2024-11-19 15:21:48 +0100
commit92bb6c70bfe1c162c1d2e6ffe2170e0438834018 (patch)
tree542a3822849e0d2224f538bbc0128b60bda52032
parent563d9af5befccb977bddc76fdb722e4a9c0c0e84 (diff)
downloadydb-92bb6c70bfe1c162c1d2e6ffe2170e0438834018.tar.gz
feat(kqp): add pragma for sequential reads (#11715)
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.cpp5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.h2
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp39
4 files changed, 51 insertions, 0 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
index c73d34e7db..91d828e07d 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
@@ -110,6 +110,11 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim
matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
}
+ if (kqpCtx.Config->HasMaxSequentialReadsInFlight()) {
+ settings.SequentialInFlight = *kqpCtx.Config->MaxSequentialReadsInFlight.Get();
+ matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
+ }
+
TVector<TExprBase> inputs;
TVector<TCoArgument> args;
TNodeOnNodeOwnedMap argReplaces;
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp
index e19bc39952..4598d36f0b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp
@@ -95,6 +95,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
REGISTER_SETTING(*this, MaxTasksPerStage);
+ REGISTER_SETTING(*this, MaxSequentialReadsInFlight);
/* Runtime */
REGISTER_SETTING(*this, ScanQuery);
@@ -147,6 +148,10 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const {
return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled;
}
+bool TKikimrSettings::HasMaxSequentialReadsInFlight() const {
+ return !MaxSequentialReadsInFlight.Get().Empty();
+}
+
EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index 34f26f7e30..3eaeffa987 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -72,6 +72,7 @@ struct TKikimrSettings {
NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
+ NCommon::TConfSetting<ui32, false> MaxSequentialReadsInFlight;
/* Runtime */
NCommon::TConfSetting<bool, true> ScanQuery;
@@ -88,6 +89,7 @@ struct TKikimrSettings {
bool HasOptEnableOlapPushdown() const;
bool HasOptEnableOlapProvideComputeSharding() const;
bool HasOptUseFinalizeByKey() const;
+ bool HasMaxSequentialReadsInFlight() const;
EOptionalFlag GetOptPredicateExtract() const;
EOptionalFlag GetUseLlvm() const;
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 37e1037ba2..e73c17e793 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -4322,7 +4322,46 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
}
+ Y_UNIT_TEST_TWIN(SequentialReadsPragma, Enabled) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ NYdb::NTable::TExecDataQuerySettings querySettings;
+ querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
+
+ TString query = R"(
+ SELECT Key, Data FROM `/Root/EightShard`
+ WHERE Text = "Value1"
+ ORDER BY Key
+ LIMIT 1;
+ )";
+
+ if (Enabled) {
+ TString pragma = TString(R"(
+ PRAGMA ydb.MaxSequentialReadsInFlight = "1";
+ )");
+ query = pragma + query;
+ }
+
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), querySettings).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0)));
+
+ auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+ for (const auto& phase : stats.query_phases()) {
+ for (const auto& access : phase.table_access()) {
+ if (access.name() == "/Root/EightShard") {
+ if (Enabled) {
+ UNIT_ASSERT_LT(access.partitions_count(), 8);
+ } else {
+ UNIT_ASSERT_EQUAL(access.partitions_count(), 8);
+ }
+ }
+ }
+ }
+ }
}