diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-09-17 19:26:03 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-17 19:26:03 +0300 |
commit | 0361036deeef748018284573baa33a364d714de5 (patch) | |
tree | c021981c642afc88fb5e39df434c5c1d855e502e | |
parent | 90e319847e2094094259c4e78cf423fb1857e87e (diff) | |
download | ydb-0361036deeef748018284573baa33a364d714de5.tar.gz |
Limit inflight cross-database scheme requests in the replication controller (#9334)
-rw-r--r-- | ydb/core/protos/replication.proto | 6 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/controller.cpp | 43 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/controller_impl.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/private_events.h | 16 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/stream_creator.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/stream_remover.cpp | 29 |
6 files changed, 127 insertions, 3 deletions
diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 3764b0511c5..41baff85764 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -6,7 +6,13 @@ package NKikimrReplication; option java_package = "ru.yandex.kikimr.proto"; message TReplicationDefaults { + message TSchemeOperationLimits { + optional uint32 InflightCreateStreamLimit = 1 [default = 1]; + optional uint32 InflightDropStreamLimit = 2 [default = 1]; + } + optional int32 RetentionPeriodSeconds = 1 [default = 86400]; // 1d + optional TSchemeOperationLimits SchemeOperationLimits = 2; } message TStaticCredentials { diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 4f73d84d68b..46e8c3dfbb0 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -1,6 +1,7 @@ #include "controller.h" #include "controller_impl.h" +#include <ydb/core/base/appdata.h> #include <ydb/core/discovery/discovery.h> #include <ydb/core/engine/minikql/flat_local_tx_factory.h> @@ -60,6 +61,8 @@ STFUNC(TController::StateWork) { HFunc(TEvPrivate::TEvProcessQueues, Handle); HFunc(TEvPrivate::TEvRemoveWorker, Handle); HFunc(TEvPrivate::TEvDescribeTargetsResult, Handle); + HFunc(TEvPrivate::TEvRequestCreateStream, Handle); + HFunc(TEvPrivate::TEvRequestDropStream, Handle); HFunc(TEvDiscovery::TEvDiscoveryData, Handle); HFunc(TEvDiscovery::TEvError, Handle); HFunc(TEvService::TEvStatus, Handle); @@ -148,13 +151,53 @@ void TController::Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActor RunTxAssignStreamName(ev, ctx); } +template <typename TEvent> +void ProcessLimiterQueue(TDeque<TActorId>& requested, THashSet<TActorId>& inflight, ui32 limit, const TActorContext& ctx) { + while (!requested.empty() && inflight.size() < limit) { + const auto& actorId = requested.front(); + ctx.Send(actorId, new TEvent()); + inflight.insert(actorId); + requested.pop_front(); + } +} + +void TController::ProcessCreateStreamQueue(const TActorContext& ctx) { + const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits(); + ProcessLimiterQueue<TEvPrivate::TEvAllowCreateStream>(RequestedCreateStream, InflightCreateStream, limits.GetInflightCreateStreamLimit(), ctx); +} + +void TController::ProcessDropStreamQueue(const TActorContext& ctx) { + const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits(); + ProcessLimiterQueue<TEvPrivate::TEvAllowDropStream>(RequestedDropStream, InflightDropStream, limits.GetInflightDropStreamLimit(), ctx); +} + +void TController::Handle(TEvPrivate::TEvRequestCreateStream::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + RequestedCreateStream.push_back(ev->Sender); + ProcessCreateStreamQueue(ctx); +} + void TController::Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + InflightCreateStream.erase(ev->Sender); + ProcessCreateStreamQueue(ctx); RunTxCreateStreamResult(ev, ctx); } +void TController::Handle(TEvPrivate::TEvRequestDropStream::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + RequestedDropStream.push_back(ev->Sender); + ProcessDropStreamQueue(ctx); +} + void TController::Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + InflightDropStream.erase(ev->Sender); + ProcessDropStreamQueue(ctx); RunTxDropStreamResult(ev, ctx); } diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index 927d2d5bf52..b7a365501a4 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -17,6 +17,7 @@ #include <ydb/library/actors/core/interconnect.h> #include <ydb/library/yverify_stream/yverify_stream.h> +#include <util/generic/deque.h> #include <util/generic/hash.h> #include <util/generic/hash_set.h> @@ -83,6 +84,8 @@ private: void Handle(TEvPrivate::TEvProcessQueues::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvRequestCreateStream::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvRequestDropStream::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx); @@ -103,6 +106,8 @@ private: void RemoveWorker(const TWorkerId& id, const TActorContext& ctx); bool MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx); void UpdateLag(const TWorkerId& id, TDuration lag); + void ProcessCreateStreamQueue(const TActorContext& ctx); + void ProcessDropStreamQueue(const TActorContext& ctx); // local transactions class TTxInitSchema; @@ -178,6 +183,13 @@ private: bool ProcessQueuesScheduled = false; static constexpr ui32 ProcessBatchLimit = 100; + // create stream limiter + TDeque<TActorId> RequestedCreateStream; + THashSet<TActorId> InflightCreateStream; + // drop stream limiter + TDeque<TActorId> RequestedDropStream; + THashSet<TActorId> InflightDropStream; + }; // TController } diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h index f69f08ae3f6..ebf083f97b0 100644 --- a/ydb/core/tx/replication/controller/private_events.h +++ b/ydb/core/tx/replication/controller/private_events.h @@ -32,6 +32,10 @@ struct TEvPrivate { EvAlterDstResult, EvRemoveWorker, EvDescribeTargetsResult, + EvRequestCreateStream, + EvAllowCreateStream, + EvRequestDropStream, + EvAllowDropStream, EvEnd, }; @@ -221,6 +225,18 @@ struct TEvPrivate { TString ToString() const override; }; + struct TEvRequestCreateStream: public TEventLocal<TEvRequestCreateStream, EvRequestCreateStream> { + }; + + struct TEvAllowCreateStream: public TEventLocal<TEvAllowCreateStream, EvAllowCreateStream> { + }; + + struct TEvRequestDropStream: public TEventLocal<TEvRequestDropStream, EvRequestDropStream> { + }; + + struct TEvAllowDropStream: public TEventLocal<TEvAllowDropStream, EvAllowDropStream> { + }; + }; // TEvPrivate } diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp index 7603154d544..cd8cb06d8e7 100644 --- a/ydb/core/tx/replication/controller/stream_creator.cpp +++ b/ydb/core/tx/replication/controller/stream_creator.cpp @@ -28,6 +28,24 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> { .AddAttribute("__async_replication", NJson::WriteJson(attrs, false)); } + void RequestPermission() { + Send(Parent, new TEvPrivate::TEvRequestCreateStream()); + Become(&TThis::StateRequestPermission); + } + + STATEFN(StateRequestPermission) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPrivate::TEvAllowCreateStream, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TEvPrivate::TEvAllowCreateStream::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + CreateStream(); + } + void CreateStream() { switch (Kind) { case TReplication::ETargetKind::Table: @@ -103,6 +121,10 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> { LOG_T("Handle " << ev->Get()->ToString()); auto& result = ev->Get()->Result; + if (result.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { + return Reply(NYdb::TStatus(NYdb::EStatus::SUCCESS, NYql::TIssues())); + } + if (!result.IsSuccess()) { if (IsRetryableError(result)) { LOG_D("Retry CreateConsumer"); @@ -155,7 +177,7 @@ public: } void Bootstrap() { - CreateStream(); + RequestPermission(); } STATEFN(StateBase) { diff --git a/ydb/core/tx/replication/controller/stream_remover.cpp b/ydb/core/tx/replication/controller/stream_remover.cpp index b1acdb46b04..4f75dc27a69 100644 --- a/ydb/core/tx/replication/controller/stream_remover.cpp +++ b/ydb/core/tx/replication/controller/stream_remover.cpp @@ -10,6 +10,24 @@ namespace NKikimr::NReplication::NController { class TStreamRemover: public TActorBootstrapped<TStreamRemover> { + void RequestPermission() { + Send(Parent, new TEvPrivate::TEvRequestDropStream()); + Become(&TThis::StateRequestPermission); + } + + STATEFN(StateRequestPermission) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPrivate::TEvAllowDropStream, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TEvPrivate::TEvAllowDropStream::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + DropStream(); + } + void DropStream() { switch (Kind) { case TReplication::ETargetKind::Table: @@ -26,7 +44,8 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> { switch (ev->GetTypeRewrite()) { hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle); sFunc(TEvents::TEvWakeup, DropStream); - sFunc(TEvents::TEvPoison, PassAway); + default: + return StateBase(ev); } } @@ -77,7 +96,13 @@ public: } void Bootstrap() { - DropStream(); + RequestPermission(); + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvPoison, PassAway); + } } private: |