aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2022-10-18 15:27:03 +0300
committeralexbogo <alexbogo@ydb.tech>2022-10-18 15:27:03 +0300
commitd752b0964621cf9ad026a32d168deaebfbeeaef0 (patch)
tree994d01b63dc429f3294867b1685ee1fe9133fdd4
parent28a69e75259ba949f9d8cc24bc30c738272635ce (diff)
downloadydb-d752b0964621cf9ad026a32d168deaebfbeeaef0.tar.gz
[ymq] dont fill fixed leader tablet id for common tables format (v1)
init
-rw-r--r--ydb/core/ymq/actor/CMakeLists.txt5
-rw-r--r--ydb/core/ymq/actor/queue_schema.cpp111
-rw-r--r--ydb/core/ymq/actor/queue_schema.h12
-rw-r--r--ydb/tests/functional/sqs/multinode/test_multinode_cluster.py4
4 files changed, 63 insertions, 69 deletions
diff --git a/ydb/core/ymq/actor/CMakeLists.txt b/ydb/core/ymq/actor/CMakeLists.txt
index 5bf19c4d5b8..ae365bfa4fe 100644
--- a/ydb/core/ymq/actor/CMakeLists.txt
+++ b/ydb/core/ymq/actor/CMakeLists.txt
@@ -120,3 +120,8 @@ generate_enum_serilization(core-ymq-actor
INCLUDE_HEADERS
ydb/core/ymq/actor/fifo_cleanup.h
)
+generate_enum_serilization(core-ymq-actor
+ ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/queue_schema.h
+ INCLUDE_HEADERS
+ ydb/core/ymq/actor/queue_schema.h
+)
diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp
index 616843185e3..92621a4f27b 100644
--- a/ydb/core/ymq/actor/queue_schema.cpp
+++ b/ydb/core/ymq/actor/queue_schema.cpp
@@ -510,10 +510,6 @@ void TCreateQueueSchemaActorV2::CreateComponents() {
break;
}
- case ECreateComponentsStep::DescribeTableForSetSchemeShardId: {
- SendDescribeTable();
- break;
- }
case ECreateComponentsStep::DiscoverLeaderTabletId: {
RequestLeaderTabletId();
break;
@@ -522,6 +518,10 @@ void TCreateQueueSchemaActorV2::CreateComponents() {
AddRPSQuota();
break;
}
+ case ECreateComponentsStep::Commit: {
+ CommitNewVersion();
+ break;
+ }
}
}
@@ -529,7 +529,6 @@ STATEFN(TCreateQueueSchemaActorV2::CreateComponentsState) {
switch (ev->GetTypeRewrite()) {
hFunc(TSqsEvents::TEvExecuted, OnExecuted);
hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, OnDescribeSchemeResult);
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTableDescription);
hFunc(NKesus::TEvKesus::TEvAddQuoterResourceResult, HandleAddQuoterResource);
cFunc(TEvPoisonPill::EventType, PassAway);
}
@@ -554,7 +553,11 @@ void TCreateQueueSchemaActorV2::Step() {
CurrentCreationStep_ = ECreateComponentsStep::MakeShards;
}
} else {
- CurrentCreationStep_ = ECreateComponentsStep::DescribeTableForSetSchemeShardId;
+ if (Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig()) {
+ CurrentCreationStep_ = ECreateComponentsStep::AddQuoterResource;
+ } else {
+ CurrentCreationStep_ = ECreateComponentsStep::Commit;
+ }
}
break;
}
@@ -576,19 +579,20 @@ void TCreateQueueSchemaActorV2::Step() {
CurrentCreationStep_ = ECreateComponentsStep::DiscoverLeaderTabletId;
break;
}
- case ECreateComponentsStep::DescribeTableForSetSchemeShardId: {
- Y_VERIFY(TablesFormat_ == 1);
- Y_VERIFY(TableWithLeaderPathId_.first && TableWithLeaderPathId_.second);
- CurrentCreationStep_ = ECreateComponentsStep::DiscoverLeaderTabletId;
- break;
- }
case ECreateComponentsStep::DiscoverLeaderTabletId: {
- Y_VERIFY(Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig());
- CurrentCreationStep_ = ECreateComponentsStep::AddQuoterResource;
+ if (Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig()) {
+ CurrentCreationStep_ = ECreateComponentsStep::AddQuoterResource;
+ } else {
+ CurrentCreationStep_ = ECreateComponentsStep::Commit;
+ }
break;
}
case ECreateComponentsStep::AddQuoterResource: {
- Y_VERIFY(false); // unreachable
+ CurrentCreationStep_ = ECreateComponentsStep::Commit;
+ break;
+ }
+ default: {
+ Y_VERIFY_S(false, "incorrect queue creation step: " << CurrentCreationStep_); // unreachable
break;
}
}
@@ -657,7 +661,7 @@ void TCreateQueueSchemaActorV2::OnDescribeSchemeResult(NSchemeShard::TEvSchemeSh
const auto& pathDescription = ev->Get()->GetRecord().GetPathDescription();
if (ev->Get()->GetRecord().GetStatus() != NKikimrScheme::StatusSuccess || pathDescription.TablePartitionsSize() == 0 || !pathDescription.GetTablePartitions(0).GetDatashardId()) {
- // fail
+ RLOG_SQS_ERROR("Failed to discover leader: " << ev->Get()->GetRecord());
auto resp = MakeErrorResponse(NErrors::INTERNAL_FAILURE);
resp->State = EQueueState::Creating;
resp->Error = "Failed to discover leader.";
@@ -670,34 +674,6 @@ void TCreateQueueSchemaActorV2::OnDescribeSchemeResult(NSchemeShard::TEvSchemeSh
LeaderTabletId_ = pathDescription.GetTablePartitions(0).GetDatashardId();
- if (Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig()) {
- Step();
- } else {
- CommitNewVersion();
- }
-}
-
-void TCreateQueueSchemaActorV2::SendDescribeTable() {
- auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
-
- NSchemeCache::TSchemeCacheNavigate::TEntry entry;
- entry.Path = NKikimr::SplitPath(Cfg().GetRoot() + "/.Queues");
- entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
- navigateRequest->ResultSet.emplace_back(entry);
-
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigateRequest.release()));
-}
-
-void TCreateQueueSchemaActorV2::HandleTableDescription(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
- Y_VERIFY(result->ResultSet.size() == 1);
- const auto& response = result->ResultSet.front();
-
- TableWithLeaderPathId_ = std::make_pair(
- response.TableId.PathId.OwnerId,
- response.TableId.PathId.LocalPathId
- );
-
Step();
}
@@ -714,7 +690,7 @@ void TCreateQueueSchemaActorV2::HandleAddQuoterResource(NKesus::TEvKesus::TEvAdd
auto status = ev->Get()->Record.GetError().GetStatus();
if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::ALREADY_EXISTS) {
RLOG_SQS_DEBUG("Successfully added quoter resource. Id: " << ev->Get()->Record.GetResourceId());
- CommitNewVersion();
+ Step();
} else {
RLOG_SQS_WARN("Failed to add quoter resource: " << ev->Get()->Record);
auto resp = MakeErrorResponse(status == Ydb::StatusIds::BAD_REQUEST ? NErrors::VALIDATION_ERROR : NErrors::INTERNAL_FAILURE);
@@ -985,7 +961,7 @@ void TCreateQueueSchemaActorV2::CommitNewVersion() {
auto ev = MakeExecuteEvent(query);
auto* trans = ev->Record.MutableTransaction()->MutableMiniKQLTransaction();
- Y_VERIFY(LeaderTabletId_ != 0);
+ Y_VERIFY(TablesFormat_ == 1 || LeaderTabletId_ != 0);
TParameters(trans->MutableParams()->MutableProto())
.Utf8("NAME", QueuePath_.QueueName)
.Utf8("CUSTOMNAME", CustomQueueName_)
@@ -1174,7 +1150,7 @@ TDeleteQueueSchemaActorV2::TDeleteQueueSchemaActorV2(const TQueuePath& path,
, IsFifo_(isFifo)
, TablesFormat_(tablesFormat)
, Sender_(sender)
- , SI_(static_cast<ui32>(EDeleting::EraseQueueRecord))
+ , DeletionStep_(EDeleting::EraseQueueRecord)
, RequestId_(requestId)
, UserCounters_(std::move(userCounters))
{
@@ -1193,7 +1169,7 @@ TDeleteQueueSchemaActorV2::TDeleteQueueSchemaActorV2(const TQueuePath& path,
, IsFifo_(isFifo)
, TablesFormat_(tablesFormat)
, Sender_(sender)
- , SI_(static_cast<ui32>(tablesFormat == 0 ? EDeleting::RemoveTables : EDeleting::RemoveQueueVersionDirectory))
+ , DeletionStep_(tablesFormat == 0 ? EDeleting::RemoveTables : EDeleting::RemoveQueueVersionDirectory)
, RequestId_(requestId)
, UserCounters_(std::move(userCounters))
{
@@ -1365,7 +1341,7 @@ static const char* EraseQueueRecordQuery = R"__(
)__";
void TDeleteQueueSchemaActorV2::NextAction() {
- switch (EDeleting(SI_)) {
+ switch (DeletionStep_) {
case EDeleting::EraseQueueRecord: {
TString queueStateDir = QueuePath_.GetVersionedQueuePath();
if (TablesFormat_ == 1) {
@@ -1431,12 +1407,12 @@ void TDeleteQueueSchemaActorV2::NextAction() {
}
void TDeleteQueueSchemaActorV2::DoSuccessOperation() {
- switch (EDeleting(SI_)) {
+ switch (DeletionStep_) {
case EDeleting::EraseQueueRecord: {
if (TablesFormat_ == 0) {
- SI_ = ui32(EDeleting::RemoveTables);
+ DeletionStep_ = EDeleting::RemoveTables;
} else {
- SI_ = ui32(EDeleting::RemoveQueueVersionDirectory);
+ DeletionStep_ = EDeleting::RemoveQueueVersionDirectory;
}
break;
}
@@ -1445,9 +1421,9 @@ void TDeleteQueueSchemaActorV2::DoSuccessOperation() {
if (Tables_.empty()) {
if (Shards_.empty()) {
- SI_ = ui32(Version_ ? EDeleting::RemoveQueueVersionDirectory : EDeleting::RemoveQueueDirectory);
+ DeletionStep_ = Version_ ? EDeleting::RemoveQueueVersionDirectory : EDeleting::RemoveQueueDirectory;
} else {
- SI_ = ui32(EDeleting::RemoveShards);
+ DeletionStep_ = EDeleting::RemoveShards;
}
}
break;
@@ -1456,15 +1432,28 @@ void TDeleteQueueSchemaActorV2::DoSuccessOperation() {
Shards_.pop_back();
if (Shards_.empty()) {
- SI_ = ui32(Version_ ? EDeleting::RemoveQueueVersionDirectory : EDeleting::RemoveQueueDirectory);
+ DeletionStep_ = Version_ ? EDeleting::RemoveQueueVersionDirectory : EDeleting::RemoveQueueDirectory;
}
break;
}
- default: {
- SI_++;
- if ((!Cfg().GetQuotingConfig().GetEnableQuoting() || !Cfg().GetQuotingConfig().HasKesusQuoterConfig()) && EDeleting(SI_) == EDeleting::DeleteQuoterResource) {
- SI_++;
+ case EDeleting::RemoveQueueVersionDirectory: {
+ DeletionStep_ = EDeleting::RemoveQueueDirectory;
+ break;
+ }
+ case EDeleting::RemoveQueueDirectory: {
+ if (Cfg().GetQuotingConfig().GetEnableQuoting() && Cfg().GetQuotingConfig().HasKesusQuoterConfig()) {
+ DeletionStep_ = EDeleting::DeleteQuoterResource;
+ } else {
+ DeletionStep_ = EDeleting::Finish;
}
+ break;
+ }
+ case EDeleting::DeleteQuoterResource: {
+ DeletionStep_ = EDeleting::Finish;
+ break;
+ }
+ default: {
+ Y_VERIFY_S(false, "incorrect queue deletion step: " << DeletionStep_); // unreachable
}
}
@@ -1474,7 +1463,7 @@ void TDeleteQueueSchemaActorV2::DoSuccessOperation() {
void TDeleteQueueSchemaActorV2::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev) {
const auto& record = ev->Get()->Record;
if (IsGoodStatusCode(record.GetStatus())) {
- if (EDeleting(SI_) == EDeleting::EraseQueueRecord) {
+ if (DeletionStep_ == EDeleting::EraseQueueRecord) {
const TValue val(TValue::Create(record.GetExecutionEngineEvaluatedResponse()));
if (!bool(val["exists"])) {
Send(Sender_,
@@ -1492,7 +1481,7 @@ void TDeleteQueueSchemaActorV2::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev
} else {
RLOG_SQS_WARN("request execution error: " << record);
- if (EDeleting(SI_) == EDeleting::EraseQueueRecord) {
+ if (DeletionStep_ == EDeleting::EraseQueueRecord) {
Send(Sender_,
MakeHolder<TSqsEvents::TEvQueueDeleted>(QueuePath_, false, "Failed to erase queue record."));
PassAway();
diff --git a/ydb/core/ymq/actor/queue_schema.h b/ydb/core/ymq/actor/queue_schema.h
index 773910b4779..4e4054de7df 100644
--- a/ydb/core/ymq/actor/queue_schema.h
+++ b/ydb/core/ymq/actor/queue_schema.h
@@ -90,18 +90,19 @@ public:
return NKikimrServices::TActivity::SQS_ACTOR;
}
-private:
- enum class ECreateComponentsStep : ui32 {
+public:
+ enum class ECreateComponentsStep {
GetTablesFormatSetting,
MakeQueueDir,
MakeQueueVersionDir,
MakeShards,
MakeTables,
- DescribeTableForSetSchemeShardId,
DiscoverLeaderTabletId,
AddQuoterResource,
+ Commit
};
+private:
const TQueuePath QueuePath_;
const TCreateQueueRequest Request_;
const TActorId Sender_;
@@ -188,7 +189,7 @@ private:
void HandleDeleteQuoterResource(NKesus::TEvKesus::TEvDeleteQuoterResourceResult::TPtr& ev);
void PassAway() override;
-private:
+public:
enum class EDeleting : ui32 {
EraseQueueRecord,
RemoveTables,
@@ -199,13 +200,14 @@ private:
Finish,
};
+private:
const TQueuePath QueuePath_;
const bool IsFifo_;
const ui32 TablesFormat_;
const TActorId Sender_;
TVector<TTable> Tables_;
TVector<int> Shards_;
- ui32 SI_;
+ EDeleting DeletionStep_;
const TString RequestId_;
TIntrusivePtr<TUserCounters> UserCounters_;
ui64 Version_ = 0;
diff --git a/ydb/tests/functional/sqs/multinode/test_multinode_cluster.py b/ydb/tests/functional/sqs/multinode/test_multinode_cluster.py
index 1dbe9bff745..8a431a90a48 100644
--- a/ydb/tests/functional/sqs/multinode/test_multinode_cluster.py
+++ b/ydb/tests/functional/sqs/multinode/test_multinode_cluster.py
@@ -143,9 +143,7 @@ class TestSqsMultinodeCluster(KikimrSqsTestBase):
check_master_node_counters(new_node_index)
@pytest.mark.parametrize(**STOP_NODE_PARAMS)
- @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
- def test_reassign_master(self, stop_node, tables_format):
- self._init_with_params(tables_format=tables_format)
+ def test_reassign_master(self, stop_node):
self._create_queue_and_assert(self.queue_name)
node_index = self._get_queue_master_node_index()
proxy_node_index = self._other_node(node_index)