aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergei Puchin <s.puchin@gmail.com>2022-06-20 20:12:49 +0300
committerSergei Puchin <s.puchin@gmail.com>2022-06-20 20:12:49 +0300
commit13bffcce474f3588b3e177ea6c6015f16ded9ef6 (patch)
tree5717b823ceaef323b2152b1e8e916ddfbe958928
parent37b0429b14b6423eba85f06cc9e942db2893490e (diff)
downloadydb-13bffcce474f3588b3e177ea6c6015f16ded9ef6.tar.gz
Fast path for obtaining full table partitioning from scheme cache. (KIKIMR-15147)
ref:bc6ff3951a8846e0fdfebf0884e1665564058773
-rw-r--r--ydb/core/client/server/msgbus_server_s3_listing.cpp9
-rw-r--r--ydb/core/engine/minikql/flat_local_tx_minikql.h5
-rw-r--r--ydb/core/engine/mkql_engine_flat.cpp12
-rw-r--r--ydb/core/engine/mkql_engine_flat_ut.cpp16
-rw-r--r--ydb/core/grpc_services/rpc_import_data.cpp8
-rw-r--r--ydb/core/grpc_services/rpc_kh_describe.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_read_columns.cpp7
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp16
-rw-r--r--ydb/core/kqp/executer/kqp_partition_helper.cpp24
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp2
-rw-r--r--ydb/core/kqp/executer/kqp_table_resolver.cpp2
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.cpp2
-rw-r--r--ydb/core/scheme/scheme_tabledefs.cpp22
-rw-r--r--ydb/core/scheme/scheme_tabledefs.h7
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_distributed_erase.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp27
-rw-r--r--ydb/core/tx/datashard/datashard_ut_erase_rows.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_table.cpp2
-rw-r--r--ydb/core/tx/scheme_board/cache.cpp74
-rw-r--r--ydb/core/tx/scheme_cache/scheme_cache.cpp4
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.cpp20
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.h4
-rw-r--r--ydb/core/tx/tx_proxy/commitreq.cpp2
-rw-r--r--ydb/core/tx/tx_proxy/datareq.cpp12
-rw-r--r--ydb/core/tx/tx_proxy/read_table_impl.cpp20
-rw-r--r--ydb/core/tx/tx_proxy/snapshotreq.cpp4
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h13
28 files changed, 206 insertions, 135 deletions
diff --git a/ydb/core/client/server/msgbus_server_s3_listing.cpp b/ydb/core/client/server/msgbus_server_s3_listing.cpp
index 3626866322..e021ffee12 100644
--- a/ydb/core/client/server/msgbus_server_s3_listing.cpp
+++ b/ydb/core/client/server/msgbus_server_s3_listing.cpp
@@ -382,9 +382,10 @@ private:
return JoinVectorIntoString(shards, ", ");
};
- LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: " << getShardsString(KeyRange->Partitions));
+ LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: "
+ << getShardsString(KeyRange->GetPartitions()));
- if (KeyRange->Partitions.size() > 0) {
+ if (KeyRange->GetPartitions().size() > 0) {
CurrentShardIdx = 0;
MakeShardRequest(CurrentShardIdx, ctx);
} else {
@@ -393,7 +394,7 @@ private:
}
void MakeShardRequest(ui32 idx, const NActors::TActorContext& ctx) {
- ui64 shardId = KeyRange->Partitions[idx].ShardId;
+ ui64 shardId = KeyRange->GetPartitions()[idx].ShardId;
THolder<TEvDataShard::TEvS3ListingRequest> ev(new TEvDataShard::TEvS3ListingRequest());
ev->Record.SetTableId(KeyRange->TableId.PathId.LocalPathId);
@@ -472,7 +473,7 @@ private:
ContentsRows.emplace_back(shardResponse.GetContentsRows(i));
}
- if (CurrentShardIdx+1 < KeyRange->Partitions.size() &&
+ if (CurrentShardIdx+1 < KeyRange->GetPartitions().size() &&
MaxKeys > ContentsRows.size() + CommonPrefixesRows.size() &&
shardResponse.GetMoreRows())
{
diff --git a/ydb/core/engine/minikql/flat_local_tx_minikql.h b/ydb/core/engine/minikql/flat_local_tx_minikql.h
index 0feabf20e6..d7387ed8f4 100644
--- a/ydb/core/engine/minikql/flat_local_tx_minikql.h
+++ b/ydb/core/engine/minikql/flat_local_tx_minikql.h
@@ -261,7 +261,10 @@ class TFlatLocalMiniKQL : public NTabletFlatExecutor::ITransaction {
for (auto &key : proxyEngine->GetDbKeys()) {
key->Status = TKeyDesc::EStatus::Ok;
- key->Partitions.push_back(TKeyDesc::TPartitionInfo(TabletId));
+
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
+ partitions->push_back(TKeyDesc::TPartitionInfo(TabletId));
+ key->Partitioning = partitions;
for (const auto &x : key->Columns) {
key->ColumnInfos.push_back({x.Column, x.ExpectedType, 0, TKeyDesc::EStatus::Ok}); // type-check
diff --git a/ydb/core/engine/mkql_engine_flat.cpp b/ydb/core/engine/mkql_engine_flat.cpp
index 32e176a77a..3c972e6961 100644
--- a/ydb/core/engine/mkql_engine_flat.cpp
+++ b/ydb/core/engine/mkql_engine_flat.cpp
@@ -455,7 +455,7 @@ public:
for (auto& x : ProxyCallables) {
auto name = x.second.Node->GetType()->GetNameStr();
if (name == Strings.EraseRow || name == Strings.UpdateRow) {
- proxyShards[x.first] = &x.second.Key->Partitions;
+ proxyShards[x.first] = &x.second.Key->GetPartitions();
}
}
@@ -476,7 +476,7 @@ public:
if (readsets.size() > limits.RSCount) {
THashMap<ui64, TTableId> tableMap;
for (auto& key : DbKeys) {
- for (auto& partition : key->Partitions) {
+ for (auto& partition : key->GetPartitions()) {
tableMap[partition.ShardId] = key->TableId;
}
}
@@ -1498,7 +1498,7 @@ private:
};
static void AddShards(TSet<ui64>& set, const TKeyDesc& key) {
- for (auto& partition : key.Partitions) {
+ for (auto& partition : key.GetPartitions()) {
Y_VERIFY(partition.ShardId);
set.insert(partition.ShardId);
}
@@ -1576,7 +1576,7 @@ private:
auto uniqueName = ToString(callable->GetUniqueId());
shardsForReadBuilder.Add(uniqueName, ctx.ShardsForRead);
- for (auto& partition : ctx.Key->Partitions) {
+ for (auto& partition : ctx.Key->GetPartitions()) {
auto shardForRead = partition.ShardId;
if (shardForRead != shard) {
readsets.insert(std::make_pair(shardForRead, shard));
@@ -1589,7 +1589,7 @@ private:
auto key = ctx.Key;
Y_VERIFY(key);
- for (auto& partition : key->Partitions) {
+ for (auto& partition : key->GetPartitions()) {
if (partition.ShardId == shard) {
return true;
}
@@ -2039,7 +2039,7 @@ private:
Y_VERIFY(key);
TListLiteralBuilder listOfShards(Env, Ui64Type);
- for (auto& partition : key->Partitions) {
+ for (auto& partition : key->GetPartitions()) {
listOfShards.Add(TRuntimeNode(BuildDataLiteral(NUdf::TUnboxedValuePod(partition.ShardId),
NUdf::TDataType<ui64>::Id, Env), true));
}
diff --git a/ydb/core/engine/mkql_engine_flat_ut.cpp b/ydb/core/engine/mkql_engine_flat_ut.cpp
index e2b12e2d7a..7c547889c1 100644
--- a/ydb/core/engine/mkql_engine_flat_ut.cpp
+++ b/ydb/core/engine/mkql_engine_flat_ut.cpp
@@ -88,18 +88,26 @@ namespace {
void SingleShardResolver(TKeyDesc& key) {
key.Status = TKeyDesc::EStatus::Ok;
- key.Partitions.push_back(TKeyDesc::TPartitionInfo(Shard1));
+
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
+ partitions->push_back(TKeyDesc::TPartitionInfo(Shard1));
+ key.Partitioning = partitions;
}
void DoubleShardResolver(TKeyDesc& key) {
key.Status = TKeyDesc::EStatus::Ok;
- key.Partitions.push_back(TKeyDesc::TPartitionInfo(Shard1));
- key.Partitions.push_back(TKeyDesc::TPartitionInfo(Shard2));
+
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
+ partitions->push_back(TKeyDesc::TPartitionInfo(Shard1));
+ partitions->push_back(TKeyDesc::TPartitionInfo(Shard2));
+ key.Partitioning = partitions;
}
void TwoShardResolver(TKeyDesc& key) {
key.Status = TKeyDesc::EStatus::Ok;
- key.Partitions.push_back(TKeyDesc::TPartitionInfo(key.TableId.PathId.LocalPathId == Table1Id ? Shard1 : Shard2));
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
+ partitions->push_back(TKeyDesc::TPartitionInfo(key.TableId.PathId.LocalPathId == Table1Id ? Shard1 : Shard2));
+ key.Partitioning = partitions;
}
struct TDriver {
diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp
index c782b04de7..d9b32d742a 100644
--- a/ydb/core/grpc_services/rpc_import_data.cpp
+++ b/ydb/core/grpc_services/rpc_import_data.cpp
@@ -79,10 +79,10 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque
static ui64 GetShardId(const TTableRange& range, const TKeyDesc* keyDesc) {
Y_VERIFY(range.Point);
- Y_VERIFY(!keyDesc->Partitions.empty());
+ Y_VERIFY(!keyDesc->GetPartitions().empty());
TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound(
- keyDesc->Partitions.begin(), keyDesc->Partitions.end(), true,
+ keyDesc->GetPartitions().begin(), keyDesc->GetPartitions().end(), true,
[&](const TKeyDesc::TPartitionInfo& partition, bool) {
const int cmp = CompareBorders<true, false>(
partition.Range->EndKeyPrefix.GetCells(), range.From,
@@ -94,7 +94,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque
}
);
- Y_VERIFY(it != keyDesc->Partitions.end());
+ Y_VERIFY(it != keyDesc->GetPartitions().end());
return it->ShardId;
}
@@ -216,7 +216,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque
return Reply(StatusIds::SCHEME_ERROR, TIssuesIds::GENERIC_RESOLVE_ERROR);
}
- if (KeyDesc->Partitions.empty()) {
+ if (KeyDesc->GetPartitions().empty()) {
return Reply(StatusIds::SCHEME_ERROR, TIssuesIds::GENERIC_RESOLVE_ERROR);
}
diff --git a/ydb/core/grpc_services/rpc_kh_describe.cpp b/ydb/core/grpc_services/rpc_kh_describe.cpp
index 70c8767088..9882b0abb1 100644
--- a/ydb/core/grpc_services/rpc_kh_describe.cpp
+++ b/ydb/core/grpc_services/rpc_kh_describe.cpp
@@ -301,9 +301,9 @@ private:
LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Table ["
<< TEvKikhouseDescribeTableRequest::GetProtoRequest(Request)->path()
- << "] shards: " << getShardsString(KeyRange->Partitions));
+ << "] shards: " << getShardsString(KeyRange->GetPartitions()));
- for (const TKeyDesc::TPartitionInfo& partition : KeyRange->Partitions) {
+ for (const TKeyDesc::TPartitionInfo& partition : KeyRange->GetPartitions()) {
auto* p = Result.add_partitions();
p->set_tablet_id(partition.ShardId);
p->set_end_key(partition.Range->EndKeyPrefix.GetBuffer());
diff --git a/ydb/core/grpc_services/rpc_read_columns.cpp b/ydb/core/grpc_services/rpc_read_columns.cpp
index ccfaf1c1ff..d9f17687a9 100644
--- a/ydb/core/grpc_services/rpc_read_columns.cpp
+++ b/ydb/core/grpc_services/rpc_read_columns.cpp
@@ -617,13 +617,14 @@ private:
return JoinVectorIntoString(shards, ", ");
};
- LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: " << getShardsString(KeyRange->Partitions));
+ LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: "
+ << getShardsString(KeyRange->GetPartitions()));
MakeShardRequests(ctx);
}
void MakeShardRequests(const NActors::TActorContext& ctx) {
- Y_VERIFY(!KeyRange->Partitions.empty());
+ Y_VERIFY(!KeyRange->GetPartitions().empty());
auto proto = GetProtoRequest();
// Send request to the first shard
@@ -642,7 +643,7 @@ private:
ev->Record.SetSnapshotTxId(SnapshotId.TxId);
}
- ui64 shardId = KeyRange->Partitions[0].ShardId;
+ ui64 shardId = KeyRange->GetPartitions()[0].ShardId;
LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Sending request to shards " << shardId);
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index e080777290..50995dc2e1 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -529,7 +529,7 @@ private:
auto keyDesc = std::move(request->ResultSet[0].KeyDescription);
- if (keyDesc->Partitions.empty()) {
+ if (keyDesc->GetPartitions().empty()) {
TString error = TStringBuilder() << "No partitions to read from '" << ScanData->TablePath << "'";
CA_LOG_E(error);
InternalError(TIssuesIds::KIKIMR_SCHEME_ERROR, error);
@@ -539,16 +539,16 @@ private:
const auto& tr = *AppData()->TypeRegistry;
TVector<TShardState> newShards;
- newShards.reserve(keyDesc->Partitions.size());
+ newShards.reserve(keyDesc->GetPartitions().size());
- for (ui64 idx = 0, i = 0; idx < keyDesc->Partitions.size(); ++idx) {
- const auto& partition = keyDesc->Partitions[idx];
+ for (ui64 idx = 0, i = 0; idx < keyDesc->GetPartitions().size(); ++idx) {
+ const auto& partition = keyDesc->GetPartitions()[idx];
TTableRange partitionRange{
- idx == 0 ? state.Ranges.front().From.GetCells() : keyDesc->Partitions[idx - 1].Range->EndKeyPrefix.GetCells(),
- idx == 0 ? state.Ranges.front().FromInclusive : !keyDesc->Partitions[idx - 1].Range->IsInclusive,
- keyDesc->Partitions[idx].Range->EndKeyPrefix.GetCells(),
- keyDesc->Partitions[idx].Range->IsInclusive
+ idx == 0 ? state.Ranges.front().From.GetCells() : keyDesc->GetPartitions()[idx - 1].Range->EndKeyPrefix.GetCells(),
+ idx == 0 ? state.Ranges.front().FromInclusive : !keyDesc->GetPartitions()[idx - 1].Range->IsInclusive,
+ keyDesc->GetPartitions()[idx].Range->EndKeyPrefix.GetCells(),
+ keyDesc->GetPartitions()[idx].Range->IsInclusive
};
CA_LOG_D("Processing resolved ShardId# " << partition.ShardId
diff --git a/ydb/core/kqp/executer/kqp_partition_helper.cpp b/ydb/core/kqp/executer/kqp_partition_helper.cpp
index bebba15fcc..28c715e31b 100644
--- a/ydb/core/kqp/executer/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer/kqp_partition_helper.cpp
@@ -54,17 +54,17 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(const NDq::TMkqlV
auto keyValue = MakeKeyCells(paramValue, table.KeyColumnTypes, keyColumnIndices, typeEnv, /* copyValues */ true);
Y_VERIFY_DEBUG(keyValue.size() == keyLen);
- ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.Partitions, table.KeyColumnTypes,
+ ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), table.KeyColumnTypes,
[] (const auto& partition) { return *partition.Range; });
- ui64 shardId = key.Partitions[partitionIndex].ShardId;
+ ui64 shardId = key.GetPartitions()[partitionIndex].ShardId;
shardParamValues[shardId].emplace_back(std::move(paramValue));
auto point = TSerializedCellVec(TSerializedCellVec::Serialize(keyValue));
auto& shardData = ret[shardId];
- if (key.Partitions[partitionIndex].Range->IsPoint) {
+ if (key.GetPartitions()[partitionIndex].Range->IsPoint) {
// singular case when partition is just a point
shardData.FullRange.emplace(TSerializedTableRange(point.GetBuffer(), "", true, true));
shardData.FullRange->Point = true;
@@ -141,7 +141,7 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(const NDq::
auto range = TTableRange(fromValues, /* inclusiveFrom */ true,
point ? TConstArrayRef<TCell>() : toValuesPrefix, /* inclusiveTo */ true,
/* point */ point);
- TVector<TPartitionWithRange> rangePartitions = GetKeyRangePartitions(range, key.Partitions, keyFullType);
+ TVector<TPartitionWithRange> rangePartitions = GetKeyRangePartitions(range, key.GetPartitions(), keyFullType);
for (TPartitionWithRange& partitionWithRange : rangePartitions) {
ui64 shardId = partitionWithRange.PartitionInfo->ShardId;
@@ -437,8 +437,8 @@ TSerializedTableRange MakeKeyRange(const TVector<NUdf::TDataTypeId>& keyColumnTy
namespace {
void FillFullRange(const TStageInfo& stageInfo, THashMap<ui64, TShardInfo>& shardInfoMap, bool read) {
- for (ui64 i = 0; i < stageInfo.Meta.ShardKey->Partitions.size(); ++i) {
- auto& partition = stageInfo.Meta.ShardKey->Partitions[i];
+ for (ui64 i = 0; i < stageInfo.Meta.ShardKey->GetPartitions().size(); ++i) {
+ auto& partition = stageInfo.Meta.ShardKey->GetPartitions()[i];
auto& partitionRange = *partition.Range;
auto& shardInfo = shardInfoMap[partition.ShardId];
@@ -456,7 +456,7 @@ void FillFullRange(const TStageInfo& stageInfo, THashMap<ui64, TShardInfo>& shar
}
if (i != 0) {
- auto& prevPartition = stageInfo.Meta.ShardKey->Partitions[i - 1];
+ auto& prevPartition = stageInfo.Meta.ShardKey->GetPartitions()[i - 1];
ranges->MakeFull(TSerializedTableRange(prevPartition.Range->EndKeyPrefix.GetCells(), !prevPartition.Range->IsInclusive,
partitionRange.EndKeyPrefix.GetCells(), partitionRange.IsInclusive));
@@ -499,7 +499,8 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
YQL_ENSURE(readRange.HasKeyRange());
auto range = MakeKeyRange(keyColumnTypes, readRange.GetKeyRange(), stageInfo, holderFactory, typeEnv);
- auto readPartitions = GetKeyRangePartitions(range.ToTableRange(), stageInfo.Meta.ShardKey->Partitions, keyColumnTypes);
+ auto readPartitions = GetKeyRangePartitions(range.ToTableRange(), stageInfo.Meta.ShardKey->GetPartitions(),
+ keyColumnTypes);
THashMap<ui64, TShardInfo> shardInfoMap;
for (TPartitionWithRange& partitionWithRange : readPartitions) {
@@ -536,7 +537,8 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true, std::get<TSerializedCellVec>(range).GetCells(), true, true)
: TTableRange(std::get<TSerializedTableRange>(range).ToTableRange());
- auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->Partitions, keyColumnTypes);
+ auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(),
+ keyColumnTypes);
for (TPartitionWithRange& partitionWithRange : readPartitions) {
auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId];
@@ -575,7 +577,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
if (ranges.empty())
return shardInfoMap;
- for (const auto& partition : stageInfo.Meta.ShardKey->Partitions) {
+ for (const auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) {
auto& shardInfo = shardInfoMap[partition.ShardId];
YQL_ENSURE(!shardInfo.KeyReadRanges);
@@ -720,7 +722,7 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow
}
auto range = TTableRange(keyFrom, true, keyTo, true, /* point */ false);
- auto partitions = GetKeyRangePartitions(range, stageInfo.Meta.ShardKey->Partitions, table.KeyColumnTypes);
+ auto partitions = GetKeyRangePartitions(range, stageInfo.Meta.ShardKey->GetPartitions(), table.KeyColumnTypes);
for (auto& partitionWithRange: partitions) {
ui64 shardId = partitionWithRange.PartitionInfo->ShardId;
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 5c7c57d6fe..135890f36e 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -95,7 +95,7 @@ public:
TSet<ui64> shardIds;
for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) {
if (stageInfo.Meta.ShardKey) {
- for (auto& partition : stageInfo.Meta.ShardKey->Partitions) {
+ for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) {
shardIds.insert(partition.ShardId);
}
}
diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp
index 846aba3be2..86613a29d8 100644
--- a/ydb/core/kqp/executer/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp
@@ -196,7 +196,7 @@ private:
return;
}
- for (auto& partition : entry.KeyDescription->Partitions) {
+ for (auto& partition : entry.KeyDescription->GetPartitions()) {
YQL_ENSURE(partition.Range);
}
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
index 0002fbfe21..ac9792ed9f 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
@@ -159,7 +159,7 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
{
YQL_ENSURE(stageInfo.Meta.ShardKey);
THashMap<ui64, const TKeyDesc::TPartitionInfo*> partitionsMap;
- for (auto& partition : stageInfo.Meta.ShardKey->Partitions) {
+ for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) {
partitionsMap[partition.ShardId] = &partition;
}
diff --git a/ydb/core/scheme/scheme_tabledefs.cpp b/ydb/core/scheme/scheme_tabledefs.cpp
index 8c91e03d0e..100f23ce62 100644
--- a/ydb/core/scheme/scheme_tabledefs.cpp
+++ b/ydb/core/scheme/scheme_tabledefs.cpp
@@ -10,6 +10,28 @@ bool TTableRange::IsEmptyRange(TConstArrayRef<const NScheme::TTypeId> cellTypeId
return (compares < 0);
}
+bool TTableRange::IsFullRange(ui32 columnsCount) const {
+ if (!InclusiveFrom) {
+ return false;
+ }
+
+ if (!To.empty()) {
+ return false;
+ }
+
+ if (!From.size() == columnsCount) {
+ return false;
+ }
+
+ for (const auto& value : From) {
+ if (value) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
bool TSerializedTableRange::IsEmpty(TConstArrayRef<NScheme::TTypeId> type) const
{
auto cmp = CompareBorders<true, false>(To.GetCells(), From.GetCells(), ToInclusive, FromInclusive, type);
diff --git a/ydb/core/scheme/scheme_tabledefs.h b/ydb/core/scheme/scheme_tabledefs.h
index 190233fdee..1e09a1e723 100644
--- a/ydb/core/scheme/scheme_tabledefs.h
+++ b/ydb/core/scheme/scheme_tabledefs.h
@@ -175,6 +175,7 @@ public:
}
bool IsEmptyRange(TConstArrayRef<const NScheme::TTypeId> cellTypeIds) const;
+ bool IsFullRange(ui32 columnsCount) const;
};
class TSerializedTableRange {
@@ -675,10 +676,11 @@ public:
// out
EStatus Status;
TVector<TColumnInfo> ColumnInfos;
- TVector<TPartitionInfo> Partitions;
+ std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
TIntrusivePtr<TSecurityObject> SecurityObject;
- bool IsSystemView() const { return Partitions.empty(); }
+ const TVector<TKeyDesc::TPartitionInfo>& GetPartitions() const { Y_VERIFY(Partitioning); return *Partitioning; }
+ bool IsSystemView() const { return GetPartitions().empty(); }
template<typename TKeyColumnTypes, typename TColumns>
TKeyDesc(const TTableId& tableId, const TTableRange& range, ERowOperation rowOperation,
@@ -692,6 +694,7 @@ public:
, Columns(columns.begin(), columns.end())
, Reverse(reverse)
, Status(EStatus::Unknown)
+ , Partitioning(std::make_shared<TVector<TKeyDesc::TPartitionInfo>>())
{}
};
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index bb2d356fb7..12c357d7dc 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -587,14 +587,14 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe
return;
}
- if (!entry.KeyDescription->Partitions) {
+ if (!entry.KeyDescription->GetPartitions()) {
LOG_W("Empty partitions list"
<< ": entry# " << entry.ToString(*AppData()->TypeRegistry));
return Retry();
}
KeyDesc = std::move(entry.KeyDescription);
- CreateSenders(MakePartitionIds(KeyDesc->Partitions));
+ CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()));
Become(&TThis::StateMain);
}
@@ -610,18 +610,18 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe
}
bool IsResolved() const override {
- return KeyDesc && KeyDesc->Partitions;
+ return KeyDesc && KeyDesc->GetPartitions();
}
ui64 GetPartitionId(const TChangeRecord& record) const override {
Y_VERIFY(KeyDesc);
- Y_VERIFY(KeyDesc->Partitions);
+ Y_VERIFY(KeyDesc->GetPartitions());
const auto range = TTableRange(record.GetKey());
Y_VERIFY(range.Point);
TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound(
- KeyDesc->Partitions.begin(), KeyDesc->Partitions.end(), true,
+ KeyDesc->GetPartitions().begin(), KeyDesc->GetPartitions().end(), true,
[&](const TKeyDesc::TPartitionInfo& partition, bool) {
const int compares = CompareBorders<true, false>(
partition.Range->EndKeyPrefix.GetCells(), range.From,
@@ -633,7 +633,7 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe
}
);
- Y_VERIFY(it != KeyDesc->Partitions.end());
+ Y_VERIFY(it != KeyDesc->GetPartitions().end());
return it->ShardId; // partition = shard
}
diff --git a/ydb/core/tx/datashard/datashard_distributed_erase.cpp b/ydb/core/tx/datashard/datashard_distributed_erase.cpp
index fae7d2edeb..e7a15a9eab 100644
--- a/ydb/core/tx/datashard/datashard_distributed_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_distributed_erase.cpp
@@ -271,10 +271,10 @@ class TDistEraser: public TActorBootstrapped<TDistEraser> {
static ui64 GetShardId(const TTableRange& range, const TKeyDesc* keyDesc) {
Y_VERIFY(range.Point);
- Y_VERIFY(!keyDesc->Partitions.empty());
+ Y_VERIFY(!keyDesc->GetPartitions().empty());
TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound(
- keyDesc->Partitions.begin(), keyDesc->Partitions.end(), true,
+ keyDesc->GetPartitions().begin(), keyDesc->GetPartitions().end(), true,
[&](const TKeyDesc::TPartitionInfo& partition, bool) {
const int compares = CompareBorders<true, false>(
partition.Range->EndKeyPrefix.GetCells(), range.From,
@@ -286,7 +286,7 @@ class TDistEraser: public TActorBootstrapped<TDistEraser> {
}
);
- Y_VERIFY(it != keyDesc->Partitions.end());
+ Y_VERIFY(it != keyDesc->GetPartitions().end());
return it->ShardId;
}
@@ -541,7 +541,7 @@ class TDistEraser: public TActorBootstrapped<TDistEraser> {
return;
}
- if (entry.KeyDescription->Partitions.empty()) {
+ if (entry.KeyDescription->GetPartitions().empty()) {
return SchemeError(TStringBuilder() << "Empty partitions list"
<< ": entry# " << entry.ToString(*AppData()->TypeRegistry));
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 384b2e5c3f..b0438ef795 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -98,22 +98,26 @@ void TTester::EmptyShardKeyResolver(TKeyDesc& key) {
void TTester::SingleShardKeyResolver(TKeyDesc& key) {
key.Status = TKeyDesc::EStatus::Ok;
- key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0));
+
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
+ partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0));
+ key.Partitioning = partitions;
}
void TTester::ThreeShardPointKeyResolver(TKeyDesc& key) {
const ui32 ShardBorder1 = 1000;
const ui32 ShardBorder2 = 2000;
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
key.Status = TKeyDesc::EStatus::Ok;
if (key.Range.Point) {
ui32 key0 = *(ui32*)key.Range.From[0].Data();
if (key0 < ShardBorder1) {
- key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0));
+ partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0));
} else if (key0 < ShardBorder2) {
- key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet1));
+ partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet1));
} else {
- key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet2));
+ partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet2));
}
} else {
UNIT_ASSERT(key.Range.From.size() > 0);
@@ -126,12 +130,14 @@ void TTester::ThreeShardPointKeyResolver(TKeyDesc& key) {
UNIT_ASSERT(from <= to);
if (from < ShardBorder1)
- key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0));
+ partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0));
if (from < ShardBorder2 && to >= ShardBorder1)
- key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet1));
+ partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet1));
if (to >= ShardBorder2)
- key.Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet2));
+ partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet2));
}
+
+ key.Partitioning = partitions;
}
TTester::TKeyResolver TTester::GetKeyResolver() const {
@@ -332,7 +338,7 @@ ui32 TFakeProxyTx::SetProgram(TTester& tester, const TString& programText) {
keyResolver(*dbKey);
UNIT_ASSERT(dbKey->Status == TKeyDesc::EStatus::Ok);
- for (auto& partition : dbKey->Partitions) {
+ for (auto& partition : dbKey->GetPartitions()) {
resolvedShards.insert(partition.ShardId);
}
}
@@ -997,7 +1003,10 @@ TKeyExtractor::TKeyExtractor(TTester& tester, TString programText) {
for (auto& key : Engine->GetDbKeys()) {
key->Status = TKeyDesc::EStatus::Ok;
- key->Partitions.push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0));
+
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
+ partitions->push_back(TKeyDesc::TPartitionInfo((ui64)TTestTxConfig::TxTablet0));
+ key->Partitioning = partitions;
}
}
diff --git a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp
index 9ce091e22b..03fa7a8797 100644
--- a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp
@@ -1402,7 +1402,8 @@ tkey = 100, key = 4
check(TEvResponse::ProtoRecordType::SCHEME_ERROR, "Empty partitions list", [](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvResolve::EventType) {
- ev->Get<TEvResolve>()->Request->ResultSet.at(0).KeyDescription->Partitions.clear();
+ ev->Get<TEvResolve>()->Request->ResultSet.at(0).KeyDescription->Partitioning =
+ std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
}
});
diff --git a/ydb/core/tx/datashard/datashard_ut_read_table.cpp b/ydb/core/tx/datashard/datashard_ut_read_table.cpp
index c11e21f2f2..6747d06c46 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_table.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_table.cpp
@@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(DataShardReadTableSnapshots) {
Cerr << "... ignored TEvResolveKeySetResult with errors" << Endl;
break;
}
- size_t partitions = request->ResultSet[0].KeyDescription->Partitions.size();
+ size_t partitions = request->ResultSet[0].KeyDescription->GetPartitions().size();
if (partitions == captureResolveKeySetResultPartitions) {
Cerr << "... captured TEvResolveKeySetResult with " << partitions << " partitions" << Endl;
capturedResolveKeySetResults.emplace_back(ev.Release());
diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp
index 152ba66d8e..da39c00ff0 100644
--- a/ydb/core/tx/scheme_board/cache.cpp
+++ b/ydb/core/tx/scheme_board/cache.cpp
@@ -220,7 +220,7 @@ namespace {
entry.DomainInfo.Drop();
TKeyDesc& keyDesc = *entry.KeyDescription;
keyDesc.ColumnInfos.clear();
- keyDesc.Partitions.clear();
+ keyDesc.Partitioning = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
keyDesc.SecurityObject.Drop();
}
@@ -696,7 +696,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
NotNullColumns.clear();
Indexes.clear();
CdcStreams.clear();
- Partitioning.clear();
+ Partitioning = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
Self.Drop();
@@ -746,16 +746,19 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
}
if (pathDesc.TablePartitionsSize()) {
- Partitioning.resize(pathDesc.TablePartitionsSize());
+ auto partitioning = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
+ partitioning->resize(pathDesc.TablePartitionsSize());
for (ui32 i : xrange(pathDesc.TablePartitionsSize())) {
const auto& src = pathDesc.GetTablePartitions(i);
- auto& partition = Partitioning[i];
+ auto& partition = (*partitioning)[i];
partition.Range = TKeyDesc::TPartitionRangeInfo();
partition.Range->EndKeyPrefix.Parse(src.GetEndOfRangeKeyPrefix());
partition.Range->IsInclusive = src.HasIsInclusive() && src.GetIsInclusive();
partition.Range->IsPoint = src.HasIsPoint() && src.GetIsPoint();
partition.ShardId = src.GetDatashardId();
}
+
+ Partitioning = std::move(partitioning);
}
if (pathDesc.HasDomainDescription()) {
@@ -851,24 +854,25 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
ptr->Description = std::move(desc);
}
- // copy-paste from core/tx/scheme_cache/scheme_cache_impl.cpp
- void FillRangePartitioning(
- const TTableRange& range,
- TVector<TKeyDesc::TPartitionInfo>& partitions
- ) const {
- Y_VERIFY(!Partitioning.empty());
+ std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> FillRangePartitioning(const TTableRange& range) const {
+ Y_VERIFY(Partitioning);
+ Y_VERIFY(!Partitioning->empty());
+
+ if (range.IsFullRange(KeyColumnTypes.size())) {
+ return Partitioning;
+ }
- partitions.clear();
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
// Temporary fix: for an empty range we need to return some datashard
// so that it can handle readset logic (send empty result to other tx participants etc.)
if (range.IsEmptyRange(KeyColumnTypes)) {
- partitions.push_back(*Partitioning.begin());
- return;
+ partitions->push_back(*Partitioning->begin());
+ return partitions;
}
TVector<TKeyDesc::TPartitionInfo>::const_iterator low = LowerBound(
- Partitioning.begin(), Partitioning.end(), true,
+ Partitioning->begin(), Partitioning->end(), true,
[&](const TKeyDesc::TPartitionInfo& left, bool) {
const int compares = CompareBorders<true, false>(
left.Range->EndKeyPrefix.GetCells(), range.From,
@@ -880,13 +884,13 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
}
);
- Y_VERIFY(low != Partitioning.end(), "last key must be (inf)");
+ Y_VERIFY(low != Partitioning->end(), "last key must be (inf)");
do {
- partitions.push_back(*low);
+ partitions->push_back(*low);
if (range.Point) {
- return;
+ return partitions;
}
const int prevComp = CompareBorders<true, true>(
@@ -896,10 +900,12 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
);
if (prevComp >= 0) {
- return;
+ return partitions;
}
- } while (++low != Partitioning.end());
+ } while (++low != Partitioning->end());
+
+ return partitions;
}
bool IsSysTable() const {
@@ -962,6 +968,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
, IsPrivatePath(false)
, IsVirtual(isVirtual)
, SchemaVersion(0)
+ , Partitioning(std::make_shared<TVector<TKeyDesc::TPartitionInfo>>())
{
}
@@ -980,6 +987,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
, IsPrivatePath(other.IsPrivatePath)
, IsVirtual(other.IsVirtual)
, SchemaVersion(other.SchemaVersion)
+ , Partitioning(std::make_shared<TVector<TKeyDesc::TPartitionInfo>>())
{
if (other.Subscriber) {
other.Subscriber = TSubscriber();
@@ -1817,18 +1825,22 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
} else if (Kind == TNavigate::KindOlapStore) {
FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapStore);
// Add all shards of the OLAP store
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
for (ui64 columnShard : OlapStoreInfo->Description.GetColumnShards()) {
- keyDesc.Partitions.push_back(TKeyDesc::TPartitionInfo(columnShard));
- keyDesc.Partitions.back().Range = TKeyDesc::TPartitionRangeInfo();
+ partitions->push_back(TKeyDesc::TPartitionInfo(columnShard));
+ partitions->back().Range = TKeyDesc::TPartitionRangeInfo();
}
+ keyDesc.Partitioning = std::move(partitions);
return;
} else if (Kind == TNavigate::KindOlapTable) {
FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapTable);
// Add all shards of the OLAP table
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
for (ui64 columnShard : OlapTableInfo->Description.GetSharding().GetColumnShards()) {
- keyDesc.Partitions.push_back(TKeyDesc::TPartitionInfo(columnShard));
- keyDesc.Partitions.back().Range = TKeyDesc::TPartitionRangeInfo();
+ partitions->push_back(TKeyDesc::TPartitionInfo(columnShard));
+ partitions->back().Range = TKeyDesc::TPartitionRangeInfo();
}
+ keyDesc.Partitioning = std::move(partitions);
return;
}
@@ -1854,7 +1866,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
keyDesc.Range.From, TSysTables::TLocksTable::EColumns::DataShard, shard
);
if (ok) {
- keyDesc.Partitions.push_back(TKeyDesc::TPartitionInfo(shard));
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
+ partitions->push_back(TKeyDesc::TPartitionInfo(shard));
+ keyDesc.Partitioning = std::move(partitions);
} else {
keyDesc.Status = TKeyDesc::EStatus::OperationNotSupported;
++context->Request->ErrorCount;
@@ -1862,17 +1876,19 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
}
} else if (OlapTableInfo) {
// TODO: return proper partitioning info (KIKIMR-11069)
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
for (ui64 columnShard : OlapTableInfo->Description.GetSharding().GetColumnShards()) {
- keyDesc.Partitions.push_back(TKeyDesc::TPartitionInfo(columnShard));
- keyDesc.Partitions.back().Range = TKeyDesc::TPartitionRangeInfo();
+ partitions->push_back(TKeyDesc::TPartitionInfo(columnShard));
+ partitions->back().Range = TKeyDesc::TPartitionRangeInfo();
}
+ keyDesc.Partitioning = std::move(partitions);
} else {
if (Partitioning) {
- FillRangePartitioning(keyDesc.Range, keyDesc.Partitions);
+ keyDesc.Partitioning = FillRangePartitioning(keyDesc.Range);
}
}
- if (keyDesc.Partitions.empty()) {
+ if (keyDesc.GetPartitions().empty()) {
entry.Status = TResolve::EStatus::TypeCheckError;
keyDesc.Status = TKeyDesc::EStatus::OperationNotSupported;
}
@@ -1925,7 +1941,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
THashSet<TString> NotNullColumns;
TVector<NKikimrSchemeOp::TIndexDescription> Indexes;
TVector<NKikimrSchemeOp::TCdcStreamDescription> CdcStreams;
- TVector<TKeyDesc::TPartitionInfo> Partitioning;
+ std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
TIntrusivePtr<TNavigate::TDirEntryInfo> Self;
diff --git a/ydb/core/tx/scheme_cache/scheme_cache.cpp b/ydb/core/tx/scheme_cache/scheme_cache.cpp
index 2ccc4b31a1..1024d73c19 100644
--- a/ydb/core/tx/scheme_cache/scheme_cache.cpp
+++ b/ydb/core/tx/scheme_cache/scheme_cache.cpp
@@ -84,7 +84,7 @@ TString TSchemeCacheRequest::TEntry::ToString() const {
<< " SyncVersion: " << (SyncVersion ? "true" : "false")
<< " Status: " << Status
<< " Kind: " << Kind
- << " PartitionsCount: " << (KeyDescription ? ::ToString(KeyDescription->Partitions.size()) : "<moved>")
+ << " PartitionsCount: " << (KeyDescription ? ::ToString(KeyDescription->GetPartitions().size()) : "<moved>")
<< " DomainInfo " << (DomainInfo ? DomainInfo->ToString() : "<null>")
<< " }";
}
@@ -97,7 +97,7 @@ TString TSchemeCacheRequest::TEntry::ToString(const NScheme::TTypeRegistry& type
<< " SyncVersion: " << (SyncVersion ? "true" : "false")
<< " Status: " << Status
<< " Kind: " << Kind
- << " PartitionsCount: " << (KeyDescription ? ::ToString(KeyDescription->Partitions.size()) : "<moved>")
+ << " PartitionsCount: " << (KeyDescription ? ::ToString(KeyDescription->GetPartitions().size()) : "<moved>")
<< " DomainInfo " << (DomainInfo ? DomainInfo->ToString() : "<null>");
if (KeyDescription) {
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
index b2543b22f1..9ed954553c 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
@@ -1833,7 +1833,7 @@ namespace NSchemeShardUT_Private {
for (auto& dbKey : dbKeys) {
ResolveKey(*dbKey);
UNIT_ASSERT(dbKey->Status == TKeyDesc::EStatus::Ok);
- for (auto& partition : dbKey->Partitions) {
+ for (auto& partition : dbKey->GetPartitions()) {
resolvedShards.insert(partition.ShardId);
}
}
@@ -2021,16 +2021,18 @@ namespace NSchemeShardUT_Private {
TestLs(Runtime, Table, true, fnFillInfo);
}
- void TFakeDataReq::TTablePartitioningInfo::ResolveKey(const TTableRange &range, TVector<TKeyDesc::TPartitionInfo> &partitions) const {
+ std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> TFakeDataReq::TTablePartitioningInfo::ResolveKey(
+ const TTableRange& range) const
+ {
Y_VERIFY(!Partitioning.empty());
- partitions.clear();
+ auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
// Temporary fix: for an empty range we need to return some datashard so that it can handle readset logic (
// send empty result to other tx participants etc.)
if (range.IsEmptyRange(KeyColumnTypes)) {
- partitions.push_back(TKeyDesc::TPartitionInfo(Partitioning.begin()->Datashard));
- return;
+ partitions->push_back(TKeyDesc::TPartitionInfo(Partitioning.begin()->Datashard));
+ return partitions;
}
TVector<TBorder>::const_iterator low = LowerBound(Partitioning.begin(), Partitioning.end(), true,
@@ -2041,15 +2043,17 @@ namespace NSchemeShardUT_Private {
Y_VERIFY(low != Partitioning.end(), "last key must be (inf)");
do {
- partitions.push_back(TKeyDesc::TPartitionInfo(low->Datashard));
+ partitions->push_back(TKeyDesc::TPartitionInfo(low->Datashard));
if (range.Point)
- return;
+ return partitions;
int prevComp = CompareBorders<true, true>(low->KeyTuple.GetCells(), range.To, low->Point || low->Inclusive, range.InclusiveTo, KeyColumnTypes);
if (prevComp >= 0)
- return;
+ return partitions;
} while (++low != Partitioning.end());
+
+ return partitions;
}
TEvSchemeShard::TEvModifySchemeTransaction* CombineSchemeTransactions(const TVector<TEvSchemeShard::TEvModifySchemeTransaction*>& transactions) {
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
index de4aaa0f42..24d6ddd6d7 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
@@ -440,7 +440,7 @@ namespace NSchemeShardUT_Private {
TVector<NScheme::TTypeId> KeyColumnTypes;
TVector<TBorder> Partitioning;
- void ResolveKey(const TTableRange& range, TVector<TKeyDesc::TPartitionInfo>& partitions) const;
+ std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> ResolveKey(const TTableRange& range) const;
};
void FillTablePartitioningInfo();
@@ -450,7 +450,7 @@ namespace NSchemeShardUT_Private {
FillTablePartitioningInfo();
}
- TablePartitioningInfo.ResolveKey(dbKey.Range, dbKey.Partitions);
+ dbKey.Partitioning = TablePartitioningInfo.ResolveKey(dbKey.Range);
dbKey.Status = TKeyDesc::EStatus::Ok;
}
diff --git a/ydb/core/tx/tx_proxy/commitreq.cpp b/ydb/core/tx/tx_proxy/commitreq.cpp
index dff7af4a18..3a7982a8db 100644
--- a/ydb/core/tx/tx_proxy/commitreq.cpp
+++ b/ydb/core/tx/tx_proxy/commitreq.cpp
@@ -240,7 +240,7 @@ private:
return Die(ctx);
}
- for (auto& partition : entry.KeyDescription->Partitions) {
+ for (auto& partition : entry.KeyDescription->GetPartitions()) {
auto& state = PerShardStates[partition.ShardId];
state.Tables.insert(entry.KeyDescription->TableId);
}
diff --git a/ydb/core/tx/tx_proxy/datareq.cpp b/ydb/core/tx/tx_proxy/datareq.cpp
index 9ce5729899..c933a19412 100644
--- a/ydb/core/tx/tx_proxy/datareq.cpp
+++ b/ydb/core/tx/tx_proxy/datareq.cpp
@@ -1095,7 +1095,7 @@ void TDataReq::ContinueFlatMKQLResolve(const TActorContext &ctx) {
// we would need shard -> table mapping for scheme cache invalidation on errors
for (const auto& keyDescription : keyDescriptions) {
- for (auto& partition : keyDescription->Partitions) {
+ for (auto& partition : keyDescription->GetPartitions()) {
if (auto *x = PerTablet.FindPtr(partition.ShardId)) {
x->TableId = keyDescription->TableId;
}
@@ -1137,7 +1137,7 @@ void TDataReq::ProcessReadTableResolve(NSchemeCache::TSchemeCacheRequest *cacheR
auto &entry = cacheRequest->ResultSet[0];
ReadTableRequest->KeyDesc = std::move(entry.KeyDescription);
- bool singleShard = ReadTableRequest->KeyDesc->Partitions.size() == 1;
+ bool singleShard = ReadTableRequest->KeyDesc->GetPartitions().size() == 1;
CanUseFollower = false;
bool immediate = singleShard;
@@ -1147,7 +1147,7 @@ void TDataReq::ProcessReadTableResolve(NSchemeCache::TSchemeCacheRequest *cacheR
immediate = true;
}
- for (auto& partition : ReadTableRequest->KeyDesc->Partitions) {
+ for (auto& partition : ReadTableRequest->KeyDesc->GetPartitions()) {
NKikimrTxDataShard::TDataTransaction dataTransaction;
dataTransaction.SetStreamResponse(StreamResponse);
dataTransaction.SetImmediate(immediate);
@@ -1185,7 +1185,7 @@ void TDataReq::ProcessReadTableResolve(NSchemeCache::TSchemeCacheRequest *cacheR
"Actor# " << ctx.SelfID.ToString() << " txid# " << TxId
<< " SEND TEvProposeTransaction to datashard " << partition.ShardId
<< " with read table request"
- << " affected shards " << ReadTableRequest->KeyDesc->Partitions.size()
+ << " affected shards " << ReadTableRequest->KeyDesc->GetPartitions().size()
<< " followers " << (CanUseFollower ? "allowed" : "disallowed") << " marker# P4b");
const TActorId pipeCache = CanUseFollower ? Services.FollowerPipeCache : Services.LeaderPipeCache;
@@ -2777,8 +2777,8 @@ ui64 GetFirstTablet(NSchemeCache::TSchemeCacheRequest &cacheRequest) {
NSchemeCache::TSchemeCacheRequest::TEntry& firstEntry= *cacheRequest.ResultSet.begin();
NKikimr::TKeyDesc& firstKey = *firstEntry.KeyDescription;
- Y_VERIFY(!firstKey.Partitions.empty());
- return firstKey.Partitions.begin()->ShardId;
+ Y_VERIFY(!firstKey.GetPartitions().empty());
+ return firstKey.GetPartitions().begin()->ShardId;
}
const TDomainsInfo::TDomain& TDataReq::SelectDomain(NSchemeCache::TSchemeCacheRequest &cacheRequest, const TActorContext &ctx) {
diff --git a/ydb/core/tx/tx_proxy/read_table_impl.cpp b/ydb/core/tx/tx_proxy/read_table_impl.cpp
index 6c30975fd9..a364d5ddc1 100644
--- a/ydb/core/tx/tx_proxy/read_table_impl.cpp
+++ b/ydb/core/tx/tx_proxy/read_table_impl.cpp
@@ -739,7 +739,7 @@ private:
KeyDesc = std::move(request->ResultSet[0].KeyDescription);
- if (KeyDesc->Partitions.empty()) {
+ if (KeyDesc->GetPartitions().empty()) {
TString error = TStringBuilder() << "No partitions to read from '" << Settings.TablePath << "'";
TXLOG_E(error);
return ReplyAndDie(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::WrongRequest, NKikimrIssues::TStatusIds::BAD_REQUEST, ctx);
@@ -754,8 +754,8 @@ private:
// Do we need to create a new snapshot?
const bool needSnapshot = Settings.ReadVersion.IsMax();
- for (size_t idx = 0; idx < KeyDesc->Partitions.size(); ++idx) {
- const auto& partition = KeyDesc->Partitions[idx];
+ for (size_t idx = 0; idx < KeyDesc->GetPartitions().size(); ++idx) {
+ const auto& partition = KeyDesc->GetPartitions()[idx];
const ui64 shardId = partition.ShardId;
auto [it, inserted] = ShardMap.emplace(
@@ -772,12 +772,12 @@ private:
range.From = KeyFromValues;
range.FromInclusive = KeyDesc->Range.InclusiveFrom;
} else {
- const auto& prevRange = *KeyDesc->Partitions[idx - 1].Range;
+ const auto& prevRange = *KeyDesc->GetPartitions()[idx - 1].Range;
range.From = prevRange.EndKeyPrefix;
range.FromInclusive = !prevRange.IsInclusive; // N.B. always true for now
}
- if (idx == KeyDesc->Partitions.size() - 1) {
+ if (idx == KeyDesc->GetPartitions().size() - 1) {
// Last shard in range
range.To = KeyToValues;
range.ToInclusive = KeyDesc->Range.InclusiveTo;
@@ -2393,7 +2393,7 @@ private:
KeyDesc = std::move(request->ResultSet[0].KeyDescription);
- if (KeyDesc->Partitions.empty()) {
+ if (KeyDesc->GetPartitions().empty()) {
TString error = TStringBuilder() << "No partitions to read from '" << Settings.TablePath << "'";
TXLOG_E(error);
return ReplyAndDie(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::WrongRequest, NKikimrIssues::TStatusIds::BAD_REQUEST, ctx);
@@ -2421,8 +2421,8 @@ private:
THashSet<TShardState*> removed;
- for (size_t idx = 0; idx < KeyDesc->Partitions.size(); ++idx) {
- const auto& partition = KeyDesc->Partitions[idx];
+ for (size_t idx = 0; idx < KeyDesc->GetPartitions().size(); ++idx) {
+ const auto& partition = KeyDesc->GetPartitions()[idx];
const ui64 shardId = partition.ShardId;
TXLOG_T("Processing resolved shard ShardId# " << shardId);
@@ -2452,12 +2452,12 @@ private:
shardRange.From = KeyFromValues;
shardRange.FromInclusive = KeyDesc->Range.InclusiveFrom;
} else {
- const auto& prevRange = *KeyDesc->Partitions[idx - 1].Range;
+ const auto& prevRange = *KeyDesc->GetPartitions()[idx - 1].Range;
shardRange.From = prevRange.EndKeyPrefix;
shardRange.FromInclusive = !prevRange.IsInclusive; // N.B. always true for now
}
- if (idx == KeyDesc->Partitions.size() - 1) {
+ if (idx == KeyDesc->GetPartitions().size() - 1) {
// Last shard in range
shardRange.To = KeyToValues;
shardRange.ToInclusive = KeyDesc->Range.InclusiveTo;
diff --git a/ydb/core/tx/tx_proxy/snapshotreq.cpp b/ydb/core/tx/tx_proxy/snapshotreq.cpp
index 4b2d7fe275..28fc4dd779 100644
--- a/ydb/core/tx/tx_proxy/snapshotreq.cpp
+++ b/ydb/core/tx/tx_proxy/snapshotreq.cpp
@@ -328,7 +328,7 @@ public:
return Die(ctx);
}
- for (auto& partition : entry.KeyDescription->Partitions) {
+ for (auto& partition : entry.KeyDescription->GetPartitions()) {
auto& state = PerShardStates[partition.ShardId];
state.Tables.insert(entry.KeyDescription->TableId);
}
@@ -1367,7 +1367,7 @@ public:
return Die(ctx);
}
- for (auto& partition : entry.KeyDescription->Partitions) {
+ for (auto& partition : entry.KeyDescription->GetPartitions()) {
auto& state = PerShardStates[partition.ShardId];
state.Tables.insert(entry.KeyDescription->TableId);
}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index c05c11e6e2..e123b55c41 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -862,7 +862,8 @@ private:
return JoinVectorIntoString(shards, ", ");
};
- LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: " << getShardsString(GetKeyRange()->Partitions));
+ LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Range shards: "
+ << getShardsString(GetKeyRange()->GetPartitions()));
MakeShardRequests(ctx);
}
@@ -870,13 +871,13 @@ private:
void MakeShardRequests(const NActors::TActorContext& ctx) {
const auto* keyRange = GetKeyRange();
- Y_VERIFY(!keyRange->Partitions.empty());
+ Y_VERIFY(!keyRange->GetPartitions().empty());
// Group rows by shard id
- TVector<std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>> shardRequests(keyRange->Partitions.size());
+ TVector<std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>> shardRequests(keyRange->GetPartitions().size());
for (const auto& keyValue : GetRows()) {
// Find partition for the key
- auto it = std::lower_bound(keyRange->Partitions.begin(), keyRange->Partitions.end(), keyValue.first.GetCells(),
+ auto it = std::lower_bound(keyRange->GetPartitions().begin(), keyRange->GetPartitions().end(), keyValue.first.GetCells(),
[this](const auto &partition, const auto& key) {
const auto& range = *partition.Range;
const int cmp = CompareBorders<true, false>(range.EndKeyPrefix.GetCells(), key,
@@ -885,7 +886,7 @@ private:
return (cmp < 0);
});
- size_t shardIdx = it - keyRange->Partitions.begin();
+ size_t shardIdx = it - keyRange->GetPartitions().begin();
TEvDataShard::TEvUploadRowsRequest* ev = shardRequests[shardIdx].get();
if (!ev) {
@@ -915,7 +916,7 @@ private:
if (!shardRequests[idx])
continue;
- TTabletId shardId = keyRange->Partitions[idx].ShardId;
+ TTabletId shardId = keyRange->GetPartitions()[idx].ShardId;
LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Sending request to shards " << shardId);