diff options
author | ssmike <ssmike@ydb.tech> | 2023-05-16 14:23:37 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-05-16 14:23:37 +0300 |
commit | a117bbcb18c304b8bad6ef1113d2356bb2f0f320 (patch) | |
tree | 0b2a1b31d97203ad11a7e9cdf079c086272cedbe | |
parent | c6500a4c991881a561bf4e2c7187b410f3b5fbbd (diff) | |
download | ydb-a117bbcb18c304b8bad6ef1113d2356bb2f0f320.tar.gz |
Disable partitioning for reads with limit
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 44 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.cpp | 39 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 29 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 2 |
16 files changed, 148 insertions, 19 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index ec488a54686..dfb523c203a 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -160,6 +160,9 @@ TKqpReadTableSettings ParseInternal(const TCoNameValueTupleList& node) { } else if (name == TKqpReadTableSettings::SortedSettingName) { YQL_ENSURE(tuple.Ref().ChildrenSize() == 1); settings.Sorted = true; + } else if (name == TKqpReadTableSettings::SequentialSettingName) { + YQL_ENSURE(tuple.Ref().ChildrenSize() == 2); + settings.SequentialHint = FromString<ui64>(tuple.Value().Cast<TCoAtom>().Value()); } else { YQL_ENSURE(false, "Unknown KqpReadTable setting name '" << name << "'"); } @@ -228,6 +231,17 @@ NNodes::TCoNameValueTupleList TKqpReadTableSettings::BuildNode(TExprContext& ctx .Done()); } + if (SequentialHint) { + settings.emplace_back( + Build<TCoNameValueTuple>(ctx, pos) + .Name() + .Build(SequentialSettingName) + .Value<TCoAtom>() + .Value(ToString(*SequentialHint)) + .Build() + .Done()); + } + return Build<TCoNameValueTupleList>(ctx, pos) .Add(settings) .Done(); diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 3713e742273..78454d78b1b 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -50,11 +50,13 @@ struct TKqpReadTableSettings { static constexpr TStringBuf ItemsLimitSettingName = "ItemsLimit"; static constexpr TStringBuf ReverseSettingName = "Reverse"; static constexpr TStringBuf SortedSettingName = "Sorted"; + static constexpr TStringBuf SequentialSettingName = "Sequential"; TVector<TString> SkipNullKeys; TExprNode::TPtr ItemsLimit; bool Reverse = false; bool Sorted = false; + TMaybe<ui64> SequentialHint; void AddSkipNullKey(const TString& key); void SetItemsLimit(const TExprNode::TPtr& expr) { ItemsLimit = expr; } diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 423a55de80a..6952bf9ea88 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -367,6 +367,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.EnableKqpScanQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin(); kqpConfig.EnablePredicateExtractForDataQuery = serviceConfig.GetEnablePredicateExtractForDataQueries(); kqpConfig.EnablePredicateExtractForScanQuery = serviceConfig.GetEnablePredicateExtractForScanQueries(); + kqpConfig.EnableSequentialHints = serviceConfig.GetEnableSequentialHints(); } IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 160c313d7be..6032e7e8eaa 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -371,6 +371,8 @@ private: bool enableKqpDataQueryPredicateExtract = Config.GetEnablePredicateExtractForDataQueries(); bool enableKqpScanQueryPredicateExtract = Config.GetEnablePredicateExtractForScanQueries(); + bool enableSequentialHints = Config.GetEnableSequentialHints(); + Config.Swap(event.MutableConfig()->MutableTableServiceConfig()); LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config"); @@ -383,7 +385,8 @@ private: Config.GetEnableKqpDataQuerySourceRead() != enableKqpDataQuerySourceRead || Config.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead || Config.GetEnablePredicateExtractForDataQueries() != enableKqpDataQueryPredicateExtract || - Config.GetEnablePredicateExtractForScanQueries() != enableKqpScanQueryPredicateExtract) { + Config.GetEnablePredicateExtractForScanQueries() != enableKqpScanQueryPredicateExtract || + Config.GetEnableSequentialHints() != enableSequentialHints) { LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Iterator read flags was changed. StreamLookup from " << enableKqpDataQueryStreamLookup << diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index bb952aa5aa5..1d07df03c15 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1469,7 +1469,7 @@ private: } case NKqpProto::TKqpPhyConnection::kStreamLookup: - HasStreamLookup = true; + UnknownAffectedShardCount = true; case NKqpProto::TKqpPhyConnection::kMap: { partitionsCount = originStageInfo.Tasks.size(); break; @@ -1667,7 +1667,11 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - readActors += BuildScanTasksFromSource(stageInfo, LockTxId); + if (auto actors = BuildScanTasksFromSource(stageInfo, LockTxId)) { + readActors += *actors; + } else { + UnknownAffectedShardCount = true; + } break; case NKqpProto::TKqpSource::kExternalSource: BuildReadTasksFromSource(stageInfo); @@ -1763,7 +1767,7 @@ private: auto datashardTxs = BuildDatashardTxs(datashardTasks, topicTxs); // Single-shard transactions are always immediate - ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize() + readActors) <= 1 && !HasStreamLookup; + ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize() + readActors) <= 1 && !UnknownAffectedShardCount; if (ImmediateTx) { // Transaction cannot be both immediate and volatile @@ -2324,7 +2328,7 @@ private: NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; bool StreamResult = false; - bool HasStreamLookup = false; + bool UnknownAffectedShardCount = false; ui64 TxCoordinator = 0; THashMap<ui64, TShardState> ShardStates; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 267b3ca27ce..cf50500b210 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -700,7 +700,7 @@ protected: } } - size_t BuildScanTasksFromSource(TStageInfo& stageInfo, const TMaybe<ui64> lockTxId = {}) { + TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const TMaybe<ui64> lockTxId = {}) { THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; @@ -720,7 +720,6 @@ protected: YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap); auto columns = BuildKqpColumns(source, table); - auto partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); ui64 itemsLimit = 0; @@ -730,15 +729,17 @@ protected: const auto& snapshot = GetSnapshot(); - for (auto& [shardId, shardInfo] : partitions) { + auto addPartiton = [&](TMaybe<ui64> shardId, const TShardInfo& shardInfo, TMaybe<ui64> maxInFlightShards = Nothing()) { YQL_ENSURE(!shardInfo.KeyWriteRanges); auto& task = TasksGraph.AddTask(stageInfo); task.Meta.ExecuterId = this->SelfId(); - if (auto ptr = ShardIdToNodeId.FindPtr(shardId)) { - task.Meta.NodeId = *ptr; - } else { - task.Meta.ShardId = shardId; + if (shardId) { + if (auto ptr = ShardIdToNodeId.FindPtr(*shardId)) { + task.Meta.NodeId = *ptr; + } else { + task.Meta.ShardId = *shardId; + } } for (auto& [name, value] : shardInfo.Params) { @@ -785,9 +786,15 @@ protected: settings.SetReverse(source.GetReverse()); settings.SetSorted(source.GetSorted()); - settings.SetShardIdHint(shardId); - if (Stats) { - Stats->AffectedShards.insert(shardId); + if (maxInFlightShards) { + settings.SetMaxInFlightShards(*maxInFlightShards); + } + + if (shardId) { + settings.SetShardIdHint(*shardId); + if (Stats) { + Stats->AffectedShards.insert(*shardId); + } } ExtractItemsLimit(stageInfo, source.GetItemsLimit(), Request.TxAlloc->HolderFactory, @@ -807,8 +814,23 @@ protected: taskSourceSettings.ConstructInPlace(); taskSourceSettings->PackFrom(settings); input.SourceType = NYql::KqpReadRangesSourceName; + }; + + if (source.GetSequentialAccessHint()) { + auto shardInfo = MakeFakePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); + if (shardInfo.KeyReadRanges) { + addPartiton({}, shardInfo, source.GetSequentialAccessHint()); + return {}; + } else { + return 0; + } + } else { + THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); + for (auto& [shardId, shardInfo] : partitions) { + addPartiton(shardId, shardInfo, {}); + } + return partitions.size(); } - return partitions.size(); } protected: diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index e5ac9797408..19caff8c196 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -580,11 +580,11 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, return shardInfoMap; } -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, +TVector<TSerializedPointOrRange> ExtractRanges(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, - const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv, + TGuard<NKikimr::NMiniKQL::TScopedAlloc>&) { - auto guard = typeEnv.BindAllocator(); const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); YQL_ENSURE(table); @@ -607,6 +607,39 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, ranges = BuildFullRange(keyColumnTypes); } + return ranges; +} + +TShardInfo MakeFakePartition(const TKqpTableKeys& tableKeys, + const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) +{ + auto guard = typeEnv.BindAllocator(); + auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard); + TShardInfo result; + for (auto& range: ranges) { + if (!result.KeyReadRanges) { + result.KeyReadRanges.ConstructInPlace(); + } + + result.KeyReadRanges->Add(std::move(range)); + } + + return result; +} + + +THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, + const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) +{ + auto guard = typeEnv.BindAllocator(); + const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); + YQL_ENSURE(table); + + const auto& keyColumnTypes = table->KeyColumnTypes; + auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard); + THashMap<ui64, TShardInfo> shardInfoMap; // KeyReadRanges must be sorted & non-intersecting, they came in such condition from predicate extraction. diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h index 93c9fa4a002..3a2b9a61dfd 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h @@ -71,6 +71,9 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); +TShardInfo MakeFakePartition(const TKqpTableKeys& tableKeys, + const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp index 59863118c78..56139dc31f4 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp @@ -37,6 +37,10 @@ TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqp return node; // already set? } + if (kqpCtx.Config.Get()->EnableSequentialHints) { + settings.SequentialHint = 1; + } + TMaybeNode<TExprBase> limitValue; auto maybeTakeCount = take.Count().Maybe<TCoUint64>(); auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>(); diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index ce461a9c1f2..ae76d4c484c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -137,6 +137,8 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableKqpScanQueryStreamIdxLookupJoin = false; bool EnablePredicateExtractForScanQuery = true; bool EnablePredicateExtractForDataQuery = false; + + bool EnableSequentialHints = false; }; } diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index edb8445e1d3..eb8ae949f65 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -847,6 +847,10 @@ private: readProto.SetSorted(readSettings.Sorted); YQL_ENSURE(readSettings.SkipNullKeys.empty()); + if (readSettings.SequentialHint) { + readProto.SetSequentialAccessHint(*readSettings.SequentialHint); + } + auto ranges = settings.RangesExpr().template Maybe<TCoParameter>(); if (ranges.IsValid()) { auto& rangesParam = *readProto.MutableRanges(); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 54e55c19569..b8ce4b9714b 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -383,6 +383,10 @@ public: } Counters->ReadActorsCount->Inc(); Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId()); + + if (settings.HasMaxInFlightShards()) { + MaxInFlight = settings.GetMaxInFlightShards(); + } } static constexpr NKikimrServices::TActivity::EType ActorActivityType() { diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 81acae2b567..16da405a897 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3664,6 +3664,35 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } } + Y_UNIT_TEST(DqSourceSequentialLimit) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); + appConfig.MutableTableServiceConfig()->SetEnableSequentialHints(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + NKikimrTxDataShard::TEvRead evread; + evread.SetMaxRowsInResult(2); + InjectRangeEvReadSettings(evread); + + NKikimrTxDataShard::TEvReadAck evreadack; + InjectRangeEvReadAckSettings(evreadack); + + { + auto result = session.ExecuteDataQuery(R"( + SELECT Key, Data FROM `/Root/EightShard` ORDER BY Key LIMIT 1; + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } + Y_UNIT_TEST(DqSourceLocksEffects) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 13e505831d0..5cecf835a6d 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1283,6 +1283,7 @@ message TTableServiceConfig { optional bool EnableKqpScanQueryStreamIdxLookupJoin = 35 [default = false]; optional bool EnablePredicateExtractForScanQueries = 36 [default = true]; optional bool EnablePredicateExtractForDataQueries = 37 [default = true]; + optional bool EnableSequentialHints = 38 [default = false]; }; // Config describes immediate controls and allows diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 1d1ba2fe3b5..ba80ae6940c 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -281,6 +281,7 @@ message TKqpReadRangesSource { } repeated string SkipNullKeys = 8; + uint64 SequentialAccessHint = 9; } message TKqpExternalSource { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index b27fcbf1609..8a1f68e16cb 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -273,6 +273,8 @@ message TKqpReadRangesSourceSettings { optional uint64 LockTxId = 13; optional uint32 LockNodeId = 14; + + optional uint64 MaxInFlightShards = 16; } message TKqpTaskInfo { |