diff options
author | hor911 <hor911@ydb.tech> | 2022-07-12 13:34:58 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-07-12 13:34:58 +0300 |
commit | dac0ededbf65f30a1e52f01bd02e68eddf46ebb6 (patch) | |
tree | 2468c71ea8d0b6800c723208223a8c64dc8603d4 | |
parent | 8024ad9845857b76925a9179c32ab7b955e87f17 (diff) | |
download | ydb-dac0ededbf65f30a1e52f01bd02e68eddf46ebb6.tar.gz |
Use grpc::Status::details
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 16 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 19 | ||||
-rw-r--r-- | ydb/core/grpc_services/local_rpc/local_rpc.h | 10 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/schema.h | 3 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp | 4 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp | 9 | ||||
-rw-r--r-- | ydb/core/yq/libs/init/init.cpp | 4 | ||||
-rw-r--r-- | ydb/core/yq/libs/quota_manager/events/events.h | 55 | ||||
-rw-r--r-- | ydb/core/yq/libs/quota_manager/quota_manager.cpp | 82 |
10 files changed, 162 insertions, 42 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index 5bd8d3902b..daace3e5e9 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -190,13 +190,13 @@ public: WriteByteDataOk(resp); } - void ReplyError(grpc::StatusCode code, const TString& msg) override { - FinishGrpcStatus(code, msg, false); + void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details) override { + FinishGrpcStatus(code, msg, details, false); } void ReplyUnauthenticated(const TString& in) override { const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in; - FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false); + FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, "", false); } void SetNextReplyCallback(TOnNextReply&& cb) override { @@ -302,7 +302,7 @@ private: } } - void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) { + void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, const TString& details, bool urgent) { Y_VERIFY(code != grpc::OK); if (code == grpc::StatusCode::UNAUTHENTICATED) { Counters_->CountNotAuthenticated(); @@ -315,16 +315,16 @@ private: Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); StateFunc_ = &TThis::SetFinishError; TOut resp; - Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag()); + Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg, details), GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); - auto cb = [this, code, msg]() { + auto cb = [this, code, msg, details]() { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" " (pushed to grpc)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); StateFunc_ = &TThis::SetFinishError; - StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag()); + StreamWriter_->Finish(grpc::Status(code, msg, details), GetGRpcTag()); }; StreamAdaptor_->Enqueue(std::move(cb), urgent); } @@ -392,7 +392,7 @@ private: } else { //This request has not been counted SkipUpdateCountersOnError = true; - FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true); + FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", "", true); } return true; } diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index fcfce1c181..8827d028b3 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -68,7 +68,7 @@ public: virtual void ReplyUnauthenticated(const TString& in) = 0; //! Send grpc error - virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0; + virtual void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details = "") = 0; //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise virtual TInstant Deadline() const = 0; diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index e45beccd1a..6f236c993b 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -255,7 +255,7 @@ public: // Reply using YDB status code virtual void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) = 0; // Reply using "transport error code" - virtual void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "") = 0; + virtual void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") = 0; // Return address of the peer virtual TString GetPeerName() const = 0; // Return deadile of request execution, calculated from client timeout by grpc @@ -375,7 +375,6 @@ public: virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) = 0; -private: virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0; }; @@ -484,7 +483,7 @@ public: return false; } - void ReplyWithRpcStatus(grpc::StatusCode, const TString&) override { + void ReplyWithRpcStatus(grpc::StatusCode, const TString&, const TString&) override { Y_FAIL("Unimplemented"); } @@ -620,7 +619,7 @@ public: return Ctx_->GetAuthState(); } - void ReplyWithRpcStatus(grpc::StatusCode, const TString&) override { + void ReplyWithRpcStatus(grpc::StatusCode, const TString&, const TString&) override { Y_FAIL("Unimplemented"); } @@ -637,14 +636,14 @@ public: Ctx_->Attach(TActorId()); TResponse resp; FillYdbStatus(resp, IssueManager_.GetIssues(), Ydb::StatusIds::UNAVAILABLE); - Ctx_->WriteAndFinish(std::move(resp), grpc::Status()); + Ctx_->WriteAndFinish(std::move(resp), grpc::Status::OK); } void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { Ctx_->Attach(TActorId()); TResponse resp; FillYdbStatus(resp, IssueManager_.GetIssues(), status); - Ctx_->WriteAndFinish(std::move(resp), grpc::Status()); + Ctx_->WriteAndFinish(std::move(resp), grpc::Status::OK); } void RaiseIssue(const NYql::TIssue& issue) override { @@ -887,8 +886,8 @@ public: return Ctx_->GetAuthState(); } - void ReplyWithRpcStatus(grpc::StatusCode code, const TString& reason) override { - Ctx_->ReplyError(code, reason); + void ReplyWithRpcStatus(grpc::StatusCode code, const TString& reason, const TString& details) override { + Ctx_->ReplyError(code, reason, details); } void ReplyUnauthenticated(const TString& in) override { @@ -1074,6 +1073,10 @@ public: Y_FAIL("unimplemented"); } + void ReplyGrpcError(grpc::StatusCode code, const TString& msg, const TString& details = "") { + Ctx_->ReplyError(code, msg, details); + } + private: void Reply(NProtoBuf::Message *resp, ui32 status) override { if (RespHook) { diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index a7c3d378fa..b39996b84f 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -144,13 +144,11 @@ public: void SetRuHeader(ui64) override {} // Unimplemented methods - void ReplyWithRpcStatus(grpc::StatusCode, const TString& msg = "") override { - Y_UNUSED(msg); + void ReplyWithRpcStatus(grpc::StatusCode, const TString&, const TString&) override { Y_FAIL("Unimplemented for local rpc"); } - void SetStreamingNotify(NGrpc::IRequestContextBase::TOnNextReply&& cb) override { - Y_UNUSED(cb); + void SetStreamingNotify(NGrpc::IRequestContextBase::TOnNextReply&&) override { Y_FAIL("Unimplemented for local rpc"); } @@ -158,9 +156,7 @@ public: Y_FAIL("Unimplemented for local rpc"); } - virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override { - Y_UNUSED(in); - Y_UNUSED(status); + virtual void SendSerializedResult(TString&&, Ydb::StatusIds::StatusCode) override { Y_FAIL("Unimplemented for local rpc"); } diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h index 344be98e12..64dfa936f7 100644 --- a/ydb/core/yq/libs/control_plane_storage/schema.h +++ b/ydb/core/yq/libs/control_plane_storage/schema.h @@ -74,6 +74,7 @@ namespace NYq { #define SUBJECT_TYPE_COLUMN_NAME "subject_type" #define SUBJECT_ID_COLUMN_NAME "subject_id" #define METRIC_NAME_COLUMN_NAME "metric_name" -#define METRIC_VALUE_COLUMN_NAME "metric_value" +#define METRIC_LIMIT_COLUMN_NAME "metric_limit" +#define METRIC_USAGE_COLUMN_NAME "metric_usage" } // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 4c8dc0339d..94c6280621 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -46,6 +46,7 @@ void TYdbControlPlaneStorageActor::Bootstrap() { CreateResultSetsTable(); CreateJobsTable(); CreateNodesTable(); + // CreateQuotasTable(); // not yet Become(&TThis::StateFunc); } @@ -239,7 +240,8 @@ void TYdbControlPlaneStorageActor::CreateQuotasTable() .AddNullableColumn(SUBJECT_TYPE_COLUMN_NAME, EPrimitiveType::String) .AddNullableColumn(SUBJECT_ID_COLUMN_NAME, EPrimitiveType::String) .AddNullableColumn(METRIC_NAME_COLUMN_NAME, EPrimitiveType::String) - .AddNullableColumn(METRIC_VALUE_COLUMN_NAME, EPrimitiveType::Int64) + .AddNullableColumn(METRIC_LIMIT_COLUMN_NAME, EPrimitiveType::Int64) + .AddNullableColumn(METRIC_USAGE_COLUMN_NAME, EPrimitiveType::Int64) .SetPrimaryKeyColumns({SUBJECT_TYPE_COLUMN_NAME, SUBJECT_ID_COLUMN_NAME, METRIC_NAME_COLUMN_NAME}) .Build(); diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp index 1dcfd05456..630cc72aa5 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp @@ -21,15 +21,10 @@ namespace NYq { -struct TCloudAggregator { - TAtomic Started = 0; - TAtomic Finished = 0; -}; - using TQuotaCountExecuter = TDbExecuter<THashMap<TString, ui32>>; -void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev) -{ +void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev) { + if (ev->Get()->SubjectType != SUBJECT_TYPE_CLOUD || ev->Get()->MetricName != QUOTA_COUNT_LIMIT) { Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(ev->Get()->SubjectType, ev->Get()->SubjectId, ev->Get()->MetricName, 0)); } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 58827f0e6c..c8c1c30426 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -281,8 +281,8 @@ void Init( /* yqSharedResources, */ serviceCounters.Counters, { - TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_RESULT_LIMIT, 20_MB), - TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_COUNT_LIMIT, 100, NYq::ControlPlaneStorageServiceActorId()) + TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_RESULT_LIMIT, 20_MB, 2_GB), + TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_COUNT_LIMIT, 100, 200, NYq::ControlPlaneStorageServiceActorId()) }); actorRegistrator(NYq::MakeQuotaServiceActorId(), quotaService); } diff --git a/ydb/core/yq/libs/quota_manager/events/events.h b/ydb/core/yq/libs/quota_manager/events/events.h index 9187b1ba15..1a0acc2088 100644 --- a/ydb/core/yq/libs/quota_manager/events/events.h +++ b/ydb/core/yq/libs/quota_manager/events/events.h @@ -18,9 +18,11 @@ constexpr auto QUOTA_COUNT_LIMIT = "fq.queryLimit.count"; struct TQuotaInfo { ui64 DefaultLimit; + ui64 HardLimit; NActors::TActorId UsageUpdater; - TQuotaInfo(ui64 defaultLimit, NActors::TActorId usageUpdater = {}) + TQuotaInfo(ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId usageUpdater = {}) : DefaultLimit(defaultLimit) + , HardLimit(hardLimit) , UsageUpdater(usageUpdater) { } @@ -30,10 +32,10 @@ struct TQuotaDescription { TString SubjectType; TString MetricName; TQuotaInfo Info; - TQuotaDescription(const TString& subjectType, const TString& metricName, ui64 defaultLimit, NActors::TActorId usageUpdater = {}) + TQuotaDescription(const TString& subjectType, const TString& metricName, ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId usageUpdater = {}) : SubjectType(subjectType) , MetricName(metricName) - , Info(defaultLimit, usageUpdater) + , Info(defaultLimit, hardLimit, usageUpdater) { } }; @@ -42,6 +44,8 @@ struct TQuotaUsage { ui64 Limit; TMaybe<ui64> Usage; TInstant UpdatedAt; + TQuotaUsage() = default; + TQuotaUsage(const TQuotaUsage&) = default; TQuotaUsage(ui64 limit) : Limit(limit), UpdatedAt(TInstant::Zero()) {} TQuotaUsage(ui64 limit, ui64 usage, const TInstant& updatedAt = Now()) : Limit(limit), Usage(usage), UpdatedAt(updatedAt) {} @@ -57,6 +61,10 @@ struct TEvQuotaService { EvQuotaChangeNotification, EvQuotaUsageRequest, EvQuotaUsageResponse, + EvQuotaSetRequest, + EvQuotaSetResponse, + EvQuotaLimitChangeRequest, + EvQuotaLimitChangeResponse, EvEnd, }; @@ -73,6 +81,8 @@ struct TEvQuotaService { // Quota request never fails, if no quota exist (i.e. SubjectType is incorrect) empty list will be returned struct TQuotaGetResponse : public NActors::TEventLocal<TQuotaGetResponse, EvQuotaGetResponse> { + TString SubjectType; + TString SubjectId; TQuotaMap Quotas; }; @@ -103,6 +113,45 @@ struct TEvQuotaService { : SubjectType(subjectType), SubjectId(subjectId), MetricName(metricName), Usage(usage) {} }; + + struct TQuotaSetRequest : public NActors::TEventLocal<TQuotaSetRequest, EvQuotaSetRequest> { + TString SubjectType; + TString SubjectId; + THashMap<TString, ui64> Limits; + TQuotaSetRequest(const TString& subjectType, const TString& subjectId) + : SubjectType(subjectType), SubjectId(subjectId) + {} + }; + + struct TQuotaSetResponse : public NActors::TEventLocal<TQuotaSetResponse, EvQuotaSetResponse> { + TString SubjectType; + TString SubjectId; + THashMap<TString, ui64> Limits; + TQuotaSetResponse(const TString& subjectType, const TString& subjectId) + : SubjectType(subjectType), SubjectId(subjectId) + {} + }; + + struct TQuotaLimitChangeRequest : public NActors::TEventLocal<TQuotaLimitChangeRequest, EvQuotaLimitChangeRequest> { + TString SubjectType; + TString SubjectId; + TQuotaUsage Quota; + ui64 LimitRequested; + TQuotaLimitChangeRequest(const TString& subjectType, const TString& subjectId) + : SubjectType(subjectType), SubjectId(subjectId) + {} + }; + + struct TQuotaLimitChangeResponse : public NActors::TEventLocal<TQuotaLimitChangeResponse, EvQuotaLimitChangeResponse> { + TString SubjectType; + TString SubjectId; + TQuotaUsage Quota; + ui64 LimitRequested; + TQuotaLimitChangeResponse(const TString& subjectType, const TString& subjectId) + : SubjectType(subjectType), SubjectId(subjectId) + {} + }; + }; } /* NYq */ diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp index 64f190ee86..94eb9529a8 100644 --- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp +++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp @@ -72,16 +72,27 @@ private: hFunc(TEvQuotaService::TQuotaGetRequest, Handle) hFunc(TEvQuotaService::TQuotaChangeNotification, Handle) hFunc(TEvQuotaService::TQuotaUsageResponse, Handle) + hFunc(TEvQuotaService::TQuotaSetRequest, Handle) ); void Handle(TEvQuotaService::TQuotaGetRequest::TPtr& ev) { auto subjectType = ev->Get()->SubjectType; auto subjectId = ev->Get()->SubjectId; auto& subjectMap = QuotaCacheMap[subjectType]; - auto it = subjectMap.find(subjectId); + auto& infoMap = QuotaInfoMap[subjectType]; + if (subjectId.empty()) { // Just get defaults + auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>(); + response->SubjectType = subjectType; + for (auto& it : infoMap) { + response->Quotas.emplace(it.first, TQuotaUsage(it.second.DefaultLimit)); + } + Send(ev->Sender, response.Release()); + return; + } + + auto it = subjectMap.find(subjectId); bool pended = false; - auto& infoMap = QuotaInfoMap[subjectType]; // Load into cache, if needed if (it == subjectMap.end()) { @@ -94,13 +105,13 @@ private: } } } - // 2. Load from DB (TBD) - // 3. Append defaults + // 2. Append defaults for (auto& it : infoMap) { if (cache.UsageMap.find(it.first) == cache.UsageMap.end()) { cache.UsageMap.emplace(it.first, TQuotaCachedUsage(it.second.DefaultLimit)); } } + // 3. Load from DB subjectMap.emplace(subjectId, cache); } @@ -131,6 +142,8 @@ private: if (!pended) { auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>(); + response->SubjectType = subjectType; + response->SubjectId = subjectId; for (auto it : cache.UsageMap) { response->Quotas.emplace(it.first, it.second.Usage); } @@ -176,6 +189,8 @@ private: if (cache.PendingUsage.size() == 0) { for (auto& itR : cache.PendingRequests) { auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>(); + response->SubjectType = subjectType; + response->SubjectId = subjectId; for (auto it : cache.UsageMap) { response->Quotas.emplace(it.first, it.second.Usage); } @@ -185,6 +200,65 @@ private: } } + void Handle(TEvQuotaService::TQuotaSetRequest::TPtr& ev) { + auto subjectType = ev->Get()->SubjectType; + auto subjectId = ev->Get()->SubjectId; + auto& subjectMap = QuotaCacheMap[subjectType]; + auto& infoMap = QuotaInfoMap[subjectType]; + + auto it = subjectMap.find(subjectId); + + // Load into cache, if needed + if (it == subjectMap.end()) { + TQuotaCache cache; + // 1. Load from Config + for (const auto& quota : Config.GetQuotas()) { + if (quota.GetSubjectType() == subjectType && quota.GetSubjectId() == subjectId) { + for (const auto& limit : quota.GetLimit()) { + cache.UsageMap.emplace(limit.GetName(), TQuotaCachedUsage(limit.GetLimit())); + } + } + } + // 2. Load from DB (TBD) + // 3. Append defaults + for (auto& it : infoMap) { + if (cache.UsageMap.find(it.first) == cache.UsageMap.end()) { + cache.UsageMap.emplace(it.first, TQuotaCachedUsage(it.second.DefaultLimit)); + } + } + bool _; + std::tie(it, _) = subjectMap.emplace(subjectId, cache); + } + + auto& cache = it->second; + + for (auto metricLimit : ev->Get()->Limits) { + auto& name = metricLimit.first; + auto it = cache.UsageMap.find(name); + if (it != cache.UsageMap.end()) { + auto& cached = it->second; + auto limit = metricLimit.second; + if (cached.Usage.Limit == 0 || limit == 0 || limit > cached.Usage.Limit) { + // check hard limit only if quota is increased + auto itI = infoMap.find(name); + if (itI != infoMap.end()) { + auto& info = itI->second; + if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) { + limit = info.HardLimit; + } + } + } + cached.Usage.Limit = limit; + } + } + + auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId); + for (auto it : cache.UsageMap) { + response->Limits.emplace(it.first, it.second.Usage.Limit); + } + Send(ev->Sender, response.Release()); + } + NConfig::TQuotasManagerConfig Config; const ::NMonitoring::TDynamicCounterPtr ServiceCounters; THashMap<TString /* SubjectType */, THashMap<TString /* MetricName */, TQuotaInfo>> QuotaInfoMap; |