diff options
author | alexbogo <alexbogo@ydb.tech> | 2022-10-18 15:27:03 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2022-10-18 15:27:03 +0300 |
commit | d752b0964621cf9ad026a32d168deaebfbeeaef0 (patch) | |
tree | 994d01b63dc429f3294867b1685ee1fe9133fdd4 | |
parent | 28a69e75259ba949f9d8cc24bc30c738272635ce (diff) | |
download | ydb-d752b0964621cf9ad026a32d168deaebfbeeaef0.tar.gz |
[ymq] dont fill fixed leader tablet id for common tables format (v1)
init
-rw-r--r-- | ydb/core/ymq/actor/CMakeLists.txt | 5 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_schema.cpp | 111 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_schema.h | 12 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/multinode/test_multinode_cluster.py | 4 |
4 files changed, 63 insertions, 69 deletions
diff --git a/ydb/core/ymq/actor/CMakeLists.txt b/ydb/core/ymq/actor/CMakeLists.txt index 5bf19c4d5b8..ae365bfa4fe 100644 --- a/ydb/core/ymq/actor/CMakeLists.txt +++ b/ydb/core/ymq/actor/CMakeLists.txt @@ -120,3 +120,8 @@ generate_enum_serilization(core-ymq-actor INCLUDE_HEADERS ydb/core/ymq/actor/fifo_cleanup.h ) +generate_enum_serilization(core-ymq-actor + ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/queue_schema.h + INCLUDE_HEADERS + ydb/core/ymq/actor/queue_schema.h +) diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp index 616843185e3..92621a4f27b 100644 --- a/ydb/core/ymq/actor/queue_schema.cpp +++ b/ydb/core/ymq/actor/queue_schema.cpp @@ -510,10 +510,6 @@ void TCreateQueueSchemaActorV2::CreateComponents() { break; } - case ECreateComponentsStep::DescribeTableForSetSchemeShardId: { - SendDescribeTable(); - break; - } case ECreateComponentsStep::DiscoverLeaderTabletId: { RequestLeaderTabletId(); break; @@ -522,6 +518,10 @@ void TCreateQueueSchemaActorV2::CreateComponents() { AddRPSQuota(); break; } + case ECreateComponentsStep::Commit: { + CommitNewVersion(); + break; + } } } @@ -529,7 +529,6 @@ STATEFN(TCreateQueueSchemaActorV2::CreateComponentsState) { switch (ev->GetTypeRewrite()) { hFunc(TSqsEvents::TEvExecuted, OnExecuted); hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, OnDescribeSchemeResult); - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTableDescription); hFunc(NKesus::TEvKesus::TEvAddQuoterResourceResult, HandleAddQuoterResource); cFunc(TEvPoisonPill::EventType, PassAway); } @@ -554,7 +553,11 @@ void TCreateQueueSchemaActorV2::Step() { CurrentCreationStep_ = ECreateComponentsStep::MakeShards; } } else { - CurrentCreationStep_ = ECreateComponentsStep::DescribeTableForSetSchemeShardId; + if (Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig()) { + CurrentCreationStep_ = ECreateComponentsStep::AddQuoterResource; + } else { + CurrentCreationStep_ = ECreateComponentsStep::Commit; + } } break; } @@ -576,19 +579,20 @@ void TCreateQueueSchemaActorV2::Step() { CurrentCreationStep_ = ECreateComponentsStep::DiscoverLeaderTabletId; break; } - case ECreateComponentsStep::DescribeTableForSetSchemeShardId: { - Y_VERIFY(TablesFormat_ == 1); - Y_VERIFY(TableWithLeaderPathId_.first && TableWithLeaderPathId_.second); - CurrentCreationStep_ = ECreateComponentsStep::DiscoverLeaderTabletId; - break; - } case ECreateComponentsStep::DiscoverLeaderTabletId: { - Y_VERIFY(Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig()); - CurrentCreationStep_ = ECreateComponentsStep::AddQuoterResource; + if (Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig()) { + CurrentCreationStep_ = ECreateComponentsStep::AddQuoterResource; + } else { + CurrentCreationStep_ = ECreateComponentsStep::Commit; + } break; } case ECreateComponentsStep::AddQuoterResource: { - Y_VERIFY(false); // unreachable + CurrentCreationStep_ = ECreateComponentsStep::Commit; + break; + } + default: { + Y_VERIFY_S(false, "incorrect queue creation step: " << CurrentCreationStep_); // unreachable break; } } @@ -657,7 +661,7 @@ void TCreateQueueSchemaActorV2::OnDescribeSchemeResult(NSchemeShard::TEvSchemeSh const auto& pathDescription = ev->Get()->GetRecord().GetPathDescription(); if (ev->Get()->GetRecord().GetStatus() != NKikimrScheme::StatusSuccess || pathDescription.TablePartitionsSize() == 0 || !pathDescription.GetTablePartitions(0).GetDatashardId()) { - // fail + RLOG_SQS_ERROR("Failed to discover leader: " << ev->Get()->GetRecord()); auto resp = MakeErrorResponse(NErrors::INTERNAL_FAILURE); resp->State = EQueueState::Creating; resp->Error = "Failed to discover leader."; @@ -670,34 +674,6 @@ void TCreateQueueSchemaActorV2::OnDescribeSchemeResult(NSchemeShard::TEvSchemeSh LeaderTabletId_ = pathDescription.GetTablePartitions(0).GetDatashardId(); - if (Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig()) { - Step(); - } else { - CommitNewVersion(); - } -} - -void TCreateQueueSchemaActorV2::SendDescribeTable() { - auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); - - NSchemeCache::TSchemeCacheNavigate::TEntry entry; - entry.Path = NKikimr::SplitPath(Cfg().GetRoot() + "/.Queues"); - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; - navigateRequest->ResultSet.emplace_back(entry); - - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigateRequest.release())); -} - -void TCreateQueueSchemaActorV2::HandleTableDescription(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get(); - Y_VERIFY(result->ResultSet.size() == 1); - const auto& response = result->ResultSet.front(); - - TableWithLeaderPathId_ = std::make_pair( - response.TableId.PathId.OwnerId, - response.TableId.PathId.LocalPathId - ); - Step(); } @@ -714,7 +690,7 @@ void TCreateQueueSchemaActorV2::HandleAddQuoterResource(NKesus::TEvKesus::TEvAdd auto status = ev->Get()->Record.GetError().GetStatus(); if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::ALREADY_EXISTS) { RLOG_SQS_DEBUG("Successfully added quoter resource. Id: " << ev->Get()->Record.GetResourceId()); - CommitNewVersion(); + Step(); } else { RLOG_SQS_WARN("Failed to add quoter resource: " << ev->Get()->Record); auto resp = MakeErrorResponse(status == Ydb::StatusIds::BAD_REQUEST ? NErrors::VALIDATION_ERROR : NErrors::INTERNAL_FAILURE); @@ -985,7 +961,7 @@ void TCreateQueueSchemaActorV2::CommitNewVersion() { auto ev = MakeExecuteEvent(query); auto* trans = ev->Record.MutableTransaction()->MutableMiniKQLTransaction(); - Y_VERIFY(LeaderTabletId_ != 0); + Y_VERIFY(TablesFormat_ == 1 || LeaderTabletId_ != 0); TParameters(trans->MutableParams()->MutableProto()) .Utf8("NAME", QueuePath_.QueueName) .Utf8("CUSTOMNAME", CustomQueueName_) @@ -1174,7 +1150,7 @@ TDeleteQueueSchemaActorV2::TDeleteQueueSchemaActorV2(const TQueuePath& path, , IsFifo_(isFifo) , TablesFormat_(tablesFormat) , Sender_(sender) - , SI_(static_cast<ui32>(EDeleting::EraseQueueRecord)) + , DeletionStep_(EDeleting::EraseQueueRecord) , RequestId_(requestId) , UserCounters_(std::move(userCounters)) { @@ -1193,7 +1169,7 @@ TDeleteQueueSchemaActorV2::TDeleteQueueSchemaActorV2(const TQueuePath& path, , IsFifo_(isFifo) , TablesFormat_(tablesFormat) , Sender_(sender) - , SI_(static_cast<ui32>(tablesFormat == 0 ? EDeleting::RemoveTables : EDeleting::RemoveQueueVersionDirectory)) + , DeletionStep_(tablesFormat == 0 ? EDeleting::RemoveTables : EDeleting::RemoveQueueVersionDirectory) , RequestId_(requestId) , UserCounters_(std::move(userCounters)) { @@ -1365,7 +1341,7 @@ static const char* EraseQueueRecordQuery = R"__( )__"; void TDeleteQueueSchemaActorV2::NextAction() { - switch (EDeleting(SI_)) { + switch (DeletionStep_) { case EDeleting::EraseQueueRecord: { TString queueStateDir = QueuePath_.GetVersionedQueuePath(); if (TablesFormat_ == 1) { @@ -1431,12 +1407,12 @@ void TDeleteQueueSchemaActorV2::NextAction() { } void TDeleteQueueSchemaActorV2::DoSuccessOperation() { - switch (EDeleting(SI_)) { + switch (DeletionStep_) { case EDeleting::EraseQueueRecord: { if (TablesFormat_ == 0) { - SI_ = ui32(EDeleting::RemoveTables); + DeletionStep_ = EDeleting::RemoveTables; } else { - SI_ = ui32(EDeleting::RemoveQueueVersionDirectory); + DeletionStep_ = EDeleting::RemoveQueueVersionDirectory; } break; } @@ -1445,9 +1421,9 @@ void TDeleteQueueSchemaActorV2::DoSuccessOperation() { if (Tables_.empty()) { if (Shards_.empty()) { - SI_ = ui32(Version_ ? EDeleting::RemoveQueueVersionDirectory : EDeleting::RemoveQueueDirectory); + DeletionStep_ = Version_ ? EDeleting::RemoveQueueVersionDirectory : EDeleting::RemoveQueueDirectory; } else { - SI_ = ui32(EDeleting::RemoveShards); + DeletionStep_ = EDeleting::RemoveShards; } } break; @@ -1456,15 +1432,28 @@ void TDeleteQueueSchemaActorV2::DoSuccessOperation() { Shards_.pop_back(); if (Shards_.empty()) { - SI_ = ui32(Version_ ? EDeleting::RemoveQueueVersionDirectory : EDeleting::RemoveQueueDirectory); + DeletionStep_ = Version_ ? EDeleting::RemoveQueueVersionDirectory : EDeleting::RemoveQueueDirectory; } break; } - default: { - SI_++; - if ((!Cfg().GetQuotingConfig().GetEnableQuoting() || !Cfg().GetQuotingConfig().HasKesusQuoterConfig()) && EDeleting(SI_) == EDeleting::DeleteQuoterResource) { - SI_++; + case EDeleting::RemoveQueueVersionDirectory: { + DeletionStep_ = EDeleting::RemoveQueueDirectory; + break; + } + case EDeleting::RemoveQueueDirectory: { + if (Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig()) { + DeletionStep_ = EDeleting::DeleteQuoterResource; + } else { + DeletionStep_ = EDeleting::Finish; } + break; + } + case EDeleting::DeleteQuoterResource: { + DeletionStep_ = EDeleting::Finish; + break; + } + default: { + Y_VERIFY_S(false, "incorrect queue deletion step: " << DeletionStep_); // unreachable } } @@ -1474,7 +1463,7 @@ void TDeleteQueueSchemaActorV2::DoSuccessOperation() { void TDeleteQueueSchemaActorV2::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev) { const auto& record = ev->Get()->Record; if (IsGoodStatusCode(record.GetStatus())) { - if (EDeleting(SI_) == EDeleting::EraseQueueRecord) { + if (DeletionStep_ == EDeleting::EraseQueueRecord) { const TValue val(TValue::Create(record.GetExecutionEngineEvaluatedResponse())); if (!bool(val["exists"])) { Send(Sender_, @@ -1492,7 +1481,7 @@ void TDeleteQueueSchemaActorV2::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev } else { RLOG_SQS_WARN("request execution error: " << record); - if (EDeleting(SI_) == EDeleting::EraseQueueRecord) { + if (DeletionStep_ == EDeleting::EraseQueueRecord) { Send(Sender_, MakeHolder<TSqsEvents::TEvQueueDeleted>(QueuePath_, false, "Failed to erase queue record.")); PassAway(); diff --git a/ydb/core/ymq/actor/queue_schema.h b/ydb/core/ymq/actor/queue_schema.h index 773910b4779..4e4054de7df 100644 --- a/ydb/core/ymq/actor/queue_schema.h +++ b/ydb/core/ymq/actor/queue_schema.h @@ -90,18 +90,19 @@ public: return NKikimrServices::TActivity::SQS_ACTOR; } -private: - enum class ECreateComponentsStep : ui32 { +public: + enum class ECreateComponentsStep { GetTablesFormatSetting, MakeQueueDir, MakeQueueVersionDir, MakeShards, MakeTables, - DescribeTableForSetSchemeShardId, DiscoverLeaderTabletId, AddQuoterResource, + Commit }; +private: const TQueuePath QueuePath_; const TCreateQueueRequest Request_; const TActorId Sender_; @@ -188,7 +189,7 @@ private: void HandleDeleteQuoterResource(NKesus::TEvKesus::TEvDeleteQuoterResourceResult::TPtr& ev); void PassAway() override; -private: +public: enum class EDeleting : ui32 { EraseQueueRecord, RemoveTables, @@ -199,13 +200,14 @@ private: Finish, }; +private: const TQueuePath QueuePath_; const bool IsFifo_; const ui32 TablesFormat_; const TActorId Sender_; TVector<TTable> Tables_; TVector<int> Shards_; - ui32 SI_; + EDeleting DeletionStep_; const TString RequestId_; TIntrusivePtr<TUserCounters> UserCounters_; ui64 Version_ = 0; diff --git a/ydb/tests/functional/sqs/multinode/test_multinode_cluster.py b/ydb/tests/functional/sqs/multinode/test_multinode_cluster.py index 1dbe9bff745..8a431a90a48 100644 --- a/ydb/tests/functional/sqs/multinode/test_multinode_cluster.py +++ b/ydb/tests/functional/sqs/multinode/test_multinode_cluster.py @@ -143,9 +143,7 @@ class TestSqsMultinodeCluster(KikimrSqsTestBase): check_master_node_counters(new_node_index) @pytest.mark.parametrize(**STOP_NODE_PARAMS) - @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) - def test_reassign_master(self, stop_node, tables_format): - self._init_with_params(tables_format=tables_format) + def test_reassign_master(self, stop_node): self._create_queue_and_assert(self.queue_name) node_index = self._get_queue_master_node_index() proxy_node_index = self._other_node(node_index) |