diff options
author | Sergei Puchin <s.puchin@gmail.com> | 2022-06-20 20:12:49 +0300 |
---|---|---|
committer | Sergei Puchin <s.puchin@gmail.com> | 2022-06-20 20:12:49 +0300 |
commit | 13bffcce474f3588b3e177ea6c6015f16ded9ef6 (patch) | |
tree | 5717b823ceaef323b2152b1e8e916ddfbe958928 | |
parent | 37b0429b14b6423eba85f06cc9e942db2893490e (diff) | |
download | ydb-13bffcce474f3588b3e177ea6c6015f16ded9ef6.tar.gz |
Fast path for obtaining full table partitioning from scheme cache. (KIKIMR-15147)
ref:bc6ff3951a8846e0fdfebf0884e1665564058773
28 files changed, 206 insertions, 135 deletions
diff --git a/ydb/core/client/server/msgbus_server_s3_listing.cpp b/ydb/core/client/server/msgbus_server_s3_listing.cpp index 3626866322..e021ffee12 100644 --- a/ydb/core/client/server/msgbus_server_s3_listing.cpp +++ b/ydb/core/client/server/msgbus_server_s3_listing.cpp @@ -382,9 +382,10 @@ private: return JoinVectorIntoString(shards, ", "); }; - LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: " << getShardsString(KeyRange->Partitions)); + LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: " + << getShardsString(KeyRange->GetPartitions())); - if (KeyRange->Partitions.size() > 0) { + if (KeyRange->GetPartitions().size() > 0) { CurrentShardIdx = 0; MakeShardRequest(CurrentShardIdx, ctx); } else { @@ -393,7 +394,7 @@ private: } void MakeShardRequest(ui32 idx, const NActors::TActorContext& ctx) { - ui64 shardId = KeyRange->Partitions[idx].ShardId; + ui64 shardId = KeyRange->GetPartitions()[idx].ShardId; THolder<TEvDataShard::TEvS3ListingRequest> ev(new TEvDataShard::TEvS3ListingRequest()); ev->Record.SetTableId(KeyRange->TableId.PathId.LocalPathId); @@ -472,7 +473,7 @@ private: ContentsRows.emplace_back(shardResponse.GetContentsRows(i)); } - if (CurrentShardIdx+1 < KeyRange->Partitions.size() && + if (CurrentShardIdx+1 < KeyRange->GetPartitions().size() && MaxKeys > ContentsRows.size() + CommonPrefixesRows.size() && shardResponse.GetMoreRows()) { diff --git a/ydb/core/engine/minikql/flat_local_tx_minikql.h b/ydb/core/engine/minikql/flat_local_tx_minikql.h index 0feabf20e6..d7387ed8f4 100644 --- a/ydb/core/engine/minikql/flat_local_tx_minikql.h +++ b/ydb/core/engine/minikql/flat_local_tx_minikql.h @@ -261,7 +261,10 @@ class TFlatLocalMiniKQL : public NTabletFlatExecutor::ITransaction { for (auto &key : proxyEngine->GetDbKeys()) { key->Status = TKeyDesc::EStatus::Ok; - key->Partitions.push_back(TKeyDesc::TPartitionInfo(TabletId)); + + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); + partitions->push_back(TKeyDesc::TPartitionInfo(TabletId)); + key->Partitioning = partitions; for (const auto &x : key->Columns) { key->ColumnInfos.push_back({x.Column, x.ExpectedType, 0, TKeyDesc::EStatus::Ok}); // type-check diff --git a/ydb/core/engine/mkql_engine_flat.cpp b/ydb/core/engine/mkql_engine_flat.cpp index 32e176a77a..3c972e6961 100644 --- a/ydb/core/engine/mkql_engine_flat.cpp +++ b/ydb/core/engine/mkql_engine_flat.cpp @@ -455,7 +455,7 @@ public: for (auto& x : ProxyCallables) { auto name = x.second.Node->GetType()->GetNameStr(); if (name == Strings.EraseRow || name == Strings.UpdateRow) { - proxyShards[x.first] = &x.second.Key->Partitions; + proxyShards[x.first] = &x.second.Key->GetPartitions(); } } @@ -476,7 +476,7 @@ public: if (readsets.size() > limits.RSCount) { THashMap<ui64, TTableId> tableMap; for (auto& key : DbKeys) { - for (auto& partition : key->Partitions) { + for (auto& partition : key->GetPartitions()) { tableMap[partition.ShardId] = key->TableId; } } @@ -1498,7 +1498,7 @@ private: }; static void AddShards(TSet<ui64>& set, const TKeyDesc& key) { - for (auto& partition : key.Partitions) { + for (auto& partition : key.GetPartitions()) { Y_VERIFY(partition.ShardId); set.insert(partition.ShardId); } @@ -1576,7 +1576,7 @@ private: auto uniqueName = ToString(callable->GetUniqueId()); shardsForReadBuilder.Add(uniqueName, ctx.ShardsForRead); - for (auto& partition : ctx.Key->Partitions) { + for (auto& partition : ctx.Key->GetPartitions()) { auto shardForRead = partition.ShardId; if (shardForRead != shard) { readsets.insert(std::make_pair(shardForRead, shard)); @@ -1589,7 +1589,7 @@ private: auto key = ctx.Key; Y_VERIFY(key); - for (auto& partition : key->Partitions) { + for (auto& partition : key->GetPartitions()) { if (partition.ShardId == shard) { return true; } @@ -2039,7 +2039,7 @@ private: Y_VERIFY(key); TListLiteralBuilder listOfShards(Env, Ui64Type); - for (auto& partition : key->Partitions) { + for (auto& partition : key->GetPartitions()) { listOfShards.Add(TRuntimeNode(BuildDataLiteral(NUdf::TUnboxedValuePod(partition.ShardId), NUdf::TDataType<ui64>::Id, Env), true)); } diff --git a/ydb/core/engine/mkql_engine_flat_ut.cpp b/ydb/core/engine/mkql_engine_flat_ut.cpp index e2b12e2d7a..7c547889c1 100644 --- a/ydb/core/engine/mkql_engine_flat_ut.cpp +++ b/ydb/core/engine/mkql_engine_flat_ut.cpp @@ -88,18 +88,26 @@ namespace { void SingleShardResolver(TKeyDesc& key) { key.Status = TKeyDesc::EStatus::Ok; - key.Partitions.push_back(TKeyDesc::TPartitionInfo(Shard1)); + + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); + partitions->push_back(TKeyDesc::TPartitionInfo(Shard1)); + key.Partitioning = partitions; } void DoubleShardResolver(TKeyDesc& key) { key.Status = TKeyDesc::EStatus::Ok; - key.Partitions.push_back(TKeyDesc::TPartitionInfo(Shard1)); - key.Partitions.push_back(TKeyDesc::TPartitionInfo(Shard2)); + + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); + partitions->push_back(TKeyDesc::TPartitionInfo(Shard1)); + partitions->push_back(TKeyDesc::TPartitionInfo(Shard2)); + key.Partitioning = partitions; } void TwoShardResolver(TKeyDesc& key) { key.Status = TKeyDesc::EStatus::Ok; - key.Partitions.push_back(TKeyDesc::TPartitionInfo(key.TableId.PathId.LocalPathId == Table1Id ? Shard1 : Shard2)); + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); + partitions->push_back(TKeyDesc::TPartitionInfo(key.TableId.PathId.LocalPathId == Table1Id ? Shard1 : Shard2)); + key.Partitioning = partitions; } struct TDriver { diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp index c782b04de7..d9b32d742a 100644 --- a/ydb/core/grpc_services/rpc_import_data.cpp +++ b/ydb/core/grpc_services/rpc_import_data.cpp @@ -79,10 +79,10 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque static ui64 GetShardId(const TTableRange& range, const TKeyDesc* keyDesc) { Y_VERIFY(range.Point); - Y_VERIFY(!keyDesc->Partitions.empty()); + Y_VERIFY(!keyDesc->GetPartitions().empty()); TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound( - keyDesc->Partitions.begin(), keyDesc->Partitions.end(), true, + keyDesc->GetPartitions().begin(), keyDesc->GetPartitions().end(), true, [&](const TKeyDesc::TPartitionInfo& partition, bool) { const int cmp = CompareBorders<true, false>( partition.Range->EndKeyPrefix.GetCells(), range.From, @@ -94,7 +94,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque } ); - Y_VERIFY(it != keyDesc->Partitions.end()); + Y_VERIFY(it != keyDesc->GetPartitions().end()); return it->ShardId; } @@ -216,7 +216,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque return Reply(StatusIds::SCHEME_ERROR, TIssuesIds::GENERIC_RESOLVE_ERROR); } - if (KeyDesc->Partitions.empty()) { + if (KeyDesc->GetPartitions().empty()) { return Reply(StatusIds::SCHEME_ERROR, TIssuesIds::GENERIC_RESOLVE_ERROR); } diff --git a/ydb/core/grpc_services/rpc_kh_describe.cpp b/ydb/core/grpc_services/rpc_kh_describe.cpp index 70c8767088..9882b0abb1 100644 --- a/ydb/core/grpc_services/rpc_kh_describe.cpp +++ b/ydb/core/grpc_services/rpc_kh_describe.cpp @@ -301,9 +301,9 @@ private: LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Table [" << TEvKikhouseDescribeTableRequest::GetProtoRequest(Request)->path() - << "] shards: " << getShardsString(KeyRange->Partitions)); + << "] shards: " << getShardsString(KeyRange->GetPartitions())); - for (const TKeyDesc::TPartitionInfo& partition : KeyRange->Partitions) { + for (const TKeyDesc::TPartitionInfo& partition : KeyRange->GetPartitions()) { auto* p = Result.add_partitions(); p->set_tablet_id(partition.ShardId); p->set_end_key(partition.Range->EndKeyPrefix.GetBuffer()); diff --git a/ydb/core/grpc_services/rpc_read_columns.cpp b/ydb/core/grpc_services/rpc_read_columns.cpp index ccfaf1c1ff..d9f17687a9 100644 --- a/ydb/core/grpc_services/rpc_read_columns.cpp +++ b/ydb/core/grpc_services/rpc_read_columns.cpp @@ -617,13 +617,14 @@ private: return JoinVectorIntoString(shards, ", "); }; - LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: " << getShardsString(KeyRange->Partitions)); + LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: " + << getShardsString(KeyRange->GetPartitions())); MakeShardRequests(ctx); } void MakeShardRequests(const NActors::TActorContext& ctx) { - Y_VERIFY(!KeyRange->Partitions.empty()); + Y_VERIFY(!KeyRange->GetPartitions().empty()); auto proto = GetProtoRequest(); // Send request to the first shard @@ -642,7 +643,7 @@ private: ev->Record.SetSnapshotTxId(SnapshotId.TxId); } - ui64 shardId = KeyRange->Partitions[0].ShardId; + ui64 shardId = KeyRange->GetPartitions()[0].ShardId; LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Sending request to shards " << shardId); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index e080777290..50995dc2e1 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -529,7 +529,7 @@ private: auto keyDesc = std::move(request->ResultSet[0].KeyDescription); - if (keyDesc->Partitions.empty()) { + if (keyDesc->GetPartitions().empty()) { TString error = TStringBuilder() << "No partitions to read from '" << ScanData->TablePath << "'"; CA_LOG_E(error); InternalError(TIssuesIds::KIKIMR_SCHEME_ERROR, error); @@ -539,16 +539,16 @@ private: const auto& tr = *AppData()->TypeRegistry; TVector<TShardState> newShards; - newShards.reserve(keyDesc->Partitions.size()); + newShards.reserve(keyDesc->GetPartitions().size()); - for (ui64 idx = 0, i = 0; idx < keyDesc->Partitions.size(); ++idx) { - const auto& partition = keyDesc->Partitions[idx]; + for (ui64 idx = 0, i = 0; idx < keyDesc->GetPartitions().size(); ++idx) { + const auto& partition = keyDesc->GetPartitions()[idx]; TTableRange partitionRange{ - idx == 0 ? state.Ranges.front().From.GetCells() : keyDesc->Partitions[idx - 1].Range->EndKeyPrefix.GetCells(), - idx == 0 ? state.Ranges.front().FromInclusive : !keyDesc->Partitions[idx - 1].Range->IsInclusive, - keyDesc->Partitions[idx].Range->EndKeyPrefix.GetCells(), - keyDesc->Partitions[idx].Range->IsInclusive + idx == 0 ? state.Ranges.front().From.GetCells() : keyDesc->GetPartitions()[idx - 1].Range->EndKeyPrefix.GetCells(), + idx == 0 ? state.Ranges.front().FromInclusive : !keyDesc->GetPartitions()[idx - 1].Range->IsInclusive, + keyDesc->GetPartitions()[idx].Range->EndKeyPrefix.GetCells(), + keyDesc->GetPartitions()[idx].Range->IsInclusive }; CA_LOG_D("Processing resolved ShardId# " << partition.ShardId diff --git a/ydb/core/kqp/executer/kqp_partition_helper.cpp b/ydb/core/kqp/executer/kqp_partition_helper.cpp index bebba15fcc..28c715e31b 100644 --- a/ydb/core/kqp/executer/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer/kqp_partition_helper.cpp @@ -54,17 +54,17 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(const NDq::TMkqlV auto keyValue = MakeKeyCells(paramValue, table.KeyColumnTypes, keyColumnIndices, typeEnv, /* copyValues */ true); Y_VERIFY_DEBUG(keyValue.size() == keyLen); - ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.Partitions, table.KeyColumnTypes, + ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), table.KeyColumnTypes, [] (const auto& partition) { return *partition.Range; }); - ui64 shardId = key.Partitions[partitionIndex].ShardId; + ui64 shardId = key.GetPartitions()[partitionIndex].ShardId; shardParamValues[shardId].emplace_back(std::move(paramValue)); auto point = TSerializedCellVec(TSerializedCellVec::Serialize(keyValue)); auto& shardData = ret[shardId]; - if (key.Partitions[partitionIndex].Range->IsPoint) { + if (key.GetPartitions()[partitionIndex].Range->IsPoint) { // singular case when partition is just a point shardData.FullRange.emplace(TSerializedTableRange(point.GetBuffer(), "", true, true)); shardData.FullRange->Point = true; @@ -141,7 +141,7 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(const NDq:: auto range = TTableRange(fromValues, /* inclusiveFrom */ true, point ? TConstArrayRef<TCell>() : toValuesPrefix, /* inclusiveTo */ true, /* point */ point); - TVector<TPartitionWithRange> rangePartitions = GetKeyRangePartitions(range, key.Partitions, keyFullType); + TVector<TPartitionWithRange> rangePartitions = GetKeyRangePartitions(range, key.GetPartitions(), keyFullType); for (TPartitionWithRange& partitionWithRange : rangePartitions) { ui64 shardId = partitionWithRange.PartitionInfo->ShardId; @@ -437,8 +437,8 @@ TSerializedTableRange MakeKeyRange(const TVector<NUdf::TDataTypeId>& keyColumnTy namespace { void FillFullRange(const TStageInfo& stageInfo, THashMap<ui64, TShardInfo>& shardInfoMap, bool read) { - for (ui64 i = 0; i < stageInfo.Meta.ShardKey->Partitions.size(); ++i) { - auto& partition = stageInfo.Meta.ShardKey->Partitions[i]; + for (ui64 i = 0; i < stageInfo.Meta.ShardKey->GetPartitions().size(); ++i) { + auto& partition = stageInfo.Meta.ShardKey->GetPartitions()[i]; auto& partitionRange = *partition.Range; auto& shardInfo = shardInfoMap[partition.ShardId]; @@ -456,7 +456,7 @@ void FillFullRange(const TStageInfo& stageInfo, THashMap<ui64, TShardInfo>& shar } if (i != 0) { - auto& prevPartition = stageInfo.Meta.ShardKey->Partitions[i - 1]; + auto& prevPartition = stageInfo.Meta.ShardKey->GetPartitions()[i - 1]; ranges->MakeFull(TSerializedTableRange(prevPartition.Range->EndKeyPrefix.GetCells(), !prevPartition.Range->IsInclusive, partitionRange.EndKeyPrefix.GetCells(), partitionRange.IsInclusive)); @@ -499,7 +499,8 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, YQL_ENSURE(readRange.HasKeyRange()); auto range = MakeKeyRange(keyColumnTypes, readRange.GetKeyRange(), stageInfo, holderFactory, typeEnv); - auto readPartitions = GetKeyRangePartitions(range.ToTableRange(), stageInfo.Meta.ShardKey->Partitions, keyColumnTypes); + auto readPartitions = GetKeyRangePartitions(range.ToTableRange(), stageInfo.Meta.ShardKey->GetPartitions(), + keyColumnTypes); THashMap<ui64, TShardInfo> shardInfoMap; for (TPartitionWithRange& partitionWithRange : readPartitions) { @@ -536,7 +537,8 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, ? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true, std::get<TSerializedCellVec>(range).GetCells(), true, true) : TTableRange(std::get<TSerializedTableRange>(range).ToTableRange()); - auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->Partitions, keyColumnTypes); + auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), + keyColumnTypes); for (TPartitionWithRange& partitionWithRange : readPartitions) { auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId]; @@ -575,7 +577,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, if (ranges.empty()) return shardInfoMap; - for (const auto& partition : stageInfo.Meta.ShardKey->Partitions) { + for (const auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) { auto& shardInfo = shardInfoMap[partition.ShardId]; YQL_ENSURE(!shardInfo.KeyReadRanges); @@ -720,7 +722,7 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow } auto range = TTableRange(keyFrom, true, keyTo, true, /* point */ false); - auto partitions = GetKeyRangePartitions(range, stageInfo.Meta.ShardKey->Partitions, table.KeyColumnTypes); + auto partitions = GetKeyRangePartitions(range, stageInfo.Meta.ShardKey->GetPartitions(), table.KeyColumnTypes); for (auto& partitionWithRange: partitions) { ui64 shardId = partitionWithRange.PartitionInfo->ShardId; diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 5c7c57d6fe..135890f36e 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -95,7 +95,7 @@ public: TSet<ui64> shardIds; for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) { if (stageInfo.Meta.ShardKey) { - for (auto& partition : stageInfo.Meta.ShardKey->Partitions) { + for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) { shardIds.insert(partition.ShardId); } } diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp index 846aba3be2..86613a29d8 100644 --- a/ydb/core/kqp/executer/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp @@ -196,7 +196,7 @@ private: return; } - for (auto& partition : entry.KeyDescription->Partitions) { + for (auto& partition : entry.KeyDescription->GetPartitions()) { YQL_ENSURE(partition.Range); } diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp index 0002fbfe21..ac9792ed9f 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp @@ -159,7 +159,7 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf { YQL_ENSURE(stageInfo.Meta.ShardKey); THashMap<ui64, const TKeyDesc::TPartitionInfo*> partitionsMap; - for (auto& partition : stageInfo.Meta.ShardKey->Partitions) { + for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) { partitionsMap[partition.ShardId] = &partition; } diff --git a/ydb/core/scheme/scheme_tabledefs.cpp b/ydb/core/scheme/scheme_tabledefs.cpp index 8c91e03d0e..100f23ce62 100644 --- a/ydb/core/scheme/scheme_tabledefs.cpp +++ b/ydb/core/scheme/scheme_tabledefs.cpp @@ -10,6 +10,28 @@ bool TTableRange::IsEmptyRange(TConstArrayRef<const NScheme::TTypeId> cellTypeId return (compares < 0); } +bool TTableRange::IsFullRange(ui32 columnsCount) const { + if (!InclusiveFrom) { + return false; + } + + if (!To.empty()) { + return false; + } + + if (!From.size() == columnsCount) { + return false; + } + + for (const auto& value : From) { + if (value) { + return false; + } + } + + return true; +} + bool TSerializedTableRange::IsEmpty(TConstArrayRef<NScheme::TTypeId> type) const { auto cmp = CompareBorders<true, false>(To.GetCells(), From.GetCells(), ToInclusive, FromInclusive, type); diff --git a/ydb/core/scheme/scheme_tabledefs.h b/ydb/core/scheme/scheme_tabledefs.h index 190233fdee..1e09a1e723 100644 --- a/ydb/core/scheme/scheme_tabledefs.h +++ b/ydb/core/scheme/scheme_tabledefs.h @@ -175,6 +175,7 @@ public: } bool IsEmptyRange(TConstArrayRef<const NScheme::TTypeId> cellTypeIds) const; + bool IsFullRange(ui32 columnsCount) const; }; class TSerializedTableRange { @@ -675,10 +676,11 @@ public: // out EStatus Status; TVector<TColumnInfo> ColumnInfos; - TVector<TPartitionInfo> Partitions; + std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning; TIntrusivePtr<TSecurityObject> SecurityObject; - bool IsSystemView() const { return Partitions.empty(); } + const TVector<TKeyDesc::TPartitionInfo>& GetPartitions() const { Y_VERIFY(Partitioning); return *Partitioning; } + bool IsSystemView() const { return GetPartitions().empty(); } template<typename TKeyColumnTypes, typename TColumns> TKeyDesc(const TTableId& tableId, const TTableRange& range, ERowOperation rowOperation, @@ -692,6 +694,7 @@ public: , Columns(columns.begin(), columns.end()) , Reverse(reverse) , Status(EStatus::Unknown) + , Partitioning(std::make_shared<TVector<TKeyDesc::TPartitionInfo>>()) {} }; diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index bb2d356fb7..12c357d7dc 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -587,14 +587,14 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe return; } - if (!entry.KeyDescription->Partitions) { + if (!entry.KeyDescription->GetPartitions()) { LOG_W("Empty partitions list" << ": entry# " << entry.ToString(*AppData()->TypeRegistry)); return Retry(); } KeyDesc = std::move(entry.KeyDescription); - CreateSenders(MakePartitionIds(KeyDesc->Partitions)); + CreateSenders(MakePartitionIds(KeyDesc->GetPartitions())); Become(&TThis::StateMain); } @@ -610,18 +610,18 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe } bool IsResolved() const override { - return KeyDesc && KeyDesc->Partitions; + return KeyDesc && KeyDesc->GetPartitions(); } ui64 GetPartitionId(const TChangeRecord& record) const override { Y_VERIFY(KeyDesc); - Y_VERIFY(KeyDesc->Partitions); + Y_VERIFY(KeyDesc->GetPartitions()); const auto range = TTableRange(record.GetKey()); Y_VERIFY(range.Point); TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound( - KeyDesc->Partitions.begin(), KeyDesc->Partitions.end(), true, + KeyDesc->GetPartitions().begin(), KeyDesc->GetPartitions().end(), true, [&](const TKeyDesc::TPartitionInfo& partition, bool) { const int compares = CompareBorders<true, false>( partition.Range->EndKeyPrefix.GetCells(), range.From, @@ -633,7 +633,7 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe } ); - Y_VERIFY(it != KeyDesc->Partitions.end()); + Y_VERIFY(it != KeyDesc->GetPartitions().end()); return it->ShardId; // partition = shard } diff --git a/ydb/core/tx/datashard/datashard_distributed_erase.cpp b/ydb/core/tx/datashard/datashard_distributed_erase.cpp index fae7d2edeb..e7a15a9eab 100644 --- a/ydb/core/tx/datashard/datashard_distributed_erase.cpp +++ b/ydb/core/tx/datashard/datashard_distributed_erase.cpp @@ -271,10 +271,10 @@ class TDistEraser: public TActorBootstrapped<TDistEraser> { static ui64 GetShardId(const TTableRange& range, const TKeyDesc* keyDesc) { Y_VERIFY(range.Point); - Y_VERIFY(!keyDesc->Partitions.empty()); + Y_VERIFY(!keyDesc->GetPartitions().empty()); TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound( - keyDesc->Partitions.begin(), keyDesc->Partitions.end(), true, + keyDesc->GetPartitions().begin(), keyDesc->GetPartitions().end(), true, [&](const TKeyDesc::TPartitionInfo& partition, bool) { const int compares = CompareBorders<true, false>( partition.Range->EndKeyPrefix.GetCells(), range.From, @@ -286,7 +286,7 @@ class TDistEraser: public TActorBootstrapped<TDistEraser> { } ); - Y_VERIFY(it != keyDesc->Partitions.end()); + Y_VERIFY(it != keyDesc->GetPartitions().end()); return it->ShardId; } @@ -541,7 +541,7 @@ class TDistEraser: public TActorBootstrapped<TDistEraser> { return; } - if (entry.KeyDescription->Partitions.empty()) { + if (entry.KeyDescription->GetPartitions().empty()) { return SchemeError(TStringBuilder() << "Empty partitions list" << ": entry# " << entry.ToString(*AppData()->TypeRegistry)); } diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 384b2e5c3f..b0438ef795 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -98,22 +98,26 @@ void TTester::EmptyShardKeyResolver(TKeyDesc& key) { void TTester::SingleShardKeyResolver(TKeyDesc& key) { key.Status = TKeyDesc::EStatus::Ok; - key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0)); + + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); + partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0)); + key.Partitioning = partitions; } void TTester::ThreeShardPointKeyResolver(TKeyDesc& key) { const ui32 ShardBorder1 = 1000; const ui32 ShardBorder2 = 2000; + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); key.Status = TKeyDesc::EStatus::Ok; if (key.Range.Point) { ui32 key0 = *(ui32*)key.Range.From[0].Data(); if (key0 < ShardBorder1) { - key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0)); + partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0)); } else if (key0 < ShardBorder2) { - key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet1)); + partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet1)); } else { - key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet2)); + partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet2)); } } else { UNIT_ASSERT(key.Range.From.size() > 0); @@ -126,12 +130,14 @@ void TTester::ThreeShardPointKeyResolver(TKeyDesc& key) { UNIT_ASSERT(from <= to); if (from < ShardBorder1) - key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0)); + partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0)); if (from < ShardBorder2 && to >= ShardBorder1) - key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet1)); + partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet1)); if (to >= ShardBorder2) - key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet2)); + partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet2)); } + + key.Partitioning = partitions; } TTester::TKeyResolver TTester::GetKeyResolver() const { @@ -332,7 +338,7 @@ ui32 TFakeProxyTx::SetProgram(TTester& tester, const TString& programText) { keyResolver(*dbKey); UNIT_ASSERT(dbKey->Status == TKeyDesc::EStatus::Ok); - for (auto& partition : dbKey->Partitions) { + for (auto& partition : dbKey->GetPartitions()) { resolvedShards.insert(partition.ShardId); } } @@ -997,7 +1003,10 @@ TKeyExtractor::TKeyExtractor(TTester& tester, TString programText) { for (auto& key : Engine->GetDbKeys()) { key->Status = TKeyDesc::EStatus::Ok; - key->Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0)); + + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); + partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0)); + key->Partitioning = partitions; } } diff --git a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp index 9ce091e22b..03fa7a8797 100644 --- a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp +++ b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp @@ -1402,7 +1402,8 @@ tkey = 100, key = 4 check(TEvResponse::ProtoRecordType::SCHEME_ERROR, "Empty partitions list", [](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() == TEvResolve::EventType) { - ev->Get<TEvResolve>()->Request->ResultSet.at(0).KeyDescription->Partitions.clear(); + ev->Get<TEvResolve>()->Request->ResultSet.at(0).KeyDescription->Partitioning = + std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); } }); diff --git a/ydb/core/tx/datashard/datashard_ut_read_table.cpp b/ydb/core/tx/datashard/datashard_ut_read_table.cpp index c11e21f2f2..6747d06c46 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_table.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_table.cpp @@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(DataShardReadTableSnapshots) { Cerr << "... ignored TEvResolveKeySetResult with errors" << Endl; break; } - size_t partitions = request->ResultSet[0].KeyDescription->Partitions.size(); + size_t partitions = request->ResultSet[0].KeyDescription->GetPartitions().size(); if (partitions == captureResolveKeySetResultPartitions) { Cerr << "... captured TEvResolveKeySetResult with " << partitions << " partitions" << Endl; capturedResolveKeySetResults.emplace_back(ev.Release()); diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 152ba66d8e..da39c00ff0 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -220,7 +220,7 @@ namespace { entry.DomainInfo.Drop(); TKeyDesc& keyDesc = *entry.KeyDescription; keyDesc.ColumnInfos.clear(); - keyDesc.Partitions.clear(); + keyDesc.Partitioning = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); keyDesc.SecurityObject.Drop(); } @@ -696,7 +696,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { NotNullColumns.clear(); Indexes.clear(); CdcStreams.clear(); - Partitioning.clear(); + Partitioning = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); Self.Drop(); @@ -746,16 +746,19 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { } if (pathDesc.TablePartitionsSize()) { - Partitioning.resize(pathDesc.TablePartitionsSize()); + auto partitioning = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); + partitioning->resize(pathDesc.TablePartitionsSize()); for (ui32 i : xrange(pathDesc.TablePartitionsSize())) { const auto& src = pathDesc.GetTablePartitions(i); - auto& partition = Partitioning[i]; + auto& partition = (*partitioning)[i]; partition.Range = TKeyDesc::TPartitionRangeInfo(); partition.Range->EndKeyPrefix.Parse(src.GetEndOfRangeKeyPrefix()); partition.Range->IsInclusive = src.HasIsInclusive() && src.GetIsInclusive(); partition.Range->IsPoint = src.HasIsPoint() && src.GetIsPoint(); partition.ShardId = src.GetDatashardId(); } + + Partitioning = std::move(partitioning); } if (pathDesc.HasDomainDescription()) { @@ -851,24 +854,25 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { ptr->Description = std::move(desc); } - // copy-paste from core/tx/scheme_cache/scheme_cache_impl.cpp - void FillRangePartitioning( - const TTableRange& range, - TVector<TKeyDesc::TPartitionInfo>& partitions - ) const { - Y_VERIFY(!Partitioning.empty()); + std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> FillRangePartitioning(const TTableRange& range) const { + Y_VERIFY(Partitioning); + Y_VERIFY(!Partitioning->empty()); + + if (range.IsFullRange(KeyColumnTypes.size())) { + return Partitioning; + } - partitions.clear(); + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); // Temporary fix: for an empty range we need to return some datashard // so that it can handle readset logic (send empty result to other tx participants etc.) if (range.IsEmptyRange(KeyColumnTypes)) { - partitions.push_back(*Partitioning.begin()); - return; + partitions->push_back(*Partitioning->begin()); + return partitions; } TVector<TKeyDesc::TPartitionInfo>::const_iterator low = LowerBound( - Partitioning.begin(), Partitioning.end(), true, + Partitioning->begin(), Partitioning->end(), true, [&](const TKeyDesc::TPartitionInfo& left, bool) { const int compares = CompareBorders<true, false>( left.Range->EndKeyPrefix.GetCells(), range.From, @@ -880,13 +884,13 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { } ); - Y_VERIFY(low != Partitioning.end(), "last key must be (inf)"); + Y_VERIFY(low != Partitioning->end(), "last key must be (inf)"); do { - partitions.push_back(*low); + partitions->push_back(*low); if (range.Point) { - return; + return partitions; } const int prevComp = CompareBorders<true, true>( @@ -896,10 +900,12 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { ); if (prevComp >= 0) { - return; + return partitions; } - } while (++low != Partitioning.end()); + } while (++low != Partitioning->end()); + + return partitions; } bool IsSysTable() const { @@ -962,6 +968,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { , IsPrivatePath(false) , IsVirtual(isVirtual) , SchemaVersion(0) + , Partitioning(std::make_shared<TVector<TKeyDesc::TPartitionInfo>>()) { } @@ -980,6 +987,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { , IsPrivatePath(other.IsPrivatePath) , IsVirtual(other.IsVirtual) , SchemaVersion(other.SchemaVersion) + , Partitioning(std::make_shared<TVector<TKeyDesc::TPartitionInfo>>()) { if (other.Subscriber) { other.Subscriber = TSubscriber(); @@ -1817,18 +1825,22 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { } else if (Kind == TNavigate::KindOlapStore) { FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapStore); // Add all shards of the OLAP store + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); for (ui64 columnShard : OlapStoreInfo->Description.GetColumnShards()) { - keyDesc.Partitions.push_back(TKeyDesc::TPartitionInfo(columnShard)); - keyDesc.Partitions.back().Range = TKeyDesc::TPartitionRangeInfo(); + partitions->push_back(TKeyDesc::TPartitionInfo(columnShard)); + partitions->back().Range = TKeyDesc::TPartitionRangeInfo(); } + keyDesc.Partitioning = std::move(partitions); return; } else if (Kind == TNavigate::KindOlapTable) { FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapTable); // Add all shards of the OLAP table + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); for (ui64 columnShard : OlapTableInfo->Description.GetSharding().GetColumnShards()) { - keyDesc.Partitions.push_back(TKeyDesc::TPartitionInfo(columnShard)); - keyDesc.Partitions.back().Range = TKeyDesc::TPartitionRangeInfo(); + partitions->push_back(TKeyDesc::TPartitionInfo(columnShard)); + partitions->back().Range = TKeyDesc::TPartitionRangeInfo(); } + keyDesc.Partitioning = std::move(partitions); return; } @@ -1854,7 +1866,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { keyDesc.Range.From, TSysTables::TLocksTable::EColumns::DataShard, shard ); if (ok) { - keyDesc.Partitions.push_back(TKeyDesc::TPartitionInfo(shard)); + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); + partitions->push_back(TKeyDesc::TPartitionInfo(shard)); + keyDesc.Partitioning = std::move(partitions); } else { keyDesc.Status = TKeyDesc::EStatus::OperationNotSupported; ++context->Request->ErrorCount; @@ -1862,17 +1876,19 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { } } else if (OlapTableInfo) { // TODO: return proper partitioning info (KIKIMR-11069) + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); for (ui64 columnShard : OlapTableInfo->Description.GetSharding().GetColumnShards()) { - keyDesc.Partitions.push_back(TKeyDesc::TPartitionInfo(columnShard)); - keyDesc.Partitions.back().Range = TKeyDesc::TPartitionRangeInfo(); + partitions->push_back(TKeyDesc::TPartitionInfo(columnShard)); + partitions->back().Range = TKeyDesc::TPartitionRangeInfo(); } + keyDesc.Partitioning = std::move(partitions); } else { if (Partitioning) { - FillRangePartitioning(keyDesc.Range, keyDesc.Partitions); + keyDesc.Partitioning = FillRangePartitioning(keyDesc.Range); } } - if (keyDesc.Partitions.empty()) { + if (keyDesc.GetPartitions().empty()) { entry.Status = TResolve::EStatus::TypeCheckError; keyDesc.Status = TKeyDesc::EStatus::OperationNotSupported; } @@ -1925,7 +1941,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { THashSet<TString> NotNullColumns; TVector<NKikimrSchemeOp::TIndexDescription> Indexes; TVector<NKikimrSchemeOp::TCdcStreamDescription> CdcStreams; - TVector<TKeyDesc::TPartitionInfo> Partitioning; + std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning; TIntrusivePtr<TNavigate::TDirEntryInfo> Self; diff --git a/ydb/core/tx/scheme_cache/scheme_cache.cpp b/ydb/core/tx/scheme_cache/scheme_cache.cpp index 2ccc4b31a1..1024d73c19 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.cpp +++ b/ydb/core/tx/scheme_cache/scheme_cache.cpp @@ -84,7 +84,7 @@ TString TSchemeCacheRequest::TEntry::ToString() const { << " SyncVersion: " << (SyncVersion ? "true" : "false") << " Status: " << Status << " Kind: " << Kind - << " PartitionsCount: " << (KeyDescription ? ::ToString(KeyDescription->Partitions.size()) : "<moved>") + << " PartitionsCount: " << (KeyDescription ? ::ToString(KeyDescription->GetPartitions().size()) : "<moved>") << " DomainInfo " << (DomainInfo ? DomainInfo->ToString() : "<null>") << " }"; } @@ -97,7 +97,7 @@ TString TSchemeCacheRequest::TEntry::ToString(const NScheme::TTypeRegistry& type << " SyncVersion: " << (SyncVersion ? "true" : "false") << " Status: " << Status << " Kind: " << Kind - << " PartitionsCount: " << (KeyDescription ? ::ToString(KeyDescription->Partitions.size()) : "<moved>") + << " PartitionsCount: " << (KeyDescription ? ::ToString(KeyDescription->GetPartitions().size()) : "<moved>") << " DomainInfo " << (DomainInfo ? DomainInfo->ToString() : "<null>"); if (KeyDescription) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index b2543b22f1..9ed954553c 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -1833,7 +1833,7 @@ namespace NSchemeShardUT_Private { for (auto& dbKey : dbKeys) { ResolveKey(*dbKey); UNIT_ASSERT(dbKey->Status == TKeyDesc::EStatus::Ok); - for (auto& partition : dbKey->Partitions) { + for (auto& partition : dbKey->GetPartitions()) { resolvedShards.insert(partition.ShardId); } } @@ -2021,16 +2021,18 @@ namespace NSchemeShardUT_Private { TestLs(Runtime, Table, true, fnFillInfo); } - void TFakeDataReq::TTablePartitioningInfo::ResolveKey(const TTableRange &range, TVector<TKeyDesc::TPartitionInfo> &partitions) const { + std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> TFakeDataReq::TTablePartitioningInfo::ResolveKey( + const TTableRange& range) const + { Y_VERIFY(!Partitioning.empty()); - partitions.clear(); + auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); // Temporary fix: for an empty range we need to return some datashard so that it can handle readset logic ( // send empty result to other tx participants etc.) if (range.IsEmptyRange(KeyColumnTypes)) { - partitions.push_back(TKeyDesc::TPartitionInfo(Partitioning.begin()->Datashard)); - return; + partitions->push_back(TKeyDesc::TPartitionInfo(Partitioning.begin()->Datashard)); + return partitions; } TVector<TBorder>::const_iterator low = LowerBound(Partitioning.begin(), Partitioning.end(), true, @@ -2041,15 +2043,17 @@ namespace NSchemeShardUT_Private { Y_VERIFY(low != Partitioning.end(), "last key must be (inf)"); do { - partitions.push_back(TKeyDesc::TPartitionInfo(low->Datashard)); + partitions->push_back(TKeyDesc::TPartitionInfo(low->Datashard)); if (range.Point) - return; + return partitions; int prevComp = CompareBorders<true, true>(low->KeyTuple.GetCells(), range.To, low->Point || low->Inclusive, range.InclusiveTo, KeyColumnTypes); if (prevComp >= 0) - return; + return partitions; } while (++low != Partitioning.end()); + + return partitions; } TEvSchemeShard::TEvModifySchemeTransaction* CombineSchemeTransactions(const TVector<TEvSchemeShard::TEvModifySchemeTransaction*>& transactions) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index de4aaa0f42..24d6ddd6d7 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -440,7 +440,7 @@ namespace NSchemeShardUT_Private { TVector<NScheme::TTypeId> KeyColumnTypes; TVector<TBorder> Partitioning; - void ResolveKey(const TTableRange& range, TVector<TKeyDesc::TPartitionInfo>& partitions) const; + std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> ResolveKey(const TTableRange& range) const; }; void FillTablePartitioningInfo(); @@ -450,7 +450,7 @@ namespace NSchemeShardUT_Private { FillTablePartitioningInfo(); } - TablePartitioningInfo.ResolveKey(dbKey.Range, dbKey.Partitions); + dbKey.Partitioning = TablePartitioningInfo.ResolveKey(dbKey.Range); dbKey.Status = TKeyDesc::EStatus::Ok; } diff --git a/ydb/core/tx/tx_proxy/commitreq.cpp b/ydb/core/tx/tx_proxy/commitreq.cpp index dff7af4a18..3a7982a8db 100644 --- a/ydb/core/tx/tx_proxy/commitreq.cpp +++ b/ydb/core/tx/tx_proxy/commitreq.cpp @@ -240,7 +240,7 @@ private: return Die(ctx); } - for (auto& partition : entry.KeyDescription->Partitions) { + for (auto& partition : entry.KeyDescription->GetPartitions()) { auto& state = PerShardStates[partition.ShardId]; state.Tables.insert(entry.KeyDescription->TableId); } diff --git a/ydb/core/tx/tx_proxy/datareq.cpp b/ydb/core/tx/tx_proxy/datareq.cpp index 9ce5729899..c933a19412 100644 --- a/ydb/core/tx/tx_proxy/datareq.cpp +++ b/ydb/core/tx/tx_proxy/datareq.cpp @@ -1095,7 +1095,7 @@ void TDataReq::ContinueFlatMKQLResolve(const TActorContext &ctx) { // we would need shard -> table mapping for scheme cache invalidation on errors for (const auto& keyDescription : keyDescriptions) { - for (auto& partition : keyDescription->Partitions) { + for (auto& partition : keyDescription->GetPartitions()) { if (auto *x = PerTablet.FindPtr(partition.ShardId)) { x->TableId = keyDescription->TableId; } @@ -1137,7 +1137,7 @@ void TDataReq::ProcessReadTableResolve(NSchemeCache::TSchemeCacheRequest *cacheR auto &entry = cacheRequest->ResultSet[0]; ReadTableRequest->KeyDesc = std::move(entry.KeyDescription); - bool singleShard = ReadTableRequest->KeyDesc->Partitions.size() == 1; + bool singleShard = ReadTableRequest->KeyDesc->GetPartitions().size() == 1; CanUseFollower = false; bool immediate = singleShard; @@ -1147,7 +1147,7 @@ void TDataReq::ProcessReadTableResolve(NSchemeCache::TSchemeCacheRequest *cacheR immediate = true; } - for (auto& partition : ReadTableRequest->KeyDesc->Partitions) { + for (auto& partition : ReadTableRequest->KeyDesc->GetPartitions()) { NKikimrTxDataShard::TDataTransaction dataTransaction; dataTransaction.SetStreamResponse(StreamResponse); dataTransaction.SetImmediate(immediate); @@ -1185,7 +1185,7 @@ void TDataReq::ProcessReadTableResolve(NSchemeCache::TSchemeCacheRequest *cacheR "Actor# " << ctx.SelfID.ToString() << " txid# " << TxId << " SEND TEvProposeTransaction to datashard " << partition.ShardId << " with read table request" - << " affected shards " << ReadTableRequest->KeyDesc->Partitions.size() + << " affected shards " << ReadTableRequest->KeyDesc->GetPartitions().size() << " followers " << (CanUseFollower ? "allowed" : "disallowed") << " marker# P4b"); const TActorId pipeCache = CanUseFollower ? Services.FollowerPipeCache : Services.LeaderPipeCache; @@ -2777,8 +2777,8 @@ ui64 GetFirstTablet(NSchemeCache::TSchemeCacheRequest &cacheRequest) { NSchemeCache::TSchemeCacheRequest::TEntry& firstEntry= *cacheRequest.ResultSet.begin(); NKikimr::TKeyDesc& firstKey = *firstEntry.KeyDescription; - Y_VERIFY(!firstKey.Partitions.empty()); - return firstKey.Partitions.begin()->ShardId; + Y_VERIFY(!firstKey.GetPartitions().empty()); + return firstKey.GetPartitions().begin()->ShardId; } const TDomainsInfo::TDomain& TDataReq::SelectDomain(NSchemeCache::TSchemeCacheRequest &cacheRequest, const TActorContext &ctx) { diff --git a/ydb/core/tx/tx_proxy/read_table_impl.cpp b/ydb/core/tx/tx_proxy/read_table_impl.cpp index 6c30975fd9..a364d5ddc1 100644 --- a/ydb/core/tx/tx_proxy/read_table_impl.cpp +++ b/ydb/core/tx/tx_proxy/read_table_impl.cpp @@ -739,7 +739,7 @@ private: KeyDesc = std::move(request->ResultSet[0].KeyDescription); - if (KeyDesc->Partitions.empty()) { + if (KeyDesc->GetPartitions().empty()) { TString error = TStringBuilder() << "No partitions to read from '" << Settings.TablePath << "'"; TXLOG_E(error); return ReplyAndDie(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::WrongRequest, NKikimrIssues::TStatusIds::BAD_REQUEST, ctx); @@ -754,8 +754,8 @@ private: // Do we need to create a new snapshot? const bool needSnapshot = Settings.ReadVersion.IsMax(); - for (size_t idx = 0; idx < KeyDesc->Partitions.size(); ++idx) { - const auto& partition = KeyDesc->Partitions[idx]; + for (size_t idx = 0; idx < KeyDesc->GetPartitions().size(); ++idx) { + const auto& partition = KeyDesc->GetPartitions()[idx]; const ui64 shardId = partition.ShardId; auto [it, inserted] = ShardMap.emplace( @@ -772,12 +772,12 @@ private: range.From = KeyFromValues; range.FromInclusive = KeyDesc->Range.InclusiveFrom; } else { - const auto& prevRange = *KeyDesc->Partitions[idx - 1].Range; + const auto& prevRange = *KeyDesc->GetPartitions()[idx - 1].Range; range.From = prevRange.EndKeyPrefix; range.FromInclusive = !prevRange.IsInclusive; // N.B. always true for now } - if (idx == KeyDesc->Partitions.size() - 1) { + if (idx == KeyDesc->GetPartitions().size() - 1) { // Last shard in range range.To = KeyToValues; range.ToInclusive = KeyDesc->Range.InclusiveTo; @@ -2393,7 +2393,7 @@ private: KeyDesc = std::move(request->ResultSet[0].KeyDescription); - if (KeyDesc->Partitions.empty()) { + if (KeyDesc->GetPartitions().empty()) { TString error = TStringBuilder() << "No partitions to read from '" << Settings.TablePath << "'"; TXLOG_E(error); return ReplyAndDie(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::WrongRequest, NKikimrIssues::TStatusIds::BAD_REQUEST, ctx); @@ -2421,8 +2421,8 @@ private: THashSet<TShardState*> removed; - for (size_t idx = 0; idx < KeyDesc->Partitions.size(); ++idx) { - const auto& partition = KeyDesc->Partitions[idx]; + for (size_t idx = 0; idx < KeyDesc->GetPartitions().size(); ++idx) { + const auto& partition = KeyDesc->GetPartitions()[idx]; const ui64 shardId = partition.ShardId; TXLOG_T("Processing resolved shard ShardId# " << shardId); @@ -2452,12 +2452,12 @@ private: shardRange.From = KeyFromValues; shardRange.FromInclusive = KeyDesc->Range.InclusiveFrom; } else { - const auto& prevRange = *KeyDesc->Partitions[idx - 1].Range; + const auto& prevRange = *KeyDesc->GetPartitions()[idx - 1].Range; shardRange.From = prevRange.EndKeyPrefix; shardRange.FromInclusive = !prevRange.IsInclusive; // N.B. always true for now } - if (idx == KeyDesc->Partitions.size() - 1) { + if (idx == KeyDesc->GetPartitions().size() - 1) { // Last shard in range shardRange.To = KeyToValues; shardRange.ToInclusive = KeyDesc->Range.InclusiveTo; diff --git a/ydb/core/tx/tx_proxy/snapshotreq.cpp b/ydb/core/tx/tx_proxy/snapshotreq.cpp index 4b2d7fe275..28fc4dd779 100644 --- a/ydb/core/tx/tx_proxy/snapshotreq.cpp +++ b/ydb/core/tx/tx_proxy/snapshotreq.cpp @@ -328,7 +328,7 @@ public: return Die(ctx); } - for (auto& partition : entry.KeyDescription->Partitions) { + for (auto& partition : entry.KeyDescription->GetPartitions()) { auto& state = PerShardStates[partition.ShardId]; state.Tables.insert(entry.KeyDescription->TableId); } @@ -1367,7 +1367,7 @@ public: return Die(ctx); } - for (auto& partition : entry.KeyDescription->Partitions) { + for (auto& partition : entry.KeyDescription->GetPartitions()) { auto& state = PerShardStates[partition.ShardId]; state.Tables.insert(entry.KeyDescription->TableId); } diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index c05c11e6e2..e123b55c41 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -862,7 +862,8 @@ private: return JoinVectorIntoString(shards, ", "); }; - LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: " << getShardsString(GetKeyRange()->Partitions)); + LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: " + << getShardsString(GetKeyRange()->GetPartitions())); MakeShardRequests(ctx); } @@ -870,13 +871,13 @@ private: void MakeShardRequests(const NActors::TActorContext& ctx) { const auto* keyRange = GetKeyRange(); - Y_VERIFY(!keyRange->Partitions.empty()); + Y_VERIFY(!keyRange->GetPartitions().empty()); // Group rows by shard id - TVector<std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>> shardRequests(keyRange->Partitions.size()); + TVector<std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>> shardRequests(keyRange->GetPartitions().size()); for (const auto& keyValue : GetRows()) { // Find partition for the key - auto it = std::lower_bound(keyRange->Partitions.begin(), keyRange->Partitions.end(), keyValue.first.GetCells(), + auto it = std::lower_bound(keyRange->GetPartitions().begin(), keyRange->GetPartitions().end(), keyValue.first.GetCells(), [this](const auto &partition, const auto& key) { const auto& range = *partition.Range; const int cmp = CompareBorders<true, false>(range.EndKeyPrefix.GetCells(), key, @@ -885,7 +886,7 @@ private: return (cmp < 0); }); - size_t shardIdx = it - keyRange->Partitions.begin(); + size_t shardIdx = it - keyRange->GetPartitions().begin(); TEvDataShard::TEvUploadRowsRequest* ev = shardRequests[shardIdx].get(); if (!ev) { @@ -915,7 +916,7 @@ private: if (!shardRequests[idx]) continue; - TTabletId shardId = keyRange->Partitions[idx].ShardId; + TTabletId shardId = keyRange->GetPartitions()[idx].ShardId; LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Sending request to shards " << shardId); |