aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-09-07 12:17:46 +0300
committersnaury <snaury@ydb.tech>2023-09-07 13:25:18 +0300
commitb0c000d48fba6fcbd7f69bcbe94290623a172fa7 (patch)
tree6719fd510c0d982400b36cfac3669d393c428068
parentb5c950c57decce49922c1bfd0855965baa42b564 (diff)
downloadydb-b0c000d48fba6fcbd7f69bcbe94290623a172fa7.tar.gz
Notify all overload subscribers on split and drop KIKIMR-19021
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h1
-rw-r--r--ydb/core/tx/datashard/datashard_loans.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_overload.cpp24
-rw-r--r--ydb/core/tx/datashard/datashard_split_src.cpp1
-rw-r--r--ydb/core/tx/datashard/drop_table_unit.cpp1
5 files changed, 28 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 2ea68071a8..a1647a03af 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1675,6 +1675,7 @@ public:
bool HasPipeServer(const TActorId& pipeServerId);
bool AddOverloadSubscriber(const TActorId& pipeServerId, const TActorId& actorId, ui64 seqNo, ERejectReasons reasons);
void NotifyOverloadSubscribers(ERejectReason reason);
+ void NotifyAllOverloadSubscribers();
bool HasSharedBlobs() const;
void CheckInitiateBorrowedPartsReturn(const TActorContext& ctx);
diff --git a/ydb/core/tx/datashard/datashard_loans.cpp b/ydb/core/tx/datashard/datashard_loans.cpp
index adbfc4769d..6a78ac0ce2 100644
--- a/ydb/core/tx/datashard/datashard_loans.cpp
+++ b/ydb/core/tx/datashard/datashard_loans.cpp
@@ -181,6 +181,7 @@ public:
Self->State = TShardState::Offline;
Self->PersistSys(db, TDataShard::Schema::Sys_State, Self->State);
+ Self->NotifyAllOverloadSubscribers();
return true;
}
diff --git a/ydb/core/tx/datashard/datashard_overload.cpp b/ydb/core/tx/datashard/datashard_overload.cpp
index 4724a93a95..0256dcad01 100644
--- a/ydb/core/tx/datashard/datashard_overload.cpp
+++ b/ydb/core/tx/datashard/datashard_overload.cpp
@@ -73,6 +73,30 @@ void TDataShard::NotifyOverloadSubscribers(ERejectReason reason) {
PipeServersWithOverloadSubscribers.Append(left);
}
+void TDataShard::NotifyAllOverloadSubscribers() {
+ bool clearedSubscribers = false;
+ while (!PipeServersWithOverloadSubscribers.Empty()) {
+ TPipeServerInfo* pipeServer = PipeServersWithOverloadSubscribers.PopFront();
+ for (auto it = pipeServer->OverloadSubscribers.begin(); it != pipeServer->OverloadSubscribers.end(); ++it) {
+ const TActorId& actorId = it->first;
+ TOverloadSubscriber& entry = it->second;
+ SendViaSession(
+ pipeServer->InterconnectSession,
+ actorId,
+ SelfId(),
+ new TEvDataShard::TEvOverloadReady(TabletID(), entry.SeqNo));
+ }
+ pipeServer->OverloadSubscribers.clear();
+ clearedSubscribers = true;
+ }
+
+ if (clearedSubscribers) {
+ for (int i = 0; i < RejectReasonCount; ++i) {
+ OverloadSubscribersByReason[i] = 0;
+ }
+ }
+}
+
void TDataShard::DiscardOverloadSubscribers(TPipeServerInfo& pipeServer) {
for (auto it = pipeServer.OverloadSubscribers.begin(); it != pipeServer.OverloadSubscribers.end(); ++it) {
TOverloadSubscriber& entry = it->second;
diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp
index 8fbcf1af96..427fff74db 100644
--- a/ydb/core/tx/datashard/datashard_split_src.cpp
+++ b/ydb/core/tx/datashard/datashard_split_src.cpp
@@ -45,6 +45,7 @@ public:
Self->State = TShardState::SplitSrcWaitForNoTxInFlight;
Self->PersistSys(db, Schema::Sys_State, Self->State);
+ Self->NotifyAllOverloadSubscribers();
// Wake up immediate ops, so they abort as soon as possible
for (const auto& kv : Self->Pipeline.GetImmediateOps()) {
diff --git a/ydb/core/tx/datashard/drop_table_unit.cpp b/ydb/core/tx/datashard/drop_table_unit.cpp
index 8d8cfb960d..8cc11a0ff9 100644
--- a/ydb/core/tx/datashard/drop_table_unit.cpp
+++ b/ydb/core/tx/datashard/drop_table_unit.cpp
@@ -91,6 +91,7 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op,
txc.DB.NoMoreReadsForTx();
DataShard.SetPersistState(TShardState::PreOffline, txc);
+ DataShard.NotifyAllOverloadSubscribers();
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());