aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-02-09 16:43:14 +0300
committerilnaz <ilnaz@ydb.tech>2023-02-09 16:43:14 +0300
commit44532357e0abcca4f714c8003e346da33b24c7d8 (patch)
tree9eac263dfda4142911d1dea064527508969faa0c
parentc5b21b295d416f2222d7e9232de4a6f97455bcf4 (diff)
downloadydb-44532357e0abcca4f714c8003e346da33b24c7d8.tar.gz
Controller per replication
-rw-r--r--ydb/core/tx/replication/controller/target_base.cpp3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp14
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp62
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp70
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp30
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h19
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp7
-rw-r--r--ydb/core/tx/schemeshard/ut_replication.cpp45
10 files changed, 156 insertions, 113 deletions
diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp
index 9fce0155f03..0adcb2e17ce 100644
--- a/ydb/core/tx/replication/controller/target_base.cpp
+++ b/ydb/core/tx/replication/controller/target_base.cpp
@@ -6,6 +6,7 @@
namespace NKikimr::NReplication::NController {
+using ETargetKind = TReplication::ETargetKind;
using EDstState = TReplication::EDstState;
using EStreamState = TReplication::EStreamState;
@@ -75,7 +76,7 @@ ui64 TTargetBase::GetTargetId() const {
return TargetId;
}
-TReplication::ETargetKind TTargetBase::GetTargetKind() const {
+ETargetKind TTargetBase::GetTargetKind() const {
return Kind;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp
index e69ae1be7c5..673228354dd 100644
--- a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp
@@ -131,9 +131,6 @@ struct TSchemeShard::TTxDeleteTabletReply : public TSchemeShard::TRwTxBase {
case ETabletType::SequenceShard:
domain->RemoveSequenceShard(ShardIdx);
break;
- case ETabletType::ReplicationController:
- domain->RemoveReplicationController(ShardIdx);
- break;
default:
break;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index d0a07368740..228cf257e4a 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -2011,6 +2011,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
THashMap<TPathId, TShardIdx> nbsVolumeShards; // pathId -> shardIdx
THashMap<TPathId, TShardIdx> fileStoreShards; // pathId -> shardIdx
THashMap<TPathId, TShardIdx> kesusShards; // pathId -> shardIdx
+ THashMap<TPathId, TShardIdx> replicationControllers;
THashMap<TPathId, TShardIdx> blobDepotShards;
THashMap<TPathId, TVector<TShardIdx>> olapColumnShards;
{
@@ -2063,6 +2064,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
case ETabletType::ColumnShard:
olapColumnShards[shard.PathId].push_back(idx);
break;
+ case ETabletType::ReplicationController:
+ replicationControllers.emplace(shard.PathId, idx);
+ break;
case ETabletType::BlobDepot:
blobDepotShards.emplace(shard.PathId, idx);
break;
@@ -3887,7 +3891,6 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
break;
case ETabletType::ReplicationController:
Self->TabletCounters->Simple()[COUNTER_REPLICATION_CONTROLLER_COUNT].Add(1);
- domainInfo->AddReplicationController(shardIdx);
break;
case ETabletType::BlobDepot:
Self->TabletCounters->Simple()[COUNTER_BLOB_DEPOT_COUNT].Add(1);
@@ -4670,6 +4673,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
Self->Replications[pathId] = replicationInfo;
Self->IncrementPathDbRefCount(pathId);
+ if (replicationControllers.contains(pathId)) {
+ replicationInfo->ControllerShardIdx = replicationControllers.at(pathId);
+ }
+
if (!rowset.Next()) {
return false;
}
@@ -4692,7 +4699,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TReplicationInfo::TPtr alterData = new TReplicationInfo(alterVersion, std::move(description));
Y_VERIFY_S(Self->Replications.contains(pathId),
"Cannot load alter for replication " << pathId);
- Self->Replications[pathId]->AlterData = alterData;
+ auto replicationInfo = Self->Replications.at(pathId);
+
+ alterData->ControllerShardIdx = replicationInfo->ControllerShardIdx;
+ replicationInfo->AlterData = alterData;
if (!rowset.Next()) {
return false;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp
index cdb3d6da15f..6ee4787ab39 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp
@@ -266,10 +266,6 @@ public:
}
}
- const auto domainPathId = parentPath.GetPathIdForDomain();
- auto domainInfo = parentPath.DomainInfo();
- const ui64 shardsToCreate = domainInfo->GetReplicationControllers().empty();
-
auto path = parentPath.Child(name);
{
const auto checks = path.Check();
@@ -293,7 +289,7 @@ public:
.DepthLimit()
.PathsLimit()
.DirChildrenLimit()
- .ShardsLimit(shardsToCreate)
+ .ShardsLimit(1)
.IsValidACL(acl);
}
@@ -315,12 +311,10 @@ public:
}
TChannelsBindings channelsBindings;
- if (shardsToCreate) {
- if (!context.SS->ResolveTabletChannels(0, domainPathId, channelsBindings)) {
- result->SetError(NKikimrScheme::StatusInvalidParameter,
- "Unable to construct channel binding for replication controller with the storage pool");
- return result;
- }
+ if (!context.SS->ResolveTabletChannels(0, parentPath.GetPathIdForDomain(), channelsBindings)) {
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "Unable to construct channel binding for replication controller with the storage pool");
+ return result;
}
path.MaterializeLeaf(owner);
@@ -332,39 +326,25 @@ public:
context.SS->IncrementPathDbRefCount(path->PathId);
parentPath->IncAliveChildren();
- domainInfo->IncPathsInside();
+ parentPath.DomainInfo()->IncPathsInside();
+
+ auto replication = TReplicationInfo::Create(std::move(desc));
+ context.SS->Replications[path->PathId] = replication;
context.SS->TabletCounters->Simple()[COUNTER_REPLICATION_COUNT].Add(1);
+ replication->AlterData->ControllerShardIdx = context.SS->RegisterShardInfo(
+ TShardInfo::ReplicationControllerInfo(OperationId.GetTxId(), path->PathId)
+ .WithBindedChannels(channelsBindings));
+ context.SS->TabletCounters->Simple()[COUNTER_REPLICATION_CONTROLLER_COUNT].Add(1);
+
Y_VERIFY(!context.SS->FindTx(OperationId));
auto& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateReplication, path->PathId);
+ txState.Shards.emplace_back(replication->AlterData->ControllerShardIdx,
+ ETabletType::ReplicationController, TTxState::CreateParts);
+ txState.State = TTxState::CreateParts;
- if (shardsToCreate) {
- const auto shardIdx = context.SS->RegisterShardInfo(
- TShardInfo::ReplicationControllerInfo(OperationId.GetTxId(), domainPathId)
- .WithBindedChannels(channelsBindings));
- context.SS->TabletCounters->Simple()[COUNTER_REPLICATION_CONTROLLER_COUNT].Add(1);
-
- txState.Shards.emplace_back(shardIdx, ETabletType::ReplicationController, TTxState::CreateParts);
- txState.State = TTxState::CreateParts;
-
- Y_VERIFY(context.SS->PathsById.contains(domainPathId));
- context.SS->PathsById.at(domainPathId)->IncShardsInside();
-
- domainInfo->AddInternalShard(shardIdx);
- domainInfo->AddReplicationController(shardIdx);
- } else {
- const auto shardIdx = *domainInfo->GetReplicationControllers().begin();
-
- txState.Shards.emplace_back(shardIdx, ETabletType::ReplicationController, TTxState::ConfigureParts);
- txState.State = TTxState::ConfigureParts;
-
- Y_VERIFY(context.SS->ShardInfos.contains(shardIdx));
- const auto& shardInfo = context.SS->ShardInfos.at(shardIdx);
-
- if (shardInfo.CurrentTxId != OperationId.GetTxId()) {
- context.OnComplete.Dependence(shardInfo.CurrentTxId, OperationId.GetTxId());
- }
- }
+ path->IncShardsInside();
+ parentPath.DomainInfo()->AddInternalShards(txState);
if (parentPath->HasActiveChanges()) {
const auto parentTxId = parentPath->PlannedToCreate() ? parentPath->CreateTxId : parentPath->LastTxId;
@@ -379,8 +359,6 @@ public:
context.SS->PersistACL(db, path.Base());
}
- auto replication = TReplicationInfo::Create(std::move(desc));
- context.SS->Replications[path->PathId] = replication;
context.SS->PersistReplication(db, path->PathId, *replication);
context.SS->PersistReplicationAlter(db, path->PathId, *replication->AlterData);
@@ -390,7 +368,7 @@ public:
const TShardInfo& shardInfo = context.SS->ShardInfos.at(shard.Idx);
if (shard.Operation == TTxState::CreateParts) {
- context.SS->PersistShardMapping(db, shard.Idx, InvalidTabletId, domainPathId, OperationId.GetTxId(), shard.TabletType);
+ context.SS->PersistShardMapping(db, shard.Idx, InvalidTabletId, path->PathId, OperationId.GetTxId(), shard.TabletType);
context.SS->PersistChannelsBinding(db, shard.Idx, shardInfo.BindedChannels);
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp
index d7c53c30e11..93431a6ba3e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp
@@ -94,7 +94,7 @@ public:
}
NIceDb::TNiceDb db(context.GetDB());
- context.SS->ChangeTxState(db, OperationId, TTxState::Propose);
+ context.SS->ChangeTxState(db, OperationId, TTxState::DeleteParts);
context.OnComplete.ActivateTx(OperationId);
return true;
@@ -105,6 +105,45 @@ private:
}; // TDropParts
+class TDeleteParts: public TSubOperationState {
+ TString DebugHint() const override {
+ return TStringBuilder()
+ << "TDropReplication TDeleteParts"
+ << " opId# " << OperationId << " ";
+ }
+
+public:
+ explicit TDeleteParts(TOperationId id)
+ : OperationId(id)
+ {
+ IgnoreMessages(DebugHint(), {
+ NReplication::TEvController::TEvDropReplicationResult::EventType,
+ });
+ }
+
+ bool ProgressState(TOperationContext& context) override {
+ LOG_I(DebugHint() << "ProgressState");
+
+ TTxState* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+ Y_VERIFY(txState->TxType == TTxState::TxDropReplication);
+
+ for (const auto& shard : txState->Shards) {
+ context.OnComplete.DeleteShard(shard.Idx);
+ }
+
+ NIceDb::TNiceDb db(context.GetDB());
+ context.SS->ChangeTxState(db, OperationId, TTxState::Propose);
+ context.OnComplete.ActivateTx(OperationId);
+
+ return true;
+ }
+
+private:
+ const TOperationId OperationId;
+
+}; // TDeleteParts
+
class TPropose: public TSubOperationState {
TString DebugHint() const override {
return TStringBuilder()
@@ -190,6 +229,8 @@ class TDropReplication: public TSubOperation {
TTxState::ETxState NextState(TTxState::ETxState state) const override {
switch (state) {
case TTxState::DropParts:
+ return TTxState::DeleteParts;
+ case TTxState::DeleteParts:
return TTxState::Propose;
case TTxState::Propose:
return TTxState::Done;
@@ -202,6 +243,8 @@ class TDropReplication: public TSubOperation {
switch (state) {
case TTxState::DropParts:
return MakeHolder<TDropParts>(OperationId);
+ case TTxState::DeleteParts:
+ return MakeHolder<TDeleteParts>(OperationId);
case TTxState::Propose:
return MakeHolder<TPropose>(OperationId);
case TTxState::Done:
@@ -285,11 +328,10 @@ public:
txState.State = TTxState::DropParts;
txState.MinStep = TStepId(1);
- for (const auto& shardIdx: path.DomainInfo()->GetReplicationControllers()) {
- Y_VERIFY_S(context.SS->ShardInfos.contains(shardIdx), "Unknown shardIdx " << shardIdx);
- const auto tabletType = context.SS->ShardInfos.at(shardIdx).TabletType;
- txState.Shards.emplace_back(shardIdx, tabletType, TTxState::DropParts);
- }
+ const auto& shardIdx = replication->ControllerShardIdx;
+ Y_VERIFY_S(context.SS->ShardInfos.contains(shardIdx), "Unknown shardIdx " << shardIdx);
+ const auto tabletType = context.SS->ShardInfos.at(shardIdx).TabletType;
+ txState.Shards.emplace_back(shardIdx, tabletType, TTxState::DropParts);
path->PathState = TPathElement::EPathState::EPathStateDrop;
path->DropTxId = OperationId.GetTxId();
@@ -324,6 +366,22 @@ public:
LOG_N("TDropReplication AbortUnsafe"
<< ": opId# " << OperationId
<< ", txId# " << txId);
+
+ TTxState* txState = context.SS->FindTx(OperationId);
+
+ Y_VERIFY(txState);
+ const auto& pathId = txState->TargetPathId;
+
+ Y_VERIFY(context.SS->PathsById.contains(pathId));
+ auto path = context.SS->PathsById.at(pathId);
+
+ Y_VERIFY(path);
+ if (path->Dropped()) {
+ for (auto shard : txState->Shards) {
+ context.OnComplete.DeleteShard(shard.Idx);
+ }
+ }
+
context.OnComplete.DoneOperation(OperationId);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 117351ed001..5f807a5cfff 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1279,36 +1279,6 @@ TShardIdx TSchemeShard::NextShardIdx(const TShardIdx& shardIdx, ui64 inc) const
return MakeLocalId(TLocalShardIdx(nextLocalId));
}
-TShardIdx TSchemeShard::RegisterShardInfo(TShardInfo&& shardInfo) {
- TShardIdx shardIdx = ReserveShardIdxs(1);
- return RegisterShardInfo(shardIdx, std::move(shardInfo));
-}
-
-TShardIdx TSchemeShard::RegisterShardInfo(const TShardInfo& shardInfo) {
- TShardIdx shardIdx = ReserveShardIdxs(1);
- return RegisterShardInfo(shardIdx, shardInfo);
-}
-
-TShardIdx TSchemeShard::RegisterShardInfo(const TShardIdx& shardIdx, TShardInfo&& shardInfo) {
- Y_VERIFY(shardIdx.GetOwnerId() == TabletID());
- ui64 localId = ui64(shardIdx.GetLocalId());
- Y_VERIFY_S(localId < NextLocalShardIdx, "shardIdx: " << shardIdx << " NextLocalShardIdx: " << NextLocalShardIdx);
- Y_VERIFY_S(!ShardInfos.contains(shardIdx), "shardIdx: " << shardIdx << " already registered");
- IncrementPathDbRefCount(shardInfo.PathId, "new shard created");
- ShardInfos.emplace(shardIdx, std::move(shardInfo));
- return shardIdx;
-}
-
-TShardIdx TSchemeShard::RegisterShardInfo(const TShardIdx& shardIdx, const TShardInfo& shardInfo) {
- Y_VERIFY(shardIdx.GetOwnerId() == TabletID());
- ui64 localId = ui64(shardIdx.GetLocalId());
- Y_VERIFY_S(localId < NextLocalShardIdx, "shardIdx: " << shardIdx << " NextLocalShardIdx: " << NextLocalShardIdx);
- Y_VERIFY_S(!ShardInfos.contains(shardIdx), "shardIdx: " << shardIdx << " already registered");
- IncrementPathDbRefCount(shardInfo.PathId, "new shard created");
- ShardInfos.emplace(shardIdx, shardInfo);
- return shardIdx;
-}
-
const TTableInfo* TSchemeShard::GetMainTableForIndex(TPathId indexTableId) const {
if (!Tables.contains(indexTableId))
return nullptr;
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 37c0803cd5c..0d46ef9a7aa 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -525,10 +525,21 @@ public:
TShardIdx ReserveShardIdxs(ui64 count);
TShardIdx NextShardIdx(const TShardIdx& shardIdx, ui64 inc) const;
- TShardIdx RegisterShardInfo(TShardInfo&& shardInfo);
- TShardIdx RegisterShardInfo(const TShardInfo& shardInfo);
- TShardIdx RegisterShardInfo(const TShardIdx& shardIdx, TShardInfo&& shardInfo);
- TShardIdx RegisterShardInfo(const TShardIdx& shardIdx, const TShardInfo& shardInfo);
+ template <typename T>
+ TShardIdx RegisterShardInfo(T&& shardInfo) {
+ return RegisterShardInfo(ReserveShardIdxs(1), std::forward<T>(shardInfo));
+ }
+
+ template <typename T>
+ TShardIdx RegisterShardInfo(const TShardIdx& shardIdx, T&& shardInfo) {
+ Y_VERIFY(shardIdx.GetOwnerId() == TabletID());
+ const auto localId = ui64(shardIdx.GetLocalId());
+ Y_VERIFY_S(localId < NextLocalShardIdx, "shardIdx: " << shardIdx << " NextLocalShardIdx: " << NextLocalShardIdx);
+ Y_VERIFY_S(!ShardInfos.contains(shardIdx), "shardIdx: " << shardIdx << " already registered");
+ IncrementPathDbRefCount(shardInfo.PathId, "new shard created");
+ ShardInfos.emplace(shardIdx, std::forward<T>(shardInfo));
+ return shardIdx;
+ }
TTxState& CreateTx(TOperationId opId, TTxState::ETxType txType, TPathId targetPath, TPathId sourcePath = InvalidPathId);
TTxState* FindTx(TOperationId opId);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index b94a63e9a16..b156aee3658 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -1749,20 +1749,6 @@ struct TSubDomainInfo: TSimpleRefCount<TSubDomainInfo> {
SequenceShards.erase(it);
}
- const THashSet<TShardIdx>& GetReplicationControllers() const {
- return ReplicationControllers;
- }
-
- void AddReplicationController(const TShardIdx& shardIdx) {
- ReplicationControllers.insert(shardIdx);
- }
-
- void RemoveReplicationController(const TShardIdx& shardIdx) {
- auto it = ReplicationControllers.find(shardIdx);
- Y_VERIFY_S(it != ReplicationControllers.end(), "shardIdx: " << shardIdx);
- ReplicationControllers.erase(it);
- }
-
const NKikimrSubDomains::TProcessingParams& GetProcessingParams() const {
return ProcessingParams;
}
@@ -2001,7 +1987,6 @@ private:
THashSet<TShardIdx> InternalShards;
THashSet<TShardIdx> BackupShards;
THashSet<TShardIdx> SequenceShards;
- THashSet<TShardIdx> ReplicationControllers;
ui64 PQPartitionsInsideCount = 0;
ui64 PQReservedStorage = 0;
@@ -2436,6 +2421,7 @@ struct TReplicationInfo : public TSimpleRefCount<TReplicationInfo> {
ui64 AlterVersion = 0;
TReplicationInfo::TPtr AlterData = nullptr;
NKikimrSchemeOp::TReplicationDescription Description;
+ TShardIdx ControllerShardIdx = InvalidShardIdx;
};
struct TBlobDepotInfo : TSimpleRefCount<TBlobDepotInfo> {
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index b5e8b82a802..538580049f9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -1169,7 +1169,7 @@ void TSchemeShard::DescribeReplication(const TPathId& pathId, const TString& nam
void TSchemeShard::DescribeReplication(const TPathId& pathId, const TString& name, TReplicationInfo::TPtr info,
NKikimrSchemeOp::TReplicationDescription& desc)
{
- Y_VERIFY_S(info, "Empty sequence info"
+ Y_VERIFY_S(info, "Empty replication info"
<< " pathId# " << pathId
<< " name# " << name);
@@ -1179,10 +1179,7 @@ void TSchemeShard::DescribeReplication(const TPathId& pathId, const TString& nam
PathIdFromPathId(pathId, desc.MutablePathId());
desc.SetVersion(info->AlterVersion);
- const auto& controllers = ResolveDomainInfo(pathId)->GetReplicationControllers();
- if (!controllers.empty()) {
- const auto shardIdx = *controllers.begin();
-
+ if (const auto& shardIdx = info->ControllerShardIdx; shardIdx != InvalidShardIdx) {
Y_VERIFY(ShardInfos.contains(shardIdx));
const auto& shardInfo = ShardInfos.at(shardIdx);
diff --git a/ydb/core/tx/schemeshard/ut_replication.cpp b/ydb/core/tx/schemeshard/ut_replication.cpp
index 0f511f2188d..2030564642d 100644
--- a/ydb/core/tx/schemeshard/ut_replication.cpp
+++ b/ydb/core/tx/schemeshard/ut_replication.cpp
@@ -8,6 +8,13 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
runtime.SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NActors::NLog::PRI_TRACE);
}
+ ui64 ExtractControllerId(const NKikimrSchemeOp::TPathDescription& desc) {
+ UNIT_ASSERT(desc.HasReplicationDescription());
+ const auto& r = desc.GetReplicationDescription();
+ UNIT_ASSERT(r.HasControllerId());
+ return r.GetControllerId();
+ }
+
Y_UNIT_TEST(Create) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
@@ -28,6 +35,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
ui64 txId = 100;
SetupLogging(runtime);
+ THashSet<ui64> controllerIds;
for (int i = 0; i < 2; ++i) {
const auto name = Sprintf("Replication%d", i);
@@ -36,8 +44,17 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
Name: "%s"
)", name.c_str()));
env.TestWaitNotification(runtime, txId);
- TestLs(runtime, "/MyRoot/" + name, false, NLs::PathExist);
+
+ const auto desc = DescribePath(runtime, "/MyRoot/" + name);
+ TestDescribeResult(desc, {
+ NLs::PathExist,
+ NLs::Finished,
+ });
+
+ controllerIds.insert(ExtractControllerId(desc.GetPathDescription()));
}
+
+ UNIT_ASSERT_VALUES_EQUAL(controllerIds.size(), 2);
}
Y_UNIT_TEST(CreateInParallel) {
@@ -46,6 +63,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
ui64 txId = 100;
SetupLogging(runtime);
+ THashSet<ui64> controllerIds;
for (int i = 0; i < 2; ++i) {
TVector<TString> names;
@@ -64,9 +82,17 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
env.TestWaitNotification(runtime, txIds);
for (const auto& name : names) {
- TestLs(runtime, "/MyRoot/" + name, false, NLs::PathExist);
+ const auto desc = DescribePath(runtime, "/MyRoot/" + name);
+ TestDescribeResult(desc, {
+ NLs::PathExist,
+ NLs::Finished,
+ });
+
+ controllerIds.insert(ExtractControllerId(desc.GetPathDescription()));
}
}
+
+ UNIT_ASSERT_VALUES_EQUAL(controllerIds.size(), 4);
}
Y_UNIT_TEST(CreateDropRecreate) {
@@ -75,22 +101,31 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
ui64 txId = 100;
SetupLogging(runtime);
+ ui64 controllerId = 0;
TestCreateReplication(runtime, ++txId, "/MyRoot", R"(
Name: "Replication"
)");
env.TestWaitNotification(runtime, txId);
- TestLs(runtime, "/MyRoot/Replication", false, NLs::PathExist);
+ {
+ const auto desc = DescribePath(runtime, "/MyRoot/Replication");
+ TestDescribeResult(desc, {NLs::PathExist});
+ controllerId = ExtractControllerId(desc.GetPathDescription());
+ }
TestDropReplication(runtime, ++txId, "/MyRoot", "Replication");
env.TestWaitNotification(runtime, txId);
- TestLs(runtime, "/MyRoot/Replication", false, NLs::PathNotExist);
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Replication"), {NLs::PathNotExist});
TestCreateReplication(runtime, ++txId, "/MyRoot", R"(
Name: "Replication"
)");
env.TestWaitNotification(runtime, txId);
- TestLs(runtime, "/MyRoot/Replication", false, NLs::PathExist);
+ {
+ const auto desc = DescribePath(runtime, "/MyRoot/Replication");
+ TestDescribeResult(desc, {NLs::PathExist});
+ UNIT_ASSERT_VALUES_UNEQUAL(controllerId, ExtractControllerId(desc.GetPathDescription()));
+ }
}
} // TReplicationTests