about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ssmike <ssmike@ydb.tech> 2023-05-16 14:23:37 +0300
committer ssmike <ssmike@ydb.tech> 2023-05-16 14:23:37 +0300
commit a117bbcb18c304b8bad6ef1113d2356bb2f0f320 (patch)
tree 0b2a1b31d97203ad11a7e9cdf079c086272cedbe
parent c6500a4c991881a561bf4e2c7187b410f3b5fbbd (diff)
download ydb-a117bbcb18c304b8bad6ef1113d2356bb2f0f320.tar.gz
Disable partitioning for reads with limit
-rw-r--r-- ydb/core/kqp/common/kqp_yql.cpp 14
-rw-r--r-- ydb/core/kqp/common/kqp_yql.h 2
-rw-r--r-- ydb/core/kqp/compile_service/kqp_compile_actor.cpp 1
-rw-r--r-- ydb/core/kqp/compile_service/kqp_compile_service.cpp 5
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_data_executer.cpp 12
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_impl.h 44
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_partition_helper.cpp 39
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_partition_helper.h 3
-rw-r--r-- ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp 4
-rw-r--r-- ydb/core/kqp/provider/yql_kikimr_settings.h 2
-rw-r--r-- ydb/core/kqp/query_compiler/kqp_query_compiler.cpp 4
-rw-r--r-- ydb/core/kqp/runtime/kqp_read_actor.cpp 4
-rw-r--r-- ydb/core/kqp/ut/opt/kqp_ne_ut.cpp 29
-rw-r--r-- ydb/core/protos/config.proto 1
-rw-r--r-- ydb/core/protos/kqp_physical.proto 1
-rw-r--r-- ydb/core/protos/tx_datashard.proto 2
16 files changed, 148 insertions, 19 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp
index ec488a54686..dfb523c203a 100644
--- a/ydb/core/kqp/common/kqp_yql.cpp
+++ b/ydb/core/kqp/common/kqp_yql.cpp
@@ -160,6 +160,9 @@ TKqpReadTableSettings ParseInternal(const TCoNameValueTupleList& node) {
} else if (name == TKqpReadTableSettings::SortedSettingName) {
YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
settings.Sorted = true;
+ } else if (name == TKqpReadTableSettings::SequentialSettingName) {
+ YQL_ENSURE(tuple.Ref().ChildrenSize() == 2);
+ settings.SequentialHint = FromString<ui64>(tuple.Value().Cast<TCoAtom>().Value());
} else {
YQL_ENSURE(false, "Unknown KqpReadTable setting name '" << name << "'");
}
@@ -228,6 +231,17 @@ NNodes::TCoNameValueTupleList TKqpReadTableSettings::BuildNode(TExprContext& ctx
.Done());
}
+ if (SequentialHint) {
+ settings.emplace_back(
+ Build<TCoNameValueTuple>(ctx, pos)
+ .Name()
+ .Build(SequentialSettingName)
+ .Value<TCoAtom>()
+ .Value(ToString(*SequentialHint))
+ .Build()
+ .Done());
+ }
+
return Build<TCoNameValueTupleList>(ctx, pos)
.Add(settings)
.Done();
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index 3713e742273..78454d78b1b 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -50,11 +50,13 @@ struct TKqpReadTableSettings {
static constexpr TStringBuf ItemsLimitSettingName = "ItemsLimit";
static constexpr TStringBuf ReverseSettingName = "Reverse";
static constexpr TStringBuf SortedSettingName = "Sorted";
+ static constexpr TStringBuf SequentialSettingName = "Sequential";
TVector<TString> SkipNullKeys;
TExprNode::TPtr ItemsLimit;
bool Reverse = false;
bool Sorted = false;
+ TMaybe<ui64> SequentialHint;
void AddSkipNullKey(const TString& key);
void SetItemsLimit(const TExprNode::TPtr& expr) { ItemsLimit = expr; }
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 423a55de80a..6952bf9ea88 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -367,6 +367,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableKqpScanQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();
kqpConfig.EnablePredicateExtractForDataQuery = serviceConfig.GetEnablePredicateExtractForDataQueries();
kqpConfig.EnablePredicateExtractForScanQuery = serviceConfig.GetEnablePredicateExtractForScanQueries();
+ kqpConfig.EnableSequentialHints = serviceConfig.GetEnableSequentialHints();
}
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 160c313d7be..6032e7e8eaa 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -371,6 +371,8 @@ private:
bool enableKqpDataQueryPredicateExtract = Config.GetEnablePredicateExtractForDataQueries();
bool enableKqpScanQueryPredicateExtract = Config.GetEnablePredicateExtractForScanQueries();
+ bool enableSequentialHints = Config.GetEnableSequentialHints();
+
Config.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
@@ -383,7 +385,8 @@ private:
Config.GetEnableKqpDataQuerySourceRead() != enableKqpDataQuerySourceRead ||
Config.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead ||
Config.GetEnablePredicateExtractForDataQueries() != enableKqpDataQueryPredicateExtract ||
- Config.GetEnablePredicateExtractForScanQueries() != enableKqpScanQueryPredicateExtract) {
+ Config.GetEnablePredicateExtractForScanQueries() != enableKqpScanQueryPredicateExtract ||
+ Config.GetEnableSequentialHints() != enableSequentialHints) {
LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE,
"Iterator read flags was changed. StreamLookup from " << enableKqpDataQueryStreamLookup <<
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index bb952aa5aa5..1d07df03c15 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1469,7 +1469,7 @@ private:
}
case NKqpProto::TKqpPhyConnection::kStreamLookup:
- HasStreamLookup = true;
+ UnknownAffectedShardCount = true;
case NKqpProto::TKqpPhyConnection::kMap: {
partitionsCount = originStageInfo.Tasks.size();
break;
@@ -1667,7 +1667,11 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- readActors += BuildScanTasksFromSource(stageInfo, LockTxId);
+ if (auto actors = BuildScanTasksFromSource(stageInfo, LockTxId)) {
+ readActors += *actors;
+ } else {
+ UnknownAffectedShardCount = true;
+ }
break;
case NKqpProto::TKqpSource::kExternalSource:
BuildReadTasksFromSource(stageInfo);
@@ -1763,7 +1767,7 @@ private:
auto datashardTxs = BuildDatashardTxs(datashardTasks, topicTxs);
// Single-shard transactions are always immediate
- ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize() + readActors) <= 1 && !HasStreamLookup;
+ ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize() + readActors) <= 1 && !UnknownAffectedShardCount;
if (ImmediateTx) {
// Transaction cannot be both immediate and volatile
@@ -2324,7 +2328,7 @@ private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool StreamResult = false;
- bool HasStreamLookup = false;
+ bool UnknownAffectedShardCount = false;
ui64 TxCoordinator = 0;
THashMap<ui64, TShardState> ShardStates;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 267b3ca27ce..cf50500b210 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -700,7 +700,7 @@ protected:
}
}
- size_t BuildScanTasksFromSource(TStageInfo& stageInfo, const TMaybe<ui64> lockTxId = {}) {
+ TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const TMaybe<ui64> lockTxId = {}) {
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
@@ -720,7 +720,6 @@ protected:
YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap);
auto columns = BuildKqpColumns(source, table);
- auto partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
ui64 itemsLimit = 0;
@@ -730,15 +729,17 @@ protected:
const auto& snapshot = GetSnapshot();
- for (auto& [shardId, shardInfo] : partitions) {
+ auto addPartiton = [&](TMaybe<ui64> shardId, const TShardInfo& shardInfo, TMaybe<ui64> maxInFlightShards = Nothing()) {
YQL_ENSURE(!shardInfo.KeyWriteRanges);
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.ExecuterId = this->SelfId();
- if (auto ptr = ShardIdToNodeId.FindPtr(shardId)) {
- task.Meta.NodeId = *ptr;
- } else {
- task.Meta.ShardId = shardId;
+ if (shardId) {
+ if (auto ptr = ShardIdToNodeId.FindPtr(*shardId)) {
+ task.Meta.NodeId = *ptr;
+ } else {
+ task.Meta.ShardId = *shardId;
+ }
}
for (auto& [name, value] : shardInfo.Params) {
@@ -785,9 +786,15 @@ protected:
settings.SetReverse(source.GetReverse());
settings.SetSorted(source.GetSorted());
- settings.SetShardIdHint(shardId);
- if (Stats) {
- Stats->AffectedShards.insert(shardId);
+ if (maxInFlightShards) {
+ settings.SetMaxInFlightShards(*maxInFlightShards);
+ }
+
+ if (shardId) {
+ settings.SetShardIdHint(*shardId);
+ if (Stats) {
+ Stats->AffectedShards.insert(*shardId);
+ }
}
ExtractItemsLimit(stageInfo, source.GetItemsLimit(), Request.TxAlloc->HolderFactory,
@@ -807,8 +814,23 @@ protected:
taskSourceSettings.ConstructInPlace();
taskSourceSettings->PackFrom(settings);
input.SourceType = NYql::KqpReadRangesSourceName;
+ };
+
+ if (source.GetSequentialAccessHint()) {
+ auto shardInfo = MakeFakePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
+ if (shardInfo.KeyReadRanges) {
+ addPartiton({}, shardInfo, source.GetSequentialAccessHint());
+ return {};
+ } else {
+ return 0;
+ }
+ } else {
+ THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
+ for (auto& [shardId, shardInfo] : partitions) {
+ addPartiton(shardId, shardInfo, {});
+ }
+ return partitions.size();
}
- return partitions.size();
}
protected:
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
index e5ac9797408..19caff8c196 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
@@ -580,11 +580,11 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
return shardInfoMap;
}
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
+TVector<TSerializedPointOrRange> ExtractRanges(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
- const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv,
+ TGuard<NKikimr::NMiniKQL::TScopedAlloc>&)
{
- auto guard = typeEnv.BindAllocator();
const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
YQL_ENSURE(table);
@@ -607,6 +607,39 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
ranges = BuildFullRange(keyColumnTypes);
}
+ return ranges;
+}
+
+TShardInfo MakeFakePartition(const TKqpTableKeys& tableKeys, // Packs every range ExtractRanges yields for `source` into one TShardInfo; no shard id is looked up here.
+ const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
+{
+ auto guard = typeEnv.BindAllocator(); // guard stays alive until return and is also passed to ExtractRanges
+ auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard);
+ TShardInfo result;
+ for (auto& range: ranges) {
+ if (!result.KeyReadRanges) { // created on first use, so it remains unset when `ranges` is empty
+ result.KeyReadRanges.ConstructInPlace();
+ }
+
+ result.KeyReadRanges->Add(std::move(range)); // each range is moved out of the local `ranges` vector
+ }
+
+ return result; // only KeyReadRanges is ever filled in by this function
+}
+
+
+THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
+ const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
+{
+ auto guard = typeEnv.BindAllocator();
+ const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
+ YQL_ENSURE(table);
+
+ const auto& keyColumnTypes = table->KeyColumnTypes;
+ auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard);
+
THashMap<ui64, TShardInfo> shardInfoMap;
// KeyReadRanges must be sorted & non-intersecting, they came in such condition from predicate extraction.
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
index 93c9fa4a002..3a2b9a61dfd 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
@@ -71,6 +71,9 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
+TShardInfo MakeFakePartition(const TKqpTableKeys& tableKeys,
+ const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
index 59863118c78..56139dc31f4 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
@@ -37,6 +37,10 @@ TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqp
return node; // already set?
}
+ if (kqpCtx.Config.Get()->EnableSequentialHints) {
+ settings.SequentialHint = 1;
+ }
+
TMaybeNode<TExprBase> limitValue;
auto maybeTakeCount = take.Count().Maybe<TCoUint64>();
auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>();
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index ce461a9c1f2..ae76d4c484c 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -137,6 +137,8 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableKqpScanQueryStreamIdxLookupJoin = false;
bool EnablePredicateExtractForScanQuery = true;
bool EnablePredicateExtractForDataQuery = false;
+
+ bool EnableSequentialHints = false;
};
}
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index edb8445e1d3..eb8ae949f65 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -847,6 +847,10 @@ private:
readProto.SetSorted(readSettings.Sorted);
YQL_ENSURE(readSettings.SkipNullKeys.empty());
+ if (readSettings.SequentialHint) {
+ readProto.SetSequentialAccessHint(*readSettings.SequentialHint);
+ }
+
auto ranges = settings.RangesExpr().template Maybe<TCoParameter>();
if (ranges.IsValid()) {
auto& rangesParam = *readProto.MutableRanges();
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 54e55c19569..b8ce4b9714b 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -383,6 +383,10 @@ public:
}
Counters->ReadActorsCount->Inc();
Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId());
+
+ if (settings.HasMaxInFlightShards()) {
+ MaxInFlight = settings.GetMaxInFlightShards();
+ }
}
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 81acae2b567..16da405a897 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -3664,6 +3664,35 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
}
+ Y_UNIT_TEST(DqSourceSequentialLimit) { // Runs an ORDER BY ... LIMIT 1 data query with EnableSequentialHints turned on and checks the returned row.
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
+ appConfig.MutableTableServiceConfig()->SetEnableSequentialHints(true); // the flag this test is about
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ NKikimrTxDataShard::TEvRead evread;
+ evread.SetMaxRowsInResult(2); // MaxRowsInResult = 2 on the TEvRead handed to InjectRangeEvReadSettings
+ InjectRangeEvReadSettings(evread); // NOTE(review): presumably overrides settings of reads issued below; helper is not visible here, confirm
+
+ NKikimrTxDataShard::TEvReadAck evreadack;
+ InjectRangeEvReadAckSettings(evreadack); // passed with no fields set
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ SELECT Key, Data FROM `/Root/EightShard` ORDER BY Key LIMIT 1;
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync(); // begin and commit in the same call
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0))); // expected result set is the single row ([101u], [1])
+ }
+ }
+
Y_UNIT_TEST(DqSourceLocksEffects) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 13e505831d0..5cecf835a6d 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1283,6 +1283,7 @@ message TTableServiceConfig {
optional bool EnableKqpScanQueryStreamIdxLookupJoin = 35 [default = false];
optional bool EnablePredicateExtractForScanQueries = 36 [default = true];
optional bool EnablePredicateExtractForDataQueries = 37 [default = true];
+ optional bool EnableSequentialHints = 38 [default = false];
};
// Config describes immediate controls and allows
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 1d1ba2fe3b5..ba80ae6940c 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -281,6 +281,7 @@ message TKqpReadRangesSource {
}
repeated string SkipNullKeys = 8;
+ uint64 SequentialAccessHint = 9;
}
message TKqpExternalSource {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index b27fcbf1609..8a1f68e16cb 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -273,6 +273,8 @@ message TKqpReadRangesSourceSettings {
optional uint64 LockTxId = 13;
optional uint32 LockNodeId = 14;
+
+ optional uint64 MaxInFlightShards = 16;
}
message TKqpTaskInfo {