diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-10-10 19:49:19 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-10-10 19:49:19 +0300 |
commit | 3f1ac4455ebfdaf68d0dac1bca26110f014457cf (patch) | |
tree | 7960919cf3a8d0315df3e855b553cc72027db787 | |
parent | 9482c913e2ec8adcdb94f0bf34a45b9017fd7add (diff) | |
download | ydb-3f1ac4455ebfdaf68d0dac1bca26110f014457cf.tar.gz |
TRequestActor for control plane storage requests
Рефакторинг состоит в том, чтобы сделать возможным/удобным создание специальных акторов для обработки запроса в control plane storage. Сейчас практически все запросы в control plane storage обрабатываются как один вызов метода для доступа к базе и в колбеке на фьючу отправка ответа клиенту. Это не позволяет нам вставить какие-то акторные действия перед запросом в базу. Сделал рефакторинг, чтобы такие действия были возможны. Далее в планах в качестве основного кода для данного тикета вставить создание ресурса квотировщика перед запросом в базу на создание квери.
11 files changed, 239 insertions, 188 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp index 0c589ceb7e..b8ac7a40e6 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp @@ -107,7 +107,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth const auto readQuery = readQueryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - TAsyncStatus status = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo); + TAsyncStatus status = ReadModifyWrite(readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo); auto prepare = [response] { return *response; }; auto success = SendResponse<TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Fq::Private::NodesHealthCheckResult>( "NodesHealthCheckRequest - NodesHealthCheckResult", diff --git a/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp b/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp index 00814c6a03..2a7d4491ac 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp @@ -1,6 +1,7 @@ #include "utils.h" #include <ydb/public/lib/yq/scope.h> +#include <ydb/core/yq/libs/control_plane_storage/request_actor.h> #include <ydb/core/yq/libs/db_schema/db_schema.h> #include <ydb/core/yq/libs/quota_manager/quota_manager.h> #include <ydb/core/yq/libs/quota_manager/events/events.h> @@ -41,21 +42,62 @@ struct TEvPrivate { }; template <class TRequest, class TResponse, class TDerived> -class TRateLimiterRequestActor : public NActors::TActor<TDerived> { +class TRateLimiterRequestActor : public TControlPlaneRequestActor<TRequest, TResponse, TDerived> { + using TRequestActorBase = TControlPlaneRequestActor<TRequest, TResponse, TDerived>; + +protected: + using TRequestActorBase::ReplyWithError; + public: - using TResponseEvent = TResponse; - - TRateLimiterRequestActor(TInstant startTime, typename TRequest::TPtr&& ev, TRequestCounters requestCounters, TDebugInfoPtr debugInfo) - : NActors::TActor<TDerived>(&TDerived::StateFunc) - , Request(std::move(ev)) - , RequestCounters(std::move(requestCounters)) - , DebugInfo(std::move(debugInfo)) - , QueryId(Request->Get()->Request.query_id().value()) - , OwnerId(Request->Get()->Request.owner_id()) - , StartTime(startTime) + TRateLimiterRequestActor(typename TRequest::TPtr&& ev, TRequestCounters requestCounters, TDebugInfoPtr debugInfo, TDbPool::TPtr dbPool, TYdbConnectionPtr ydbConnection) + : TRequestActorBase(std::move(ev), std::move(requestCounters), std::move(debugInfo), std::move(dbPool), std::move(ydbConnection)) + , QueryId(this->Request->Get()->Request.query_id().value()) + , OwnerId(this->Request->Get()->Request.owner_id()) { } + void Start() { + auto& request = this->Request->Get()->Request; + const TString& scope = request.scope(); + const TString& tenant = request.tenant(); + + if (NYql::TIssues issues = ValidateCreateOrDeleteRateLimiterResource(QueryId, scope, tenant, OwnerId)) { + CPS_LOG_W(TDerived::RequestTypeName << "Request: {" << request.DebugString() << "} validation FAILED: " << issues.ToOneLineString()); + ReplyWithError(issues); + return; + } + + this->Become(&TDerived::StateFunc); + + TSqlQueryBuilder readQueryBuilder(this->YdbConnection->TablePathPrefix, TDerived::RequestTypeName); + readQueryBuilder.AddString("query_id", QueryId); + readQueryBuilder.AddString("scope", scope); + readQueryBuilder.AddString("tenant", tenant); + TStringBuilder text; + text << + "SELECT `" SCOPE_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "` FROM " PENDING_SMALL_TABLE_NAME + " WHERE `" TENANT_COLUMN_NAME "` = $tenant" + " AND `" SCOPE_COLUMN_NAME "` = $scope" + " AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" + + "SELECT `" INTERNAL_COLUMN_NAME "`"; + if constexpr (TDerived::IsCreateRequest) { + text << ", `" QUERY_COLUMN_NAME "`"; + } + text << " FROM " QUERIES_TABLE_NAME + " WHERE `" SCOPE_COLUMN_NAME "` = $scope" + " AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"; + readQueryBuilder.AddText(text); + + const auto query = readQueryBuilder.Build(); + auto [readStatus, resultSets] = this->Read(query.Sql, query.Params, this->RequestCounters, this->DebugInfo); + readStatus.Subscribe( + [resultSets = resultSets, actorSystem = NActors::TActivationContext::ActorSystem(), selfId = this->SelfId()] (const TAsyncStatus& status) { + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvDbRequestResult(status, std::move(resultSets)))); + } + ); + } + void Handle(TEvPrivate::TEvDbRequestResult::TPtr& ev) { const auto& status = ev->Get()->Status.GetValueSync(); CPS_LOG_D(TDerived::RequestTypeName << "Request. Got response from database: " << status.GetStatus()); @@ -121,47 +163,9 @@ public: } } - void ReplyWithError(const TString& msg) { - NYql::TIssues issues; - issues.AddIssue(msg); - ReplyWithError(issues); - } - - void ReplyWithError(const NYql::TIssues& issues) { - SendResponseEventAndPassAway(std::make_unique<TResponse>(issues), false); - } - - void Reply(const typename TResponse::TProto& proto) { - SendResponseEventAndPassAway(std::make_unique<TResponse>(proto), true); - } - - void SendResponseEventAndPassAway(std::unique_ptr<TResponse> event, bool success) { - event->DebugInfo = std::move(DebugInfo); - - RequestCounters.Common->ResponseBytes->Add(event->GetByteSize()); - RequestCounters.IncInFly(); - if (success) { - RequestCounters.IncOk(); - } else { - RequestCounters.IncError(); - } - const TDuration duration = TInstant::Now() - StartTime; - RequestCounters.Common->LatencyMs->Collect(duration.MilliSeconds()); - - TDerived::LwProbe(QueryId, duration, success); - - this->Send(Request->Sender, event.release(), 0, Request->Cookie); - - this->PassAway(); - } - protected: - const typename TRequest::TPtr Request; - TRequestCounters RequestCounters; - TDebugInfoPtr DebugInfo; const TString QueryId; const TString OwnerId; - const TInstant StartTime; TString CloudId; TString FolderId; TMaybe<double> QueryLimit; @@ -208,12 +212,8 @@ public: } } - static void LwProbe(const TString& queryId, TInstant startTime, bool success) { - LwProbe(queryId, TInstant::Now() - startTime, success); - } - - static void LwProbe(const TString& queryId, TDuration duration, bool success) { - LWPROBE(CreateRateLimiterResourceRequest, queryId, duration, success); + void LwProbe(bool success) { + LWPROBE(CreateRateLimiterResourceRequest, QueryId, GetRequestDuration(), success); } static const TString RequestTypeName; @@ -253,12 +253,8 @@ public: } } - static void LwProbe(const TString& queryId, TInstant startTime, bool success) { - LwProbe(queryId, TInstant::Now() - startTime, success); - } - - static void LwProbe(const TString& queryId, TDuration duration, bool success) { - LWPROBE(DeleteRateLimiterResourceRequest, queryId, duration, success); + void LwProbe(bool success) { + LWPROBE(DeleteRateLimiterResourceRequest, QueryId, GetRequestDuration(), success); } static const TString RequestTypeName; @@ -271,59 +267,11 @@ const TString TRateLimiterDeleteRequest::RequestTypeName = "DeleteRateLimiterRes template <class TEventPtr, class TRequestActor, TYdbControlPlaneStorageActor::ERequestTypeCommon requestType> void TYdbControlPlaneStorageActor::HandleRateLimiterImpl(TEventPtr& ev) { - const TInstant startTime = TInstant::Now(); TRequestCounters requestCounters{nullptr, Counters.GetCommonCounters(requestType)}; - requestCounters.IncInFly(); - requestCounters.Common->RequestBytes->Add(ev->Get()->GetByteSize()); - - auto& request = ev->Get()->Request; - const TString& queryId = request.query_id().value(); - const TString& scope = request.scope(); - const TString& tenant = request.tenant(); - const TString& owner = request.owner_id(); - - CPS_LOG_T(TRequestActor::RequestTypeName << "Request: {" << request.DebugString() << "}"); - - NYql::TIssues issues = ValidateCreateOrDeleteRateLimiterResource(queryId, scope, tenant, owner); - if (issues) { - CPS_LOG_W(TRequestActor::RequestTypeName << "Request: {" << request.DebugString() << "} validation FAILED: " << issues.ToOneLineString()); - const TDuration delta = TInstant::Now() - startTime; - SendResponseIssues<typename TRequestActor::TResponseEvent>(ev->Sender, issues, ev->Cookie, delta, requestCounters); - TRequestActor::LwProbe(queryId, startTime, false); - return; - } auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - const NActors::TActorId requestActor = Register(new TRequestActor(startTime, std::move(ev), requestCounters, debugInfo)); - - TSqlQueryBuilder readQueryBuilder(YdbConnection->TablePathPrefix, TRequestActor::RequestTypeName); - readQueryBuilder.AddString("query_id", queryId); - readQueryBuilder.AddString("scope", scope); - readQueryBuilder.AddString("tenant", tenant); - TStringBuilder text; - text << - "SELECT `" SCOPE_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "` FROM " PENDING_SMALL_TABLE_NAME - " WHERE `" TENANT_COLUMN_NAME "` = $tenant" - " AND `" SCOPE_COLUMN_NAME "` = $scope" - " AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" - - "SELECT `" INTERNAL_COLUMN_NAME "`"; - if constexpr (TRequestActor::IsCreateRequest) { - text << ", `" QUERY_COLUMN_NAME "`"; - } - text << " FROM " QUERIES_TABLE_NAME - " WHERE `" SCOPE_COLUMN_NAME "` = $scope" - " AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"; - readQueryBuilder.AddText(text); - - const auto query = readQueryBuilder.Build(); - auto [readStatus, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); - readStatus.Subscribe( - [resultSets = resultSets, actorSystem = NActors::TActivationContext::ActorSystem(), requestActor] (const TAsyncStatus& status) { - actorSystem->Send(new IEventHandle(requestActor, requestActor, new TEvPrivate::TEvDbRequestResult(status, std::move(resultSets)))); - } - ); + const NActors::TActorId requestActor = Register(new TRequestActor(std::move(ev), std::move(requestCounters), std::move(debugInfo), DbPool, YdbConnection)); } void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateRateLimiterResourceRequest::TPtr& ev) { diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp index f33261b49a..bde301450a 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp @@ -300,7 +300,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ }; const auto query = queryBuilder.Build(); - auto [readStatus, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, TTxSettings::StaleRO()); + auto [readStatus, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo, TTxSettings::StaleRO()); auto result = readStatus.Apply( [=, resultSets=resultSets, diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index 4cb71db428..06cf2c3983 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -475,7 +475,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq auto prepareParams = std::get<2>(pingTaskParams); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery, readParams, prepareParams, requestCounters, debugInfo); + auto result = ReadModifyWrite(readQuery, readParams, prepareParams, requestCounters, debugInfo); auto prepare = [response] { return *response; }; auto success = SendResponse<TEvControlPlaneStorage::TEvPingTaskResponse, Fq::Private::PingTaskResult>( "PingTaskRequest - PingTaskResult", diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp index c7cdbc3738..fe33852013 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp @@ -66,7 +66,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - TAsyncStatus result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); + TAsyncStatus result = Write(query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [response] { return *response; }; auto success = SendResponse<TEvControlPlaneStorage::TEvWriteResultDataResponse, Fq::Private::WriteTaskResultResult>( "WriteResultDataRequest - WriteResultDataResult", diff --git a/ydb/core/yq/libs/control_plane_storage/request_actor.h b/ydb/core/yq/libs/control_plane_storage/request_actor.h new file mode 100644 index 0000000000..1197032fb1 --- /dev/null +++ b/ydb/core/yq/libs/control_plane_storage/request_actor.h @@ -0,0 +1,94 @@ +#pragma once +#include "util.h" + +#include <ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h> +#include <ydb/core/yq/libs/control_plane_storage/events/events.h> +#include <ydb/core/yq/libs/control_plane_storage/schema.h> +#include <ydb/core/yq/libs/db_schema/db_schema.h> +#include <ydb/core/yq/libs/shared_resources/db_exec.h> +#include <ydb/public/api/protos/yq.pb.h> +#include <ydb/public/lib/yq/scope.h> +#include <ydb/public/sdk/cpp/client/ydb_value/value.h> + +#include <util/datetime/base.h> + +namespace NYq { + +template <class TRequest, class TResponse, class TDerived> +class TControlPlaneRequestActor : public NActors::TActorBootstrapped<TDerived>, + public TDbRequester +{ +public: + using TResponseEvent = TResponse; + using TRequestEvent = TRequest; + using TBaseActor = NActors::TActorBootstrapped<TDerived>; + + using TBaseActor::Send; + +protected: + TControlPlaneRequestActor(typename TRequestEvent::TPtr&& ev, TRequestCounters requestCounters, TDebugInfoPtr debugInfo, TDbPool::TPtr dbPool, TYdbConnectionPtr ydbConnection) + : TDbRequester(std::move(dbPool), std::move(ydbConnection)) + , Request(std::move(ev)) + , RequestCounters(std::move(requestCounters)) + , DebugInfo(std::move(debugInfo)) + { + RequestCounters.IncInFly(); + RequestCounters.Common->RequestBytes->Add(Request->Get()->GetByteSize()); + } + +public: + void Bootstrap() { + CPS_LOG_T(TDerived::RequestTypeName << "Request: {" << Request->Get()->Request.DebugString() << "}"); + + AsDerived()->Start(); + } + +protected: + void ReplyWithError(const TString& msg) { + NYql::TIssues issues; + issues.AddIssue(msg); + ReplyWithError(issues); + } + + void ReplyWithError(const NYql::TIssues& issues) { + SendResponseEventAndPassAway(std::make_unique<TResponse>(issues), false); + } + + void Reply(const typename TResponse::TProto& proto) { + SendResponseEventAndPassAway(std::make_unique<TResponse>(proto), true); + } + + void SendResponseEventAndPassAway(std::unique_ptr<TResponse> event, bool success) { + event->DebugInfo = std::move(DebugInfo); + + RequestCounters.Common->ResponseBytes->Add(event->GetByteSize()); + RequestCounters.IncInFly(); + if (success) { + RequestCounters.IncOk(); + } else { + RequestCounters.IncError(); + } + RequestCounters.Common->LatencyMs->Collect(GetRequestDuration().MilliSeconds()); + + AsDerived()->LwProbe(success); + + Send(Request->Sender, event.release(), 0, Request->Cookie); + this->PassAway(); + } + + TDuration GetRequestDuration() const { + return TInstant::Now() - StartTime; + } + + TDerived* AsDerived() { + return static_cast<TDerived*>(this); + } + +protected: + const TInstant StartTime = TInstant::Now(); + const typename TRequestEvent::TPtr Request; + TRequestCounters RequestCounters; + TDebugInfoPtr DebugInfo; +}; + +} // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index cab6faa288..3d099b0a6f 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -325,8 +325,7 @@ TAsyncStatus ExecDbRequest(TDbPool::TPtr dbPool, std::function<NYdb::TAsyncStatu return promise.GetFuture(); } -std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TYdbControlPlaneStorageActor::Read( - NActors::TActorSystem* actorSystem, +std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TDbRequester::Read( const TString& query, const NYdb::TParams& params, const TRequestCounters& requestCounters, @@ -334,6 +333,7 @@ std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TYdbControlP TTxSettings transactionMode, bool retryOnTli) { + NActors::TActorSystem* const actorSystem = TActivationContext::ActorSystem(); auto resultSet = std::make_shared<TVector<NYdb::TResultSet>>(); std::shared_ptr<int> retryCount = std::make_shared<int>(); @@ -362,11 +362,11 @@ std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TYdbControlP }; TPromise<NYdb::TStatus> promise = NewPromise<NYdb::TStatus>(); - Register(new TDbRequest(DbPool, promise, handler)); + TActivationContext::AsActorContext().Register(new TDbRequest(DbPool, promise, handler)); return {promise.GetFuture(), resultSet}; } -TAsyncStatus TYdbControlPlaneStorageActor::Validate( +TAsyncStatus TDbRequester::Validate( NActors::TActorSystem* actorSystem, std::shared_ptr<TMaybe<TTransaction>> transaction, size_t item, @@ -402,8 +402,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::Validate( }); } -TAsyncStatus TYdbControlPlaneStorageActor::Write( - NActors::TActorSystem* actorSystem, +TAsyncStatus TDbRequester::Write( const TString& query, const NYdb::TParams& params, const TRequestCounters& requestCounters, @@ -412,6 +411,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::Write( TTxSettings transactionMode, bool retryOnTli) { + NActors::TActorSystem* const actorSystem = TActivationContext::ActorSystem(); std::shared_ptr<int> retryCount = std::make_shared<int>(); auto transaction = std::make_shared<TMaybe<TTransaction>>(); auto writeHandler = [=, retryOnTli=retryOnTli] (TSession session) { @@ -462,7 +462,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::Write( }); }; TPromise<NYdb::TStatus> promise = NewPromise<NYdb::TStatus>(); - Register(new TDbRequest(DbPool, promise, handler)); + TActivationContext::AsActorContext().Register(new TDbRequest(DbPool, promise, handler)); return promise.GetFuture(); } @@ -474,7 +474,7 @@ NThreading::TFuture<void> TYdbControlPlaneStorageActor::PickTask( const TVector<TValidationQuery>& validators, TTxSettings transactionMode) { - return ReadModifyWrite(NActors::TActivationContext::ActorSystem(), taskParams.ReadQuery, taskParams.ReadParams, + return ReadModifyWrite(taskParams.ReadQuery, taskParams.ReadParams, taskParams.PrepareParams, requestCounters, debugInfo, validators, transactionMode, taskParams.RetryOnTli) .Apply([=, responseTasks=responseTasks, queryId = taskParams.QueryId](const auto& future) { const auto status = future.GetValue(); @@ -484,8 +484,7 @@ NThreading::TFuture<void> TYdbControlPlaneStorageActor::PickTask( }); } -TAsyncStatus TYdbControlPlaneStorageActor::ReadModifyWrite( - NActors::TActorSystem* actorSystem, +TAsyncStatus TDbRequester::ReadModifyWrite( const TString& readQuery, const NYdb::TParams& readParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>& prepare, @@ -495,6 +494,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::ReadModifyWrite( TTxSettings transactionMode, bool retryOnTli) { + NActors::TActorSystem* const actorSystem = TActivationContext::ActorSystem(); std::shared_ptr<int> retryCount = std::make_shared<int>(); auto resultSets = std::make_shared<TVector<NYdb::TResultSet>>(); auto transaction = std::make_shared<TMaybe<TTransaction>>(); @@ -599,7 +599,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::ReadModifyWrite( }); }; TPromise<NYdb::TStatus> promise = NewPromise<NYdb::TStatus>(); - Register(new TDbRequest(DbPool, promise, handler)); + TActivationContext::AsActorContext().Register(new TDbRequest(DbPool, promise, handler)); return promise.GetFuture(); } diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp index 8f32943d7a..44a04ab846 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp @@ -126,7 +126,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateBindi const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - TAsyncStatus result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators); + TAsyncStatus result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvCreateBindingResponse, YandexQuery::CreateBindingResult, TAuditDetails<YandexQuery::Binding>>( MakeLogPrefix(scope, user, bindingId) + "CreateBindingRequest", @@ -233,7 +233,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListBinding const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [resultSets=resultSets, limit] { if (resultSets->size() != 1) { ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; @@ -337,7 +337,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeBin const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [=, resultSets=resultSets] { if (resultSets->size() != 1) { ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; @@ -555,7 +555,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyBindi const auto readQuery = readQueryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators); + auto result = ReadModifyWrite(readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvModifyBindingResponse, YandexQuery::ModifyBindingResult, TAuditDetails<YandexQuery::Binding>>( MakeLogPrefix(scope, user, bindingId) + "ModifyBindingRequest", @@ -666,7 +666,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteBindi const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators); + auto result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvDeleteBindingResponse, YandexQuery::DeleteBindingResult, TAuditDetails<YandexQuery::Binding>>( MakeLogPrefix(scope, user, bindingId) + "DeleteBindingRequest", diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp index e6915b8f4a..913b1cf513 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp @@ -120,7 +120,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateConne const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - TAsyncStatus result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators); + TAsyncStatus result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvCreateConnectionResponse, YandexQuery::CreateConnectionResult, TAuditDetails<YandexQuery::Connection>>( MakeLogPrefix(scope, user, connectionId) + "CreateConnectionRequest", @@ -228,7 +228,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListConnect const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [resultSets=resultSets, limit] { if (resultSets->size() != 1) { ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; @@ -322,7 +322,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeCon const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [=, resultSets=resultSets] { if (resultSets->size() != 1) { ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; @@ -526,7 +526,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyConne const auto readQuery = readQueryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators); + auto result = ReadModifyWrite(readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvModifyConnectionResponse, YandexQuery::ModifyConnectionResult, TAuditDetails<YandexQuery::Connection>>( MakeLogPrefix(scope, user, connectionId) + "ModifyConnectionRequest", @@ -645,7 +645,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteConne const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators); + auto result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvDeleteConnectionResponse, YandexQuery::DeleteConnectionResult, TAuditDetails<YandexQuery::Connection>>( MakeLogPrefix(scope, user, connectionId) + "DeleteConnectionRequest", diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index 01aac43de5..9a71373447 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -211,7 +211,58 @@ void InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const void ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey); -class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbControlPlaneStorageActor> { +class TDbRequester { +protected: + explicit TDbRequester(TDbPool::TPtr pool = nullptr, TYdbConnectionPtr ydbConnection = nullptr) + : DbPool(std::move(pool)) + , YdbConnection(std::move(ydbConnection)) + { + } + + std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> Read( + const TString& query, + const NYdb::TParams& params, + const TRequestCounters& requestCounters, + TDebugInfoPtr debugInfo, + TTxSettings transactionMode = TTxSettings::SerializableRW(), + bool retryOnTli = true); + + TAsyncStatus Validate( + NActors::TActorSystem* actorSystem, + std::shared_ptr<TMaybe<TTransaction>> transaction, + size_t item, const TVector<TValidationQuery>& validators, + TSession session, + std::shared_ptr<bool> successFinish, + TDebugInfoPtr debugInfo, + TTxSettings transactionMode = TTxSettings::SerializableRW()); + + TAsyncStatus Write( + const TString& query, + const NYdb::TParams& params, + const TRequestCounters& requestCounters, + TDebugInfoPtr debugInfo, + const TVector<TValidationQuery>& validators = {}, + TTxSettings transactionMode = TTxSettings::SerializableRW(), + bool retryTli = true); + + TAsyncStatus ReadModifyWrite( + const TString& readQuery, + const NYdb::TParams& readParams, + const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>& prepare, + const TRequestCounters& requestCounters, + TDebugInfoPtr debugInfo = {}, + const TVector<TValidationQuery>& validators = {}, + TTxSettings transactionMode = TTxSettings::SerializableRW(), + bool retryOnTli = true); + +protected: + TDbPool::TPtr DbPool; + TYdbConnectionPtr YdbConnection; +}; + +class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbControlPlaneStorageActor>, + public TDbRequester +{ enum ERequestTypeScope { RTS_CREATE_QUERY, RTS_LIST_QUERIES, @@ -408,10 +459,7 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont TCounters Counters; - TYdbConnectionPtr YdbConnection; - ::NYq::TYqSharedResources::TPtr YqSharedResources; - TDbPool::TPtr DbPool; static constexpr int64_t InitialRevision = 1; @@ -574,45 +622,6 @@ private: */ bool IsSuperUser(const TString& user); - std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> Read( - NActors::TActorSystem* actorSystem, - const TString& query, - const NYdb::TParams& params, - const TRequestCounters& requestCounters, - TDebugInfoPtr debugInfo, - TTxSettings transactionMode = TTxSettings::SerializableRW(), - bool retryOnTli = true); - - TAsyncStatus Validate( - NActors::TActorSystem* actorSystem, - std::shared_ptr<TMaybe<TTransaction>> transaction, - size_t item, const TVector<TValidationQuery>& validators, - TSession session, - std::shared_ptr<bool> successFinish, - TDebugInfoPtr debugInfo, - TTxSettings transactionMode = TTxSettings::SerializableRW()); - - TAsyncStatus Write( - NActors::TActorSystem* actorSystem, - const TString& query, - const NYdb::TParams& params, - const TRequestCounters& requestCounters, - TDebugInfoPtr debugInfo, - const TVector<TValidationQuery>& validators = {}, - TTxSettings transactionMode = TTxSettings::SerializableRW(), - bool retryTli = true); - - TAsyncStatus ReadModifyWrite( - NActors::TActorSystem* actorSystem, - const TString& readQuery, - const NYdb::TParams& readParams, - const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>& prepare, - const TRequestCounters& requestCounters, - TDebugInfoPtr debugInfo = {}, - const TVector<TValidationQuery>& validators = {}, - TTxSettings transactionMode = TTxSettings::SerializableRW(), - bool retryOnTli = true); - template<class ResponseEvent, class Result, class RequestEventPtr> TFuture<bool> SendResponse(const TString& name, NActors::TActorSystem* actorSystem, diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 4d3e5b5e78..25c921e44d 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -348,7 +348,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery const auto read = readQueryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - TAsyncStatus status = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), read.Sql, read.Params, prepareParams, requestCounters, debugInfo); + TAsyncStatus status = ReadModifyWrite(read.Sql, read.Params, prepareParams, requestCounters, debugInfo); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvCreateQueryResponse, YandexQuery::CreateQueryResult, TAuditDetails<YandexQuery::Query>>( "CreateQueryRequest - CreateQueryResult", @@ -484,7 +484,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListQueries const auto read = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), read.Sql, read.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(read.Sql, read.Params, requestCounters, debugInfo); auto prepare = [resultSets=resultSets, limit] { if (resultSets->size() != 1) { ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; @@ -574,7 +574,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeQue ); const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [resultSets=resultSets, user,permissions] { if (resultSets->size() != 1) { ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; @@ -698,7 +698,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetQuerySta const auto read = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), read.Sql, read.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(read.Sql, read.Params, requestCounters, debugInfo); auto prepare = [resultSets=resultSets, user,permissions] { if (resultSets->size() != 1) { ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; @@ -1108,7 +1108,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery const auto read = readQueryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), read.Sql, read.Params, prepareParams, requestCounters, debugInfo, validators); + auto result = ReadModifyWrite(read.Sql, read.Params, prepareParams, requestCounters, debugInfo, validators); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvModifyQueryResponse, YandexQuery::ModifyQueryResult, TAuditDetails<YandexQuery::Query>>( "ModifyQueryRequest - ModifyQueryResult", @@ -1223,7 +1223,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteQuery const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators); + auto result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvDeleteQueryResponse, YandexQuery::DeleteQueryResult, TAuditDetails<YandexQuery::Query>>( "DeleteQueryRequest - DeleteQueryResult", @@ -1451,7 +1451,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer const auto readQuery = readQueryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators); + auto result = ReadModifyWrite(readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators); auto prepare = [response] { return *response; }; auto success = SendAuditResponse<TEvControlPlaneStorage::TEvControlQueryResponse, YandexQuery::ControlQueryResult, TAuditDetails<YandexQuery::Query>>( "ControlQueryRequest - ControlQueryRequest", @@ -1527,7 +1527,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetResultDa const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [resultSets=resultSets, resultSetIndex, user, permissions] { if (resultSets->size() != 2) { ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 2 but equal " << resultSets->size() << ". Please contact internal support"; @@ -1685,7 +1685,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListJobsReq const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [resultSets=resultSets, limit] { if (resultSets->size() != 1) { ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; @@ -1785,7 +1785,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeJob const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [=, id=request.job_id(), resultSets=resultSets] { if (resultSets->size() != 1) { |