diff options
author | snaury <snaury@ydb.tech> | 2023-09-07 12:17:46 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-09-07 13:25:18 +0300 |
commit | b0c000d48fba6fcbd7f69bcbe94290623a172fa7 (patch) | |
tree | 6719fd510c0d982400b36cfac3669d393c428068 | |
parent | b5c950c57decce49922c1bfd0855965baa42b564 (diff) | |
download | ydb-b0c000d48fba6fcbd7f69bcbe94290623a172fa7.tar.gz |
Notify all overload subscribers on split and drop KIKIMR-19021
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_loans.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_overload.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_split_src.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/drop_table_unit.cpp | 1 |
5 files changed, 28 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 2ea68071a8..a1647a03af 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1675,6 +1675,7 @@ public: bool HasPipeServer(const TActorId& pipeServerId); bool AddOverloadSubscriber(const TActorId& pipeServerId, const TActorId& actorId, ui64 seqNo, ERejectReasons reasons); void NotifyOverloadSubscribers(ERejectReason reason); + void NotifyAllOverloadSubscribers(); bool HasSharedBlobs() const; void CheckInitiateBorrowedPartsReturn(const TActorContext& ctx); diff --git a/ydb/core/tx/datashard/datashard_loans.cpp b/ydb/core/tx/datashard/datashard_loans.cpp index adbfc4769d..6a78ac0ce2 100644 --- a/ydb/core/tx/datashard/datashard_loans.cpp +++ b/ydb/core/tx/datashard/datashard_loans.cpp @@ -181,6 +181,7 @@ public: Self->State = TShardState::Offline; Self->PersistSys(db, TDataShard::Schema::Sys_State, Self->State); + Self->NotifyAllOverloadSubscribers(); return true; } diff --git a/ydb/core/tx/datashard/datashard_overload.cpp b/ydb/core/tx/datashard/datashard_overload.cpp index 4724a93a95..0256dcad01 100644 --- a/ydb/core/tx/datashard/datashard_overload.cpp +++ b/ydb/core/tx/datashard/datashard_overload.cpp @@ -73,6 +73,30 @@ void TDataShard::NotifyOverloadSubscribers(ERejectReason reason) { PipeServersWithOverloadSubscribers.Append(left); } +void TDataShard::NotifyAllOverloadSubscribers() { + bool clearedSubscribers = false; + while (!PipeServersWithOverloadSubscribers.Empty()) { + TPipeServerInfo* pipeServer = PipeServersWithOverloadSubscribers.PopFront(); + for (auto it = pipeServer->OverloadSubscribers.begin(); it != pipeServer->OverloadSubscribers.end(); ++it) { + const TActorId& actorId = it->first; + TOverloadSubscriber& entry = it->second; + SendViaSession( + pipeServer->InterconnectSession, + actorId, + SelfId(), + new TEvDataShard::TEvOverloadReady(TabletID(), entry.SeqNo)); + } + pipeServer->OverloadSubscribers.clear(); + clearedSubscribers = true; + } + + if (clearedSubscribers) { + for (int i = 0; i < RejectReasonCount; ++i) { + OverloadSubscribersByReason[i] = 0; + } + } +} + void TDataShard::DiscardOverloadSubscribers(TPipeServerInfo& pipeServer) { for (auto it = pipeServer.OverloadSubscribers.begin(); it != pipeServer.OverloadSubscribers.end(); ++it) { TOverloadSubscriber& entry = it->second; diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index 8fbcf1af96..427fff74db 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -45,6 +45,7 @@ public: Self->State = TShardState::SplitSrcWaitForNoTxInFlight; Self->PersistSys(db, Schema::Sys_State, Self->State); + Self->NotifyAllOverloadSubscribers(); // Wake up immediate ops, so they abort as soon as possible for (const auto& kv : Self->Pipeline.GetImmediateOps()) { diff --git a/ydb/core/tx/datashard/drop_table_unit.cpp b/ydb/core/tx/datashard/drop_table_unit.cpp index 8d8cfb960d..8cc11a0ff9 100644 --- a/ydb/core/tx/datashard/drop_table_unit.cpp +++ b/ydb/core/tx/datashard/drop_table_unit.cpp @@ -91,6 +91,7 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op, txc.DB.NoMoreReadsForTx(); DataShard.SetPersistState(TShardState::PreOffline, txc); + DataShard.NotifyAllOverloadSubscribers(); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); |