diff options
author | Iuliia Sidorina <yulia@ydb.tech> | 2024-11-19 15:21:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-19 15:21:48 +0100 |
commit | 92bb6c70bfe1c162c1d2e6ffe2170e0438834018 (patch) | |
tree | 542a3822849e0d2224f538bbc0128b60bda52032 | |
parent | 563d9af5befccb977bddc76fdb722e4a9c0c0e84 (diff) | |
download | ydb-92bb6c70bfe1c162c1d2e6ffe2170e0438834018.tar.gz |
feat(kqp): add pragma for sequential reads (#11715)
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 39 |
4 files changed, 51 insertions, 0 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp index c73d34e7db..91d828e07d 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp @@ -110,6 +110,11 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos()); } + if (kqpCtx.Config->HasMaxSequentialReadsInFlight()) { + settings.SequentialInFlight = *kqpCtx.Config->MaxSequentialReadsInFlight.Get(); + matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos()); + } + TVector<TExprBase> inputs; TVector<TCoArgument> args; TNodeOnNodeOwnedMap argReplaces; diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp index e19bc39952..4598d36f0b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp @@ -95,6 +95,7 @@ TKikimrConfiguration::TKikimrConfiguration() { REGISTER_SETTING(*this, MaxDPccpDPTableSize); REGISTER_SETTING(*this, MaxTasksPerStage); + REGISTER_SETTING(*this, MaxSequentialReadsInFlight); /* Runtime */ REGISTER_SETTING(*this, ScanQuery); @@ -147,6 +148,10 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const { return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled; } +bool TKikimrSettings::HasMaxSequentialReadsInFlight() const { + return !MaxSequentialReadsInFlight.Get().Empty(); +} + EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const { return GetOptionalFlagValue(OptEnablePredicateExtract.Get()); } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 34f26f7e30..3eaeffa987 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -72,6 +72,7 @@ struct TKikimrSettings { NCommon::TConfSetting<ui32, false> MaxTasksPerStage; + NCommon::TConfSetting<ui32, false> MaxSequentialReadsInFlight; /* Runtime */ NCommon::TConfSetting<bool, true> ScanQuery; @@ -88,6 +89,7 @@ struct TKikimrSettings { bool HasOptEnableOlapPushdown() const; bool HasOptEnableOlapProvideComputeSharding() const; bool HasOptUseFinalizeByKey() const; + bool HasMaxSequentialReadsInFlight() const; EOptionalFlag GetOptPredicateExtract() const; EOptionalFlag GetUseLlvm() const; diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 37e1037ba2..e73c17e793 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -4322,7 +4322,46 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } } + Y_UNIT_TEST_TWIN(SequentialReadsPragma, Enabled) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + NYdb::NTable::TExecDataQuerySettings querySettings; + querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + + TString query = R"( + SELECT Key, Data FROM `/Root/EightShard` + WHERE Text = "Value1" + ORDER BY Key + LIMIT 1; + )"; + + if (Enabled) { + TString pragma = TString(R"( + PRAGMA ydb.MaxSequentialReadsInFlight = "1"; + )"); + query = pragma + query; + } + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), querySettings).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + for (const auto& phase : stats.query_phases()) { + for (const auto& access : phase.table_access()) { + if (access.name() == "/Root/EightShard") { + if (Enabled) { + UNIT_ASSERT_LT(access.partitions_count(), 8); + } else { + UNIT_ASSERT_EQUAL(access.partitions_count(), 8); + } + } + } + } + } } |