aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-09-17 19:26:03 +0300
committerGitHub <noreply@github.com>2024-09-17 19:26:03 +0300
commit0361036deeef748018284573baa33a364d714de5 (patch)
treec021981c642afc88fb5e39df434c5c1d855e502e
parent90e319847e2094094259c4e78cf423fb1857e87e (diff)
downloadydb-0361036deeef748018284573baa33a364d714de5.tar.gz
Limit inflight cross-database scheme requests in the replication controller (#9334)
-rw-r--r--ydb/core/protos/replication.proto6
-rw-r--r--ydb/core/tx/replication/controller/controller.cpp43
-rw-r--r--ydb/core/tx/replication/controller/controller_impl.h12
-rw-r--r--ydb/core/tx/replication/controller/private_events.h16
-rw-r--r--ydb/core/tx/replication/controller/stream_creator.cpp24
-rw-r--r--ydb/core/tx/replication/controller/stream_remover.cpp29
6 files changed, 127 insertions, 3 deletions
diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto
index 3764b0511c5..41baff85764 100644
--- a/ydb/core/protos/replication.proto
+++ b/ydb/core/protos/replication.proto
@@ -6,7 +6,13 @@ package NKikimrReplication;
option java_package = "ru.yandex.kikimr.proto";
message TReplicationDefaults {
+ message TSchemeOperationLimits {
+ optional uint32 InflightCreateStreamLimit = 1 [default = 1];
+ optional uint32 InflightDropStreamLimit = 2 [default = 1];
+ }
+
optional int32 RetentionPeriodSeconds = 1 [default = 86400]; // 1d
+ optional TSchemeOperationLimits SchemeOperationLimits = 2;
}
message TStaticCredentials {
diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp
index 4f73d84d68b..46e8c3dfbb0 100644
--- a/ydb/core/tx/replication/controller/controller.cpp
+++ b/ydb/core/tx/replication/controller/controller.cpp
@@ -1,6 +1,7 @@
#include "controller.h"
#include "controller_impl.h"
+#include <ydb/core/base/appdata.h>
#include <ydb/core/discovery/discovery.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
@@ -60,6 +61,8 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvProcessQueues, Handle);
HFunc(TEvPrivate::TEvRemoveWorker, Handle);
HFunc(TEvPrivate::TEvDescribeTargetsResult, Handle);
+ HFunc(TEvPrivate::TEvRequestCreateStream, Handle);
+ HFunc(TEvPrivate::TEvRequestDropStream, Handle);
HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
HFunc(TEvDiscovery::TEvError, Handle);
HFunc(TEvService::TEvStatus, Handle);
@@ -148,13 +151,53 @@ void TController::Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActor
RunTxAssignStreamName(ev, ctx);
}
+template <typename TEvent>
+void ProcessLimiterQueue(TDeque<TActorId>& requested, THashSet<TActorId>& inflight, ui32 limit, const TActorContext& ctx) {
+ while (!requested.empty() && inflight.size() < limit) {
+ const auto& actorId = requested.front();
+ ctx.Send(actorId, new TEvent());
+ inflight.insert(actorId);
+ requested.pop_front();
+ }
+}
+
+void TController::ProcessCreateStreamQueue(const TActorContext& ctx) {
+ const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits();
+ ProcessLimiterQueue<TEvPrivate::TEvAllowCreateStream>(RequestedCreateStream, InflightCreateStream, limits.GetInflightCreateStreamLimit(), ctx);
+}
+
+void TController::ProcessDropStreamQueue(const TActorContext& ctx) {
+ const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits();
+ ProcessLimiterQueue<TEvPrivate::TEvAllowDropStream>(RequestedDropStream, InflightDropStream, limits.GetInflightDropStreamLimit(), ctx);
+}
+
+void TController::Handle(TEvPrivate::TEvRequestCreateStream::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+
+ RequestedCreateStream.push_back(ev->Sender);
+ ProcessCreateStreamQueue(ctx);
+}
+
void TController::Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+
+ InflightCreateStream.erase(ev->Sender);
+ ProcessCreateStreamQueue(ctx);
RunTxCreateStreamResult(ev, ctx);
}
+void TController::Handle(TEvPrivate::TEvRequestDropStream::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+
+ RequestedDropStream.push_back(ev->Sender);
+ ProcessDropStreamQueue(ctx);
+}
+
void TController::Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+
+ InflightDropStream.erase(ev->Sender);
+ ProcessDropStreamQueue(ctx);
RunTxDropStreamResult(ev, ctx);
}
diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h
index 927d2d5bf52..b7a365501a4 100644
--- a/ydb/core/tx/replication/controller/controller_impl.h
+++ b/ydb/core/tx/replication/controller/controller_impl.h
@@ -17,6 +17,7 @@
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
+#include <util/generic/deque.h>
#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
@@ -83,6 +84,8 @@ private:
void Handle(TEvPrivate::TEvProcessQueues::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvRequestCreateStream::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvRequestDropStream::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx);
@@ -103,6 +106,8 @@ private:
void RemoveWorker(const TWorkerId& id, const TActorContext& ctx);
bool MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx);
void UpdateLag(const TWorkerId& id, TDuration lag);
+ void ProcessCreateStreamQueue(const TActorContext& ctx);
+ void ProcessDropStreamQueue(const TActorContext& ctx);
// local transactions
class TTxInitSchema;
@@ -178,6 +183,13 @@ private:
bool ProcessQueuesScheduled = false;
static constexpr ui32 ProcessBatchLimit = 100;
+ // create stream limiter
+ TDeque<TActorId> RequestedCreateStream;
+ THashSet<TActorId> InflightCreateStream;
+ // drop stream limiter
+ TDeque<TActorId> RequestedDropStream;
+ THashSet<TActorId> InflightDropStream;
+
}; // TController
}
diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h
index f69f08ae3f6..ebf083f97b0 100644
--- a/ydb/core/tx/replication/controller/private_events.h
+++ b/ydb/core/tx/replication/controller/private_events.h
@@ -32,6 +32,10 @@ struct TEvPrivate {
EvAlterDstResult,
EvRemoveWorker,
EvDescribeTargetsResult,
+ EvRequestCreateStream,
+ EvAllowCreateStream,
+ EvRequestDropStream,
+ EvAllowDropStream,
EvEnd,
};
@@ -221,6 +225,18 @@ struct TEvPrivate {
TString ToString() const override;
};
+ struct TEvRequestCreateStream: public TEventLocal<TEvRequestCreateStream, EvRequestCreateStream> {
+ };
+
+ struct TEvAllowCreateStream: public TEventLocal<TEvAllowCreateStream, EvAllowCreateStream> {
+ };
+
+ struct TEvRequestDropStream: public TEventLocal<TEvRequestDropStream, EvRequestDropStream> {
+ };
+
+ struct TEvAllowDropStream: public TEventLocal<TEvAllowDropStream, EvAllowDropStream> {
+ };
+
}; // TEvPrivate
}
diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp
index 7603154d544..cd8cb06d8e7 100644
--- a/ydb/core/tx/replication/controller/stream_creator.cpp
+++ b/ydb/core/tx/replication/controller/stream_creator.cpp
@@ -28,6 +28,24 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
.AddAttribute("__async_replication", NJson::WriteJson(attrs, false));
}
+ void RequestPermission() {
+ Send(Parent, new TEvPrivate::TEvRequestCreateStream());
+ Become(&TThis::StateRequestPermission);
+ }
+
+ STATEFN(StateRequestPermission) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvPrivate::TEvAllowCreateStream, Handle);
+ default:
+ return StateBase(ev);
+ }
+ }
+
+ void Handle(TEvPrivate::TEvAllowCreateStream::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+ CreateStream();
+ }
+
void CreateStream() {
switch (Kind) {
case TReplication::ETargetKind::Table:
@@ -103,6 +121,10 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
LOG_T("Handle " << ev->Get()->ToString());
auto& result = ev->Get()->Result;
+ if (result.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
+ return Reply(NYdb::TStatus(NYdb::EStatus::SUCCESS, NYql::TIssues()));
+ }
+
if (!result.IsSuccess()) {
if (IsRetryableError(result)) {
LOG_D("Retry CreateConsumer");
@@ -155,7 +177,7 @@ public:
}
void Bootstrap() {
- CreateStream();
+ RequestPermission();
}
STATEFN(StateBase) {
diff --git a/ydb/core/tx/replication/controller/stream_remover.cpp b/ydb/core/tx/replication/controller/stream_remover.cpp
index b1acdb46b04..4f75dc27a69 100644
--- a/ydb/core/tx/replication/controller/stream_remover.cpp
+++ b/ydb/core/tx/replication/controller/stream_remover.cpp
@@ -10,6 +10,24 @@
namespace NKikimr::NReplication::NController {
class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
+ void RequestPermission() {
+ Send(Parent, new TEvPrivate::TEvRequestDropStream());
+ Become(&TThis::StateRequestPermission);
+ }
+
+ STATEFN(StateRequestPermission) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvPrivate::TEvAllowDropStream, Handle);
+ default:
+ return StateBase(ev);
+ }
+ }
+
+ void Handle(TEvPrivate::TEvAllowDropStream::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+ DropStream();
+ }
+
void DropStream() {
switch (Kind) {
case TReplication::ETargetKind::Table:
@@ -26,7 +44,8 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
switch (ev->GetTypeRewrite()) {
hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle);
sFunc(TEvents::TEvWakeup, DropStream);
- sFunc(TEvents::TEvPoison, PassAway);
+ default:
+ return StateBase(ev);
}
}
@@ -77,7 +96,13 @@ public:
}
void Bootstrap() {
- DropStream();
+ RequestPermission();
+ }
+
+ STATEFN(StateBase) {
+ switch (ev->GetTypeRewrite()) {
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
}
private: