diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-07-31 19:13:04 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-07-31 19:13:04 +0300 |
commit | 5cf9beeab3ea847da0b6c414fcb5faa9cb041317 (patch) | |
tree | 5feff351da06c746e5add774887b1915baabd448 | |
parent | 6c8877167b6eb4a454d93d49f05111b488ea015e (diff) | |
download | ydb-5cf9beeab3ea847da0b6c414fcb5faa9cb041317.tar.gz |
moved request actor to separate file
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/actors/control_plane_proxy_request_actor.h | 293 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp | 246 |
2 files changed, 294 insertions, 245 deletions
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_proxy_request_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_proxy_request_actor.h new file mode 100644 index 00000000000..0a94b233347 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_proxy_request_actor.h @@ -0,0 +1,293 @@ +#pragma once + +#include "counters.h" + +#include <contrib/libs/fmt/include/fmt/format.h> +#include <library/cpp/actors/core/event.h> +#include <util/generic/maybe.h> + +#include <ydb/core/fq/libs/actors/logging/log.h> +#include <ydb/core/fq/libs/control_plane_proxy/events/events.h> +#include <ydb/core/fq/libs/control_plane_storage/events/events.h> +#include <ydb/library/yql/public/issue/yql_issue.h> + +namespace NFq { +namespace NPrivate { + +template<class TRequestProto, class TRequest, class TResponse, class TRequestProxy, class TResponseProxy> +class TRequestActor : + public NActors::TActorBootstrapped< + TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>> { +protected: + using TBase = NActors::TActorBootstrapped< + TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>>; + using TBase::SelfId; + using TBase::Send; + using TBase::PassAway; + using TBase::Become; + using TBase::Schedule; + + typename TRequestProxy::TPtr RequestProxy; + ::NFq::TControlPlaneProxyConfig Config; + TRequestProto RequestProto; + TString Scope; + TString FolderId; + TString User; + TString Token; + TActorId Sender; + ui32 Cookie; + TActorId ServiceId; + TRequestCounters Counters; + TInstant StartTime; + std::function<void(const TDuration&, bool /* isSuccess */, bool /* isTimeout */)> Probe; + TPermissions Permissions; + TString CloudId; + TString SubjectType; + const TMaybe<TQuotaMap> Quotas; + TTenantInfo::TPtr TenantInfo; + TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase; + ui32 RetryCount = 0; + bool ReplyWithResponseOnSuccess = true; + +public: + static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_ACTOR"; + + explicit TRequestActor(typename TRequestProxy::TPtr requestProxy, + const ::NFq::TControlPlaneProxyConfig& config, + TActorId sender, + ui32 cookie, + const TString& scope, + const TString& folderId, + TRequestProto&& requestProto, + TString&& user, + TString&& token, + const TActorId& serviceId, + const TRequestCounters& counters, + const std::function<void(const TDuration&, bool, bool)>& probe, + TPermissions permissions, + const TString& cloudId, + const TString& subjectType, + TMaybe<TQuotaMap>&& quotas = Nothing(), + TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal>&& + computeDatabase = Nothing(), + bool replyWithResponseOnSuccess = true) + : RequestProxy(requestProxy) + , Config(config) + , RequestProto(std::forward<TRequestProto>(requestProto)) + , Scope(scope) + , FolderId(folderId) + , User(std::move(user)) + , Token(std::move(token)) + , Sender(sender) + , Cookie(cookie) + , ServiceId(serviceId) + , Counters(counters) + , StartTime(TInstant::Now()) + , Probe(probe) + , Permissions(permissions) + , CloudId(cloudId) + , SubjectType(subjectType) + , Quotas(std::move(quotas)) + , ComputeDatabase(std::move(computeDatabase)) + , ReplyWithResponseOnSuccess(replyWithResponseOnSuccess) { + Counters.IncInFly(); + } + +public: + void Bootstrap() { + CPP_LOG_T("Request actor. Actor id: " << SelfId()); + Become(&TRequestActor::StateFunc, + Config.RequestTimeout, + new NActors::TEvents::TEvWakeup()); + Send(ControlPlaneConfigActorId(), + new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); + OnBootstrap(); + } + + virtual void OnBootstrap() { } + + STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout); + hFunc(TResponse, Handle); + cFunc(TEvControlPlaneConfig::EvGetTenantInfoRequest, HandleRetry); + hFunc(TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle);) + + void HandleRetry() { + Send(ControlPlaneConfigActorId(), + new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); + } + + void Handle(TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) { + TenantInfo = std::move(ev->Get()->TenantInfo); + if (TenantInfo) { + SendRequestIfCan(); + } else { + RetryCount++; + Schedule(Now() + Config.ConfigRetryPeriod * (1 << RetryCount), + new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); + } + } + + void HandleTimeout() { + CPP_LOG_D("Request timeout. " << RequestProto.DebugString()); + NYql::TIssues issues; + NYql::TIssue issue = + MakeErrorIssue(TIssuesIds::TIMEOUT, + "Request timeout. Try repeating the request later"); + issues.AddIssue(issue); + Counters.IncTimeout(); + ReplyWithError(issues, true); + } + + void Handle(typename TResponse::TPtr& ev) { + auto& response = *ev->Get(); + ProcessResponse(response); + } + + template<typename T> + void ProcessResponse(const T& response) { + if (response.Issues) { + ReplyWithError(response.Issues); + } else { + ReplyWithSuccess(response.Result); + } + } + + template<typename T> + requires requires(T t) { t.AuditDetails; } + void ProcessResponse(const T& response) { + if (response.Issues) { + ReplyWithError(response.Issues); + } else { + ReplyWithSuccess(response.Result, response.AuditDetails); + } + } + + void ReplyWithError(const NYql::TIssues& issues, bool isTimeout = false) { + const TDuration delta = TInstant::Now() - StartTime; + Counters.IncError(); + Probe(delta, false, isTimeout); + Send(Sender, new TResponseProxy(issues, SubjectType), 0, Cookie); + PassAway(); + } + + template<class... TArgs> + void ReplyWithSuccess(TArgs&&... args) { + const TDuration delta = TInstant::Now() - StartTime; + Counters.IncOk(); + Probe(delta, true, false); + if (ReplyWithResponseOnSuccess) { + Send(Sender, + new TResponseProxy(std::forward<TArgs>(args)..., SubjectType), + 0, + Cookie); + } else { + RequestProxy->Get()->Response = + std::make_unique<TResponseProxy>(std::forward<TArgs>(args)..., SubjectType); + RequestProxy->Get()->ControlPlaneYDBOperationWasPerformed = true; + Send(RequestProxy->Forward(ControlPlaneProxyActorId())); + } + PassAway(); + } + + virtual bool CanSendRequest() const { return bool(TenantInfo); } + + void SendRequestIfCan() { + if (CanSendRequest()) { + Send(ServiceId, + new TRequest(Scope, + RequestProto, + User, + Token, + CloudId, + Permissions, + Quotas, + TenantInfo, + ComputeDatabase.GetOrElse({})), + 0, + Cookie); + } + } + + virtual ~TRequestActor() { + Counters.DecInFly(); + Counters.Common->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + } +}; + +class TCreateQueryRequestActor : + public TRequestActor<FederatedQuery::CreateQueryRequest, + TEvControlPlaneStorage::TEvCreateQueryRequest, + TEvControlPlaneStorage::TEvCreateQueryResponse, + TEvControlPlaneProxy::TEvCreateQueryRequest, + TEvControlPlaneProxy::TEvCreateQueryResponse> { + bool QuoterResourceCreated = false; + +public: + using TBaseRequestActor = TRequestActor<FederatedQuery::CreateQueryRequest, + TEvControlPlaneStorage::TEvCreateQueryRequest, + TEvControlPlaneStorage::TEvCreateQueryResponse, + TEvControlPlaneProxy::TEvCreateQueryRequest, + TEvControlPlaneProxy::TEvCreateQueryResponse>; + using TBaseRequestActor::TBaseRequestActor; + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvRateLimiter::TEvCreateResourceResponse, Handle); + default: + return TBaseRequestActor::StateFunc(ev); + } + } + + void OnBootstrap() override { + Become(&TCreateQueryRequestActor::StateFunc); + if (Quotas) { + SendCreateRateLimiterResourceRequest(); + } else { + SendRequestIfCan(); + } + } + + void SendCreateRateLimiterResourceRequest() { + if (auto quotaIt = Quotas->find(QUOTA_CPU_PERCENT_LIMIT); quotaIt != Quotas->end()) { + const double cloudLimit = static_cast<double>(quotaIt->second.Limit.Value * + 10); // percent -> milliseconds + CPP_LOG_T("Create rate limiter resource for cloud with limit " << cloudLimit + << "ms"); + Send(RateLimiterControlPlaneServiceId(), + new TEvRateLimiter::TEvCreateResource(CloudId, cloudLimit)); + } else { + NYql::TIssues issues; + NYql::TIssue issue = + MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, + TStringBuilder() << "CPU quota for cloud \"" << CloudId + << "\" was not found"); + issues.AddIssue(issue); + CPP_LOG_W("Failed to get cpu quota for cloud " << CloudId); + ReplyWithError(issues); + } + } + + void Handle(TEvRateLimiter::TEvCreateResourceResponse::TPtr& ev) { + CPP_LOG_D( + "Create response from rate limiter service. Success: " << ev->Get()->Success); + if (ev->Get()->Success) { + QuoterResourceCreated = true; + SendRequestIfCan(); + } else { + NYql::TIssue issue("Failed to create rate limiter resource"); + for (const NYql::TIssue& i : ev->Get()->Issues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i)); + } + NYql::TIssues issues; + issues.AddIssue(issue); + ReplyWithError(issues); + } + } + + bool CanSendRequest() const override { + return (QuoterResourceCreated || !Quotas) && TBaseRequestActor::CanSendRequest(); + } +}; + +} +} diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp index 2ee716c84c9..3244cca00bf 100644 --- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -17,6 +17,7 @@ #include <ydb/core/fq/libs/ydb/ydb.h> #include <ydb/core/fq/libs/config/yq_issue.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_proxy_request_actor.h> #include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h> #include <ydb/core/fq/libs/control_plane_proxy/actors/utils.h> #include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h> @@ -462,252 +463,7 @@ public: } }; -template<class TRequestProto, class TRequest, class TResponse, class TRequestProxy, class TResponseProxy> -class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>> { -protected: - using TBase = NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>>; - using TBase::SelfId; - using TBase::Send; - using TBase::PassAway; - using TBase::Become; - using TBase::Schedule; - - typename TRequestProxy::TPtr RequestProxy; - ::NFq::TControlPlaneProxyConfig Config; - TRequestProto RequestProto; - TString Scope; - TString FolderId; - TString User; - TString Token; - TActorId Sender; - ui32 Cookie; - TActorId ServiceId; - TRequestCounters Counters; - TInstant StartTime; - std::function<void(const TDuration&, bool /* isSuccess */, bool /* isTimeout */)> Probe; - TPermissions Permissions; - TString CloudId; - TString SubjectType; - const TMaybe<TQuotaMap> Quotas; - TTenantInfo::TPtr TenantInfo; - TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase; - ui32 RetryCount = 0; - bool ReplyWithResponseOnSuccess = true; - -public: - static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_ACTOR"; - - explicit TRequestActor(typename TRequestProxy::TPtr requestProxy, - const ::NFq::TControlPlaneProxyConfig& config, - TActorId sender, - ui32 cookie, - const TString& scope, - const TString& folderId, - TRequestProto&& requestProto, - TString&& user, - TString&& token, - const TActorId& serviceId, - const TRequestCounters& counters, - const std::function<void(const TDuration&, bool, bool)>& probe, - TPermissions permissions, - const TString& cloudId, - const TString& subjectType, - TMaybe<TQuotaMap>&& quotas = Nothing(), - TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal>&& computeDatabase = Nothing(), - bool replyWithResponseOnSuccess = true) - : RequestProxy(requestProxy) - , Config(config) - , RequestProto(std::forward<TRequestProto>(requestProto)) - , Scope(scope) - , FolderId(folderId) - , User(std::move(user)) - , Token(std::move(token)) - , Sender(sender) - , Cookie(cookie) - , ServiceId(serviceId) - , Counters(counters) - , StartTime(TInstant::Now()) - , Probe(probe) - , Permissions(permissions) - , CloudId(cloudId) - , SubjectType(subjectType) - , Quotas(std::move(quotas)) - , ComputeDatabase(std::move(computeDatabase)) - , ReplyWithResponseOnSuccess(replyWithResponseOnSuccess) - { - Counters.IncInFly(); - } - -public: - - void Bootstrap() { - CPP_LOG_T("Request actor. Actor id: " << SelfId()); - Become(&TRequestActor::StateFunc, Config.RequestTimeout, new NActors::TEvents::TEvWakeup()); - Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); - OnBootstrap(); - } - virtual void OnBootstrap() {} - - STRICT_STFUNC(StateFunc, - cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout); - hFunc(TResponse, Handle); - cFunc(TEvControlPlaneConfig::EvGetTenantInfoRequest, HandleRetry); - hFunc(TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle); - ) - - void HandleRetry() { - Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); - } - - void Handle(TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) { - TenantInfo = std::move(ev->Get()->TenantInfo); - if (TenantInfo) { - SendRequestIfCan(); - } else { - RetryCount++; - Schedule(Now() + Config.ConfigRetryPeriod * (1 << RetryCount), new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); - } - } - - void HandleTimeout() { - CPP_LOG_D("Request timeout. " << RequestProto.DebugString()); - NYql::TIssues issues; - NYql::TIssue issue = MakeErrorIssue(TIssuesIds::TIMEOUT, "Request timeout. Try repeating the request later"); - issues.AddIssue(issue); - Counters.IncTimeout(); - ReplyWithError(issues, true); - } - - void Handle(typename TResponse::TPtr& ev) { - auto& response = *ev->Get(); - ProcessResponse(response); - } - - template<typename T> - void ProcessResponse(const T& response) { - if (response.Issues) { - ReplyWithError(response.Issues); - } else { - ReplyWithSuccess(response.Result); - } - } - - template<typename T> requires requires (T t) { t.AuditDetails; } - void ProcessResponse(const T& response) { - if (response.Issues) { - ReplyWithError(response.Issues); - } else { - ReplyWithSuccess(response.Result, response.AuditDetails); - } - } - - void ReplyWithError(const NYql::TIssues& issues, bool isTimeout = false) { - const TDuration delta = TInstant::Now() - StartTime; - Counters.IncError(); - Probe(delta, false, isTimeout); - Send(Sender, new TResponseProxy(issues, SubjectType), 0, Cookie); - PassAway(); - } - - template <class... TArgs> - void ReplyWithSuccess(TArgs&&... args) { - const TDuration delta = TInstant::Now() - StartTime; - Counters.IncOk(); - Probe(delta, true, false); - if (ReplyWithResponseOnSuccess) { // constexpr - Send(Sender, new TResponseProxy(std::forward<TArgs>(args)..., SubjectType), 0, Cookie); - } else { - RequestProxy->Get()->Response = std::make_unique<TResponseProxy>(std::forward<TArgs>(args)..., SubjectType); - RequestProxy->Get()->ControlPlaneYDBOperationWasPerformed = true; - Send(RequestProxy->Forward(ControlPlaneProxyActorId())); - } - PassAway(); - } - - virtual bool CanSendRequest() const { - return bool(TenantInfo); - } - - void SendRequestIfCan() { - if (CanSendRequest()) { - Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas, TenantInfo, ComputeDatabase.GetOrElse({})), 0, Cookie); - } - } - - virtual ~TRequestActor() { - Counters.DecInFly(); - Counters.Common->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); - } -}; - -class TCreateQueryRequestActor : public TRequestActor<FederatedQuery::CreateQueryRequest, - TEvControlPlaneStorage::TEvCreateQueryRequest, - TEvControlPlaneStorage::TEvCreateQueryResponse, - TEvControlPlaneProxy::TEvCreateQueryRequest, - TEvControlPlaneProxy::TEvCreateQueryResponse> -{ - bool QuoterResourceCreated = false; - -public: - using TBaseRequestActor = TRequestActor<FederatedQuery::CreateQueryRequest, - TEvControlPlaneStorage::TEvCreateQueryRequest, - TEvControlPlaneStorage::TEvCreateQueryResponse, - TEvControlPlaneProxy::TEvCreateQueryRequest, - TEvControlPlaneProxy::TEvCreateQueryResponse>; - using TBaseRequestActor::TBaseRequestActor; - - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvRateLimiter::TEvCreateResourceResponse, Handle); - default: - return TBaseRequestActor::StateFunc(ev); - } - } - - void OnBootstrap() override { - Become(&TCreateQueryRequestActor::StateFunc); - if (Quotas) { - SendCreateRateLimiterResourceRequest(); - } else { - SendRequestIfCan(); - } - } - - void SendCreateRateLimiterResourceRequest() { - if (auto quotaIt = Quotas->find(QUOTA_CPU_PERCENT_LIMIT); quotaIt != Quotas->end()) { - const double cloudLimit = static_cast<double>(quotaIt->second.Limit.Value * 10); // percent -> milliseconds - CPP_LOG_T("Create rate limiter resource for cloud with limit " << cloudLimit << "ms"); - Send(RateLimiterControlPlaneServiceId(), new TEvRateLimiter::TEvCreateResource(CloudId, cloudLimit)); - } else { - NYql::TIssues issues; - NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, TStringBuilder() << "CPU quota for cloud \"" << CloudId << "\" was not found"); - issues.AddIssue(issue); - CPP_LOG_W("Failed to get cpu quota for cloud " << CloudId); - ReplyWithError(issues); - } - } - - void Handle(TEvRateLimiter::TEvCreateResourceResponse::TPtr& ev) { - CPP_LOG_D("Create response from rate limiter service. Success: " << ev->Get()->Success); - if (ev->Get()->Success) { - QuoterResourceCreated = true; - SendRequestIfCan(); - } else { - NYql::TIssue issue("Failed to create rate limiter resource"); - for (const NYql::TIssue& i : ev->Get()->Issues) { - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i)); - } - NYql::TIssues issues; - issues.AddIssue(issue); - ReplyWithError(issues); - } - } - - bool CanSendRequest() const override { - return (QuoterResourceCreated || !Quotas) && TBaseRequestActor::CanSendRequest(); - } -}; class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlaneProxyActor> { private: |