aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-05-05 12:42:38 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-05-05 12:42:38 +0300
commit49d112cffeae9c2dbfcadaeb7083dcd238ec646b (patch)
treef6b1d053bb293961c291a87777c669b28fba0a48
parent304f49aa588960c10859c290d183a4a90c10889f (diff)
downloadydb-49d112cffeae9c2dbfcadaeb7083dcd238ec646b.tar.gz
Quota for result set size (fq.queryResultLimit.bytes)
ref:a35d645f518bb794fc73d0cadbfa9841ec196970
-rw-r--r--CMakeLists.darwin.txt2
-rw-r--r--CMakeLists.linux.txt2
-rw-r--r--ydb/core/protos/services.proto1
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp3
-rw-r--r--ydb/core/yq/libs/actors/proxy.h4
-rw-r--r--ydb/core/yq/libs/actors/result_writer.cpp21
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp2
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.cpp4
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.h4
-rw-r--r--ydb/core/yq/libs/actors/task_get.cpp1
-rw-r--r--ydb/core/yq/libs/config/protos/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/config/protos/quotas_manager.proto23
-rw-r--r--ydb/core/yq/libs/config/protos/yq_config.proto2
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp113
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h2
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/events/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/events/events.h6
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/events/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/events/events.h6
-rw-r--r--ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp3
-rw-r--r--ydb/core/yq/libs/events/event_subspace.h1
-rw-r--r--ydb/core/yq/libs/init/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/init/init.cpp12
-rw-r--r--ydb/core/yq/libs/protos/yq_private.proto1
-rw-r--r--ydb/core/yq/libs/quota_manager/CMakeLists.txt28
-rw-r--r--ydb/core/yq/libs/quota_manager/events/CMakeLists.txt18
-rw-r--r--ydb/core/yq/libs/quota_manager/events/events.cpp1
-rw-r--r--ydb/core/yq/libs/quota_manager/events/events.h65
-rw-r--r--ydb/core/yq/libs/quota_manager/quota_manager.cpp78
-rw-r--r--ydb/core/yq/libs/quota_manager/quota_manager.h20
-rw-r--r--ydb/core/yq/libs/test_connection/events/events.h5
-rw-r--r--ydb/library/folder_service/mock/mock_folder_service.cpp10
34 files changed, 395 insertions, 50 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 91b10039eb..c858f404b6 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -661,6 +661,7 @@ add_subdirectory(ydb/library/yql/providers/dq/worker_manager/interface)
add_subdirectory(ydb/core/yq/libs/common)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/events)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/proto)
+add_subdirectory(ydb/core/yq/libs/quota_manager/events)
add_subdirectory(ydb/core/yq/libs/control_plane_storage)
add_subdirectory(library/cpp/protobuf/interop)
add_subdirectory(ydb/core/yq/libs/config)
@@ -806,6 +807,7 @@ add_subdirectory(ydb/core/yq/libs/read_rule)
add_subdirectory(ydb/core/yq/libs/tasks_packer)
add_subdirectory(ydb/core/yq/libs/health)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)
+add_subdirectory(ydb/core/yq/libs/quota_manager)
add_subdirectory(ydb/core/yq/libs/test_connection)
add_subdirectory(ydb/core/yq/libs/test_connection/events)
add_subdirectory(ydb/library/yql/providers/solomon/async_io)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index da3fa70606..0a59dbb7c3 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -741,6 +741,7 @@ add_subdirectory(ydb/library/yql/providers/dq/worker_manager/interface)
add_subdirectory(ydb/core/yq/libs/common)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/events)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/proto)
+add_subdirectory(ydb/core/yq/libs/quota_manager/events)
add_subdirectory(ydb/core/yq/libs/control_plane_storage)
add_subdirectory(library/cpp/protobuf/interop)
add_subdirectory(ydb/core/yq/libs/config)
@@ -886,6 +887,7 @@ add_subdirectory(ydb/core/yq/libs/read_rule)
add_subdirectory(ydb/core/yq/libs/tasks_packer)
add_subdirectory(ydb/core/yq/libs/health)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)
+add_subdirectory(ydb/core/yq/libs/quota_manager)
add_subdirectory(ydb/core/yq/libs/test_connection)
add_subdirectory(ydb/core/yq/libs/test_connection/events)
add_subdirectory(ydb/library/yql/providers/solomon/async_io)
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 6c8ecdf1b1..c25a11826c 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -302,6 +302,7 @@ enum EServiceKikimr {
YQ_AUDIT_EVENT_SENDER = 1151;
YQ_HEALTH = 1152;
FQ_INTERNAL_SERVICE = 1153;
+ FQ_QUOTA_SERVICE = 1154;
// 1024 - 1099 is reserved for nbs
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index 2c86c939e7..aec8826f62 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -329,7 +329,8 @@ private:
NProtoInterop::CastFromProto(task.deadline()),
ClientCounters,
createdAt,
- TenantName);
+ TenantName,
+ task.result_limit());
auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params)));
diff --git a/ydb/core/yq/libs/actors/proxy.h b/ydb/core/yq/libs/actors/proxy.h
index d489740395..f8d176c475 100644
--- a/ydb/core/yq/libs/actors/proxy.h
+++ b/ydb/core/yq/libs/actors/proxy.h
@@ -72,8 +72,8 @@ NActors::IActor* CreateResultWriter(
const TResultId& resultId,
const TVector<TString>& columns,
const TString& traceId,
- const TInstant& deadline
- );
+ const TInstant& deadline,
+ ui64 resultBytesLimit);
NActors::IActor* CreatePingerActor(
const TString& tenantName,
diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp
index 0888d112b6..62f47f4c86 100644
--- a/ydb/core/yq/libs/actors/result_writer.cpp
+++ b/ydb/core/yq/libs/actors/result_writer.cpp
@@ -42,14 +42,20 @@ public:
const TResultId& resultId,
const TVector<TString>& columns,
const TString& traceId,
- const TInstant& deadline)
+ const TInstant& deadline,
+ ui64 resultBytesLimit)
: ExecuterId(executerId)
, ResultBuilder(MakeHolder<TProtoBuilder>(resultType, columns))
, ResultId({resultId})
, TraceId(traceId)
, Deadline(deadline)
+ , ResultBytesLimit(resultBytesLimit)
, InternalServiceId(MakeInternalServiceActorId())
- { }
+ {
+ if (!ResultBytesLimit) {
+ ResultBytesLimit = 20_MB;
+ }
+ }
static constexpr char ActorName[] = "YQ_RESULT_WRITER";
@@ -214,9 +220,9 @@ private:
FreeSpace -= data.GetRaw().size();
OccupiedSpace += data.GetRaw().size();
- if (OccupiedSpace > SpaceLimitPerQuery) {
+ if (OccupiedSpace > ResultBytesLimit) {
TIssues issues;
- issues.AddIssue(TStringBuilder() << "Can not write results with size > " << SpaceLimitPerQuery / (1024 * 1024) << "_MB");
+ issues.AddIssue(TStringBuilder() << "Can not write results with size > " << ResultBytesLimit << " byte(s)");
SendIssuesAndSetErrorFlag(issues);
return;
}
@@ -315,7 +321,7 @@ private:
bool HasError = false;
bool Finished = false;
NYql::TIssues Issues;
- ui64 SpaceLimitPerQuery = 20_MB;
+ ui64 ResultBytesLimit;
ui64 OccupiedSpace = 0;
TVector<Yq::Private::WriteTaskResultRequest> ResultChunks;
@@ -330,9 +336,10 @@ NActors::IActor* CreateResultWriter(
const TResultId& resultId,
const TVector<TString>& columns,
const TString& traceId,
- const TInstant& deadline)
+ const TInstant& deadline,
+ ui64 resultBytesLimit)
{
- return new TResultWriter(executerId, resultType, resultId, columns, traceId, deadline);
+ return new TResultWriter(executerId, resultType, resultId, columns, traceId, deadline, resultBytesLimit);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 2619f1e223..29f287b8f6 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -826,7 +826,7 @@ private:
resultId = NActors::TActivationContext::Register(
CreateResultWriter(
ExecuterId, dqGraphParams.GetResultType(),
- writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline));
+ writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ResultBytesLimit));
} else {
LOG_D("ResultWriter was NOT CREATED since ResultType is empty");
resultId = ExecuterId;
diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp
index 12bf28a6ca..c00466ac4c 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.cpp
+++ b/ydb/core/yq/libs/actors/run_actor_params.cpp
@@ -47,7 +47,8 @@ TRunActorParams::TRunActorParams(
const TInstant& deadline,
const NMonitoring::TDynamicCounterPtr& clientCounters,
TInstant createdAt,
- const TString& tenantName
+ const TString& tenantName,
+ uint64_t resultBytesLimit
)
: YqSharedResources(yqSharedResources)
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -92,6 +93,7 @@ TRunActorParams::TRunActorParams(
, ClientCounters(clientCounters)
, CreatedAt(createdAt)
, TenantName(tenantName)
+ , ResultBytesLimit(resultBytesLimit)
{
}
diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h
index b04006f138..fdc63ce8df 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.h
+++ b/ydb/core/yq/libs/actors/run_actor_params.h
@@ -62,7 +62,8 @@ struct TRunActorParams { // TODO2 : Change name
const TInstant& deadline,
const NMonitoring::TDynamicCounterPtr& clientCounters,
TInstant createdAt,
- const TString& tenantName
+ const TString& tenantName,
+ uint64_t resultBytesLimit
);
TRunActorParams(const TRunActorParams& params) = default;
@@ -114,6 +115,7 @@ struct TRunActorParams { // TODO2 : Change name
const NMonitoring::TDynamicCounterPtr ClientCounters;
const TInstant CreatedAt;
const TString TenantName;
+ uint64_t ResultBytesLimit;
};
} /* NYq */
diff --git a/ydb/core/yq/libs/actors/task_get.cpp b/ydb/core/yq/libs/actors/task_get.cpp
index eb54ac7fa7..6cc8bb922a 100644
--- a/ydb/core/yq/libs/actors/task_get.cpp
+++ b/ydb/core/yq/libs/actors/task_get.cpp
@@ -166,6 +166,7 @@ private:
newTask->set_query_name(task.Query.content().name());
*newTask->mutable_deadline() = NProtoInterop::CastToProto(task.Deadline);
newTask->mutable_disposition()->CopyFrom(task.Internal.disposition());
+ newTask->set_result_limit(task.Internal.result_limit());
THashMap<TString, TString> accountIdSignatures;
for (const auto& connection: task.Internal.connection()) {
diff --git a/ydb/core/yq/libs/config/protos/CMakeLists.txt b/ydb/core/yq/libs/config/protos/CMakeLists.txt
index 136150ad26..ce03e7b21f 100644
--- a/ydb/core/yq/libs/config/protos/CMakeLists.txt
+++ b/ydb/core/yq/libs/config/protos/CMakeLists.txt
@@ -32,6 +32,7 @@ target_proto_messages(libs-config-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/pinger.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/private_api.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/private_proxy.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/quotas_manager.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/read_actors_factory.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/resource_manager.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/storage.proto
diff --git a/ydb/core/yq/libs/config/protos/quotas_manager.proto b/ydb/core/yq/libs/config/protos/quotas_manager.proto
new file mode 100644
index 0000000000..b1e36cf5fc
--- /dev/null
+++ b/ydb/core/yq/libs/config/protos/quotas_manager.proto
@@ -0,0 +1,23 @@
+syntax = "proto3";
+option cc_enable_arenas = true;
+
+package NYq.NConfig;
+option java_package = "ru.yandex.kikimr.proto";
+
+////////////////////////////////////////////////////////////
+
+message TQuotaLimit {
+ string Name = 1;
+ uint64 Limit = 2;
+}
+
+message TQuotaList {
+ string SubjectType = 1;
+ string SubjectId = 2;
+ repeated TQuotaLimit Limit = 3;
+}
+
+message TQuotasManagerConfig {
+ bool Enabled = 1;
+ repeated TQuotaList Quotas = 2;
+}
diff --git a/ydb/core/yq/libs/config/protos/yq_config.proto b/ydb/core/yq/libs/config/protos/yq_config.proto
index 8dd6dedc53..74344d2ccf 100644
--- a/ydb/core/yq/libs/config/protos/yq_config.proto
+++ b/ydb/core/yq/libs/config/protos/yq_config.proto
@@ -17,6 +17,7 @@ import "ydb/core/yq/libs/config/protos/pending_fetcher.proto";
import "ydb/core/yq/libs/config/protos/pinger.proto";
import "ydb/core/yq/libs/config/protos/private_api.proto";
import "ydb/core/yq/libs/config/protos/private_proxy.proto";
+import "ydb/core/yq/libs/config/protos/quotas_manager.proto";
import "ydb/core/yq/libs/config/protos/read_actors_factory.proto";
import "ydb/core/yq/libs/config/protos/resource_manager.proto";
import "ydb/core/yq/libs/config/protos/test_connection.proto";
@@ -46,4 +47,5 @@ message TConfig {
TTestConnectionConfig TestConnection = 18;
TReadActorsFactoryConfig ReadActorsFactoryConfig = 19;
THealthConfig Health = 20;
+ TQuotasManagerConfig QuotasManager = 21;
}
diff --git a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
index 74b21e74e3..03dfec68fc 100644
--- a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
#include <ydb/core/yq/libs/control_plane_storage/util.h>
+#include <ydb/core/yq/libs/quota_manager/quota_manager.h>
#include <ydb/core/yq/libs/test_connection/test_connection.h>
#include <ydb/core/yq/libs/test_connection/events/events.h>
#include <ydb/core/yq/libs/ydb/util.h>
@@ -79,12 +80,55 @@ private:
using TRequestCountersPtr = TIntrusivePtr<TRequestCounters>;
template<class TEventRequest, class TResponseProxy>
+class TGetQuotaActor : public NActors::TActorBootstrapped<TGetQuotaActor<TEventRequest, TResponseProxy>> {
+ using TBase = NActors::TActorBootstrapped<TGetQuotaActor<TEventRequest, TResponseProxy>>;
+ using TBase::SelfId;
+ using TBase::Send;
+ using TBase::PassAway;
+ using TBase::Become;
+
+ TActorId Sender;
+ TEventRequest Event;
+ ui32 Cookie;
+
+public:
+ TGetQuotaActor(TActorId sender, TEventRequest event, ui32 cookie)
+ : Sender(sender)
+ , Event(event)
+ , Cookie(cookie)
+ {}
+
+ static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_GET_QUOTA";
+
+ void Bootstrap() {
+ CPP_LOG_T("Get quotas bootstrap. Cloud id: " << Event->Get()->CloudId << " Actor id: " << SelfId());
+ Become(&TGetQuotaActor::StateFunc);
+ auto request = std::make_unique<TEvQuotaService::TQuotaGetRequest>();
+ request->SubjectType = SUBJECT_TYPE_CLOUD;
+ request->SubjectId = Event->Get()->CloudId;
+ Send(MakeQuotaServiceActorId(), request.release(), 0, 0);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvQuotaService::TQuotaGetResponse, Handle);
+ )
+
+ void Handle(TEvQuotaService::TQuotaGetResponse::TPtr& ev) {
+ Event->Get()->Quotas = ev->Get()->Quotas;
+ CPP_LOG_T("Cloud id: " << Event->Get()->CloudId << " Quota count: " << Event->Get()->Quotas.size());
+ TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId()));
+ PassAway();
+ }
+};
+
+template<class TEventRequest, class TResponseProxy>
class TResolveFolderActor : public NActors::TActorBootstrapped<TResolveFolderActor<TEventRequest, TResponseProxy>> {
using TBase = NActors::TActorBootstrapped<TResolveFolderActor<TEventRequest, TResponseProxy>>;
using TBase::SelfId;
using TBase::Send;
using TBase::PassAway;
using TBase::Become;
+ using TBase::Register;
NConfig::TControlPlaneProxyConfig Config;
TActorId Sender;
@@ -95,6 +139,7 @@ class TResolveFolderActor : public NActors::TActorBootstrapped<TResolveFolderAct
TEventRequest Event;
ui32 Cookie;
TInstant StartTime;
+ bool GetQuotas;
public:
TResolveFolderActor(const TRequestCountersPtr& counters,
@@ -102,7 +147,7 @@ public:
const TString& folderId, const TString& token,
const std::function<void(const TDuration&, bool, bool)>& probe,
TEventRequest event,
- ui32 cookie)
+ ui32 cookie, bool getQuotas)
: Config(config)
, Sender(sender)
, Counters(counters)
@@ -112,6 +157,7 @@ public:
, Event(event)
, Cookie(cookie)
, StartTime(TInstant::Now())
+ , GetQuotas(getQuotas)
{}
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_RESOLVE_FOLDER";
@@ -169,7 +215,12 @@ public:
TString cloudId = response.folder().cloud_id();
Event->Get()->CloudId = cloudId;
CPP_LOG_T("Cloud id: " << cloudId << " Folder id: " << FolderId);
- TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId()));
+
+ if (GetQuotas) {
+ Register(new TGetQuotaActor<TEventRequest, TResponseProxy>(Sender, Event, Cookie));
+ } else {
+ TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId()));
+ }
PassAway();
}
};
@@ -196,6 +247,7 @@ class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestP
std::function<void(const TDuration&, bool, bool)> Probe;
TPermissions Permissions;
TString CloudId;
+ TQuotaMap Quotas;
public:
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_ACTOR";
@@ -207,7 +259,7 @@ public:
const TRequestCountersPtr& counters,
const std::function<void(const TDuration&, bool, bool)>& probe,
TPermissions permissions,
- const TString& cloudId)
+ const TString& cloudId, const TQuotaMap& quotas = {})
: Config(config)
, RequestProto(std::forward<TRequestProto>(requestProto))
, Scope(scope)
@@ -222,6 +274,7 @@ public:
, Probe(probe)
, Permissions(permissions)
, CloudId(cloudId)
+ , Quotas(quotas)
{
Counters->InFly->Inc();
FillDefaultParameters(Config);
@@ -232,7 +285,7 @@ public:
void Bootstrap() {
CPP_LOG_T("Request actor. Actor id: " << SelfId());
Become(&TRequestActor::StateFunc, GetDuration(Config.GetRequestTimeout(), TDuration::Seconds(30)), new NActors::TEvents::TEvWakeup());
- Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions), 0, Cookie);
+ Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas), 0, Cookie);
}
STRICT_STFUNC(StateFunc,
@@ -417,11 +470,13 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane
TCounters Counters;
NConfig::TControlPlaneProxyConfig Config;
+ bool GetQuotas;
public:
- TControlPlaneProxyActor(const NConfig::TControlPlaneProxyConfig& config, const NMonitoring::TDynamicCounterPtr& counters)
+ TControlPlaneProxyActor(const NConfig::TControlPlaneProxyConfig& config, const NMonitoring::TDynamicCounterPtr& counters, bool getQuotas)
: Counters(counters)
, Config(config)
+ , GetQuotas(getQuotas)
{
}
@@ -531,7 +586,7 @@ private:
TEvControlPlaneProxy::TEvCreateQueryResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -562,7 +617,7 @@ private:
std::move(request), std::move(user), std::move(token),
ControlPlaneStorageServiceActorId(),
requestCounters,
- probe, ExtractPermissions(ev, availablePermissions), cloudId));
+ probe, ExtractPermissions(ev, availablePermissions), cloudId, ev->Get()->Quotas));
}
void Handle(TEvControlPlaneProxy::TEvListQueriesRequest::TPtr& ev) {
@@ -587,7 +642,7 @@ private:
TEvControlPlaneProxy::TEvListQueriesResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -643,7 +698,7 @@ private:
TEvControlPlaneProxy::TEvDescribeQueryResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -700,7 +755,7 @@ private:
TEvControlPlaneProxy::TEvGetQueryStatusResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -756,7 +811,7 @@ private:
TEvControlPlaneProxy::TEvModifyQueryResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -815,7 +870,7 @@ private:
TEvControlPlaneProxy::TEvDeleteQueryResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -871,7 +926,7 @@ private:
TEvControlPlaneProxy::TEvControlQueryResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -930,7 +985,7 @@ private:
TEvControlPlaneProxy::TEvGetResultDataResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -986,7 +1041,7 @@ private:
TEvControlPlaneProxy::TEvListJobsResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1042,7 +1097,7 @@ private:
TEvControlPlaneProxy::TEvDescribeJobResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1097,7 +1152,7 @@ private:
TEvControlPlaneProxy::TEvCreateConnectionResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1155,7 +1210,7 @@ private:
TEvControlPlaneProxy::TEvListConnectionsResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1211,7 +1266,7 @@ private:
TEvControlPlaneProxy::TEvDescribeConnectionResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1267,7 +1322,7 @@ private:
TEvControlPlaneProxy::TEvModifyConnectionResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1328,7 +1383,7 @@ private:
TEvControlPlaneProxy::TEvDeleteConnectionResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1383,7 +1438,7 @@ private:
TEvControlPlaneProxy::TEvTestConnectionResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1437,7 +1492,7 @@ private:
TEvControlPlaneProxy::TEvCreateBindingResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1490,7 +1545,7 @@ private:
TEvControlPlaneProxy::TEvListBindingsResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1546,7 +1601,7 @@ private:
TEvControlPlaneProxy::TEvDescribeBindingResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1602,7 +1657,7 @@ private:
TEvControlPlaneProxy::TEvModifyBindingResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1658,7 +1713,7 @@ private:
TEvControlPlaneProxy::TEvDeleteBindingResponse>
(Counters.GetCommonCounters(RTC_RESOLVE_FOLDER), sender,
Config, folderId, token,
- probe, ev, cookie));
+ probe, ev, cookie, GetQuotas));
return;
}
@@ -1711,8 +1766,8 @@ TActorId ControlPlaneProxyActorId() {
return NActors::TActorId(0, name);
}
-IActor* CreateControlPlaneProxyActor(const NConfig::TControlPlaneProxyConfig& config, const NMonitoring::TDynamicCounterPtr& counters) {
- return new TControlPlaneProxyActor(config, counters);
+IActor* CreateControlPlaneProxyActor(const NConfig::TControlPlaneProxyConfig& config, const NMonitoring::TDynamicCounterPtr& counters, bool getQuotas) {
+ return new TControlPlaneProxyActor(config, counters, getQuotas);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h
index 12e495ba1c..dfd831adbb 100644
--- a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h
+++ b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h
@@ -22,6 +22,6 @@ namespace NYq {
NActors::TActorId ControlPlaneProxyActorId();
-NActors::IActor* CreateControlPlaneProxyActor(const NConfig::TControlPlaneProxyConfig& config, const NMonitoring::TDynamicCounterPtr& counters);
+NActors::IActor* CreateControlPlaneProxyActor(const NConfig::TControlPlaneProxyConfig& config, const NMonitoring::TDynamicCounterPtr& counters, bool getQuotas);
} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_proxy/events/CMakeLists.txt b/ydb/core/yq/libs/control_plane_proxy/events/CMakeLists.txt
index 5c0bbc7b2c..73799d8c5b 100644
--- a/ydb/core/yq/libs/control_plane_proxy/events/CMakeLists.txt
+++ b/ydb/core/yq/libs/control_plane_proxy/events/CMakeLists.txt
@@ -14,6 +14,7 @@ target_link_libraries(libs-control_plane_proxy-events PUBLIC
cpp-actors-core
cpp-actors-interconnect
libs-control_plane_storage-events
+ libs-quota_manager-events
)
target_sources(libs-control_plane_proxy-events PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_proxy/events/events.cpp
diff --git a/ydb/core/yq/libs/control_plane_proxy/events/events.h b/ydb/core/yq/libs/control_plane_proxy/events/events.h
index 6398bc3714..bf0d956865 100644
--- a/ydb/core/yq/libs/control_plane_proxy/events/events.h
+++ b/ydb/core/yq/libs/control_plane_proxy/events/events.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
+#include <ydb/core/yq/libs/quota_manager/events/events.h>
#include <ydb/public/api/protos/yq.pb.h>
@@ -68,12 +69,14 @@ struct TEvControlPlaneProxy {
const ProtoMessage& request,
const TString& user,
const TString& token,
- const TVector<TString>& permissions)
+ const TVector<TString>& permissions,
+ const TQuotaMap& quotas = {})
: FolderId(folderId)
, Request(request)
, User(user)
, Token(token)
, Permissions(permissions)
+ , Quotas(quotas)
{
}
@@ -83,6 +86,7 @@ struct TEvControlPlaneProxy {
TString User;
TString Token;
TVector<TString> Permissions;
+ TQuotaMap Quotas;
};
template<typename TDerived, typename ProtoMessage, ui32 EventType>
diff --git a/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp b/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
index 2d5fcefeef..7db7b4e6bc 100644
--- a/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
+++ b/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
@@ -351,7 +351,7 @@ private:
TRuntimePtr runtime(new TTestBasicRuntime());
runtime->SetLogPriority(NKikimrServices::STREAMS_CONTROL_PLANE_SERVICE, NLog::PRI_DEBUG);
- auto controlPlaneProxy = CreateControlPlaneProxyActor(Config, MakeIntrusive<NMonitoring::TDynamicCounters>());
+ auto controlPlaneProxy = CreateControlPlaneProxyActor(Config, MakeIntrusive<NMonitoring::TDynamicCounters>(), false);
runtime->AddLocalService(
ControlPlaneProxyActorId(),
TActorSetupCmd(controlPlaneProxy, TMailboxType::Simple, 0));
diff --git a/ydb/core/yq/libs/control_plane_storage/events/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/events/CMakeLists.txt
index 954cfa161b..1a8f1b3fb0 100644
--- a/ydb/core/yq/libs/control_plane_storage/events/CMakeLists.txt
+++ b/ydb/core/yq/libs/control_plane_storage/events/CMakeLists.txt
@@ -15,6 +15,7 @@ target_link_libraries(libs-control_plane_storage-events PUBLIC
cpp-actors-interconnect
libs-control_plane_storage-proto
yq-libs-events
+ libs-quota_manager-events
)
target_sources(libs-control_plane_storage-events PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/events/events.cpp
diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h
index fc614a09a0..7b38f1e994 100644
--- a/ydb/core/yq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/yq/libs/control_plane_storage/events/events.h
@@ -16,6 +16,7 @@
#include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h>
#include <ydb/core/yq/libs/events/event_subspace.h>
+#include <ydb/core/yq/libs/quota_manager/events/events.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
namespace NYq {
@@ -207,13 +208,15 @@ struct TEvControlPlaneStorage {
const TString& user,
const TString& token,
const TString& cloudId,
- TPermissions permissions)
+ TPermissions permissions,
+ const TQuotaMap& quotas)
: Scope(scope)
, Request(request)
, User(user)
, Token(token)
, CloudId(cloudId)
, Permissions(permissions)
+ , Quotas(quotas)
{
}
@@ -232,6 +235,7 @@ struct TEvControlPlaneStorage {
TString Token;
TString CloudId;
TPermissions Permissions;
+ TQuotaMap Quotas;
};
template<typename TDerived, typename ProtoMessage, ui32 EventType>
diff --git a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto
index 0ad35e8228..1c90cf0dde 100644
--- a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto
+++ b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto
@@ -30,6 +30,7 @@ message QueryInternal {
repeated bytes dq_graph = 13;
int32 dq_graph_index = 14;
StreamingDisposition disposition = 15;
+ uint64 result_limit = 16;
}
message JobInternal {
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index fb5d51656e..f37dd98eb9 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -74,6 +74,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
TInstant startTime = TInstant::Now();
const TEvControlPlaneStorage::TEvCreateQueryRequest& event = *ev->Get();
const TString cloudId = event.CloudId;
+ auto it = event.Quotas.find(QUOTA_RESULT_LIMIT);
+ ui64 resultLimit = (it != event.Quotas.end()) ? it->second.Limit : 0;
const TString scope = event.Scope;
TRequestCountersPtr requestCounters = Counters.GetScopeCounters(cloudId, scope, RTS_CREATE_QUERY);
requestCounters->InFly->Inc();
@@ -192,6 +194,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
queryInternal.set_cloud_id(cloudId);
queryInternal.set_state_load_mode(YandexQuery::StateLoadMode::EMPTY);
queryInternal.mutable_disposition()->CopyFrom(request.disposition());
+ queryInternal.set_result_limit(resultLimit);
if (request.execute_mode() != YandexQuery::SAVE) {
// TODO: move to run actor priority selection
diff --git a/ydb/core/yq/libs/events/event_subspace.h b/ydb/core/yq/libs/events/event_subspace.h
index 6f02afaccf..46aef90629 100644
--- a/ydb/core/yq/libs/events/event_subspace.h
+++ b/ydb/core/yq/libs/events/event_subspace.h
@@ -27,6 +27,7 @@ struct TYqEventSubspace {
AuditService,
TestConnection,
InternalService,
+ QuotaService,
SubspacesEnd,
};
diff --git a/ydb/core/yq/libs/init/CMakeLists.txt b/ydb/core/yq/libs/init/CMakeLists.txt
index b1c99ba5ef..37ac2281e8 100644
--- a/ydb/core/yq/libs/init/CMakeLists.txt
+++ b/ydb/core/yq/libs/init/CMakeLists.txt
@@ -28,6 +28,7 @@ target_link_libraries(yq-libs-init PUBLIC
yq-libs-events
yq-libs-gateway
yq-libs-health
+ yq-libs-quota_manager
yq-libs-shared_resources
yq-libs-test_connection
ydb-library-folder_service
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 348a474997..ab3de5915f 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -9,6 +9,7 @@
#include <ydb/core/yq/libs/health/health.h>
#include <ydb/core/yq/libs/checkpoint_storage/storage_service.h>
#include <ydb/core/yq/libs/private_client/internal_service.h>
+#include <ydb/core/yq/libs/quota_manager/quota_manager.h>
#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/library/folder_service/folder_service.h>
#include <ydb/library/yql/providers/common/metrics/service_counters.h>
@@ -77,7 +78,7 @@ void Init(
if (protoConfig.GetControlPlaneProxy().GetEnabled()) {
auto controlPlaneProxy = NYq::CreateControlPlaneProxyActor(protoConfig.GetControlPlaneProxy(),
- appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneProxy"));
+ appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneProxy"), protoConfig.GetQuotasManager().GetEnabled());
actorRegistrator(NYq::ControlPlaneProxyActorId(), controlPlaneProxy);
}
@@ -258,6 +259,15 @@ void Init(
serviceCounters.Counters);
actorRegistrator(NYq::HealthActorId(), health);
}
+
+ if (protoConfig.GetQuotasManager().GetEnabled()) {
+ auto quotaService = NYq::CreateQuotaServiceActor(
+ protoConfig.GetQuotasManager(),
+ yqSharedResources,
+ serviceCounters.Counters,
+ { TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_RESULT_LIMIT, 20_MB) });
+ actorRegistrator(NYq::MakeQuotaServiceActorId(), quotaService);
+ }
}
IYqSharedResources::TPtr CreateYqSharedResources(
diff --git a/ydb/core/yq/libs/protos/yq_private.proto b/ydb/core/yq/libs/protos/yq_private.proto
index 37565b1cc6..ab524e0a79 100644
--- a/ydb/core/yq/libs/protos/yq_private.proto
+++ b/ydb/core/yq/libs/protos/yq_private.proto
@@ -70,6 +70,7 @@ message GetTaskResult {
string query_name = 23;
google.protobuf.Timestamp deadline = 24;
YandexQuery.StreamingDisposition disposition = 25;
+ uint64 result_limit = 26;
}
repeated Task tasks = 1;
}
diff --git a/ydb/core/yq/libs/quota_manager/CMakeLists.txt b/ydb/core/yq/libs/quota_manager/CMakeLists.txt
new file mode 100644
index 0000000000..0186382aae
--- /dev/null
+++ b/ydb/core/yq/libs/quota_manager/CMakeLists.txt
@@ -0,0 +1,28 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yq-libs-quota_manager)
+target_compile_options(yq-libs-quota_manager PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(yq-libs-quota_manager PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-monlib-dynamic_counters
+ cpp-protobuf-json
+ api-grpc-draft
+ cpp-client-ydb_table
+ libs-control_plane_storage-proto
+ libs-quota_manager-events
+ yq-libs-shared_resources
+ ydb-core-protos
+)
+target_sources(yq-libs-quota_manager PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/quota_manager/quota_manager.cpp
+)
diff --git a/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt b/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt
new file mode 100644
index 0000000000..0b39cb10db
--- /dev/null
+++ b/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt
@@ -0,0 +1,18 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-quota_manager-events)
+target_link_libraries(libs-quota_manager-events PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ yq-libs-events
+)
+target_sources(libs-quota_manager-events PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/quota_manager/events/events.cpp
+)
diff --git a/ydb/core/yq/libs/quota_manager/events/events.cpp b/ydb/core/yq/libs/quota_manager/events/events.cpp
new file mode 100644
index 0000000000..b4d595d7a6
--- /dev/null
+++ b/ydb/core/yq/libs/quota_manager/events/events.cpp
@@ -0,0 +1 @@
+#include "events.h" \ No newline at end of file
diff --git a/ydb/core/yq/libs/quota_manager/events/events.h b/ydb/core/yq/libs/quota_manager/events/events.h
new file mode 100644
index 0000000000..05507def80
--- /dev/null
+++ b/ydb/core/yq/libs/quota_manager/events/events.h
@@ -0,0 +1,65 @@
+#pragma once
+
+#include <memory>
+
+#include <util/generic/maybe.h>
+#include <util/generic/string.h>
+
+#include <ydb/core/yq/libs/events/event_subspace.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/event_local.h>
+
+namespace NYq {
+
+constexpr auto SUBJECT_TYPE_CLOUD = "cloud";
+constexpr auto QUOTA_RESULT_LIMIT = "fq.queryResultLimit.bytes";
+
+struct IQuotaCalculator {
+ // ui64 GetQuotaUsage(const TString& quota, const TString& subjectType, const TString& subjectId) = 0;
+};
+
+struct TQuotaDescription {
+ TString SubjectType;
+ TString MetricName;
+ ui64 DefaultLimit;
+ std::shared_ptr<IQuotaCalculator> QuotaCalculator;
+ TQuotaDescription(const TString& subjectType, const TString& metricName, ui64 defaultLimit)
+ : SubjectType(subjectType)
+ , MetricName(metricName)
+ , DefaultLimit(defaultLimit)
+ {
+ }
+};
+
+struct TQuotaUsage {
+ ui64 Limit;
+ TMaybe<ui64> Usage;
+ TQuotaUsage(ui64 limit) : Limit(limit) {}
+ TQuotaUsage(ui64 limit, ui64 usage) : Limit(limit), Usage(usage) {}
+};
+
+using TQuotaMap = THashMap<TString, TQuotaUsage>;
+
+struct TEvQuotaService {
+ // Event ids.
+ enum EEv : ui32 {
+ EvQuotaGetRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::QuotaService),
+ EvQuotaGetResponse,
+ EvEnd,
+ };
+
+ static_assert(EvEnd <= YqEventSubspaceEnd(NYq::TYqEventSubspace::QuotaService), "All events must be in their subspace");
+
+ struct TQuotaGetRequest : public NActors::TEventLocal<TQuotaGetRequest, EvQuotaGetRequest> {
+ TString SubjectType;
+ TString SubjectId;
+ };
+
+ // Quota request never fails, if no quota exist (i.e. SubjectType is incorrect) empty list will be returned
+ struct TQuotaGetResponse : public NActors::TEventLocal<TQuotaGetResponse, EvQuotaGetResponse> {
+ TQuotaMap Quotas;
+ };
+};
+
+} /* NYq */
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
new file mode 100644
index 0000000000..1d53206345
--- /dev/null
+++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
@@ -0,0 +1,78 @@
+#include "quota_manager.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
+
+#include <ydb/core/protos/services.pb.h>
+
+#define LOG_E(stream) \
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+#define LOG_W(stream) \
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+#define LOG_I(stream) \
+ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+#define LOG_D(stream) \
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+
+namespace NYq {
+
+NActors::TActorId MakeQuotaServiceActorId() {
+ constexpr TStringBuf name = "FQQTASRV";
+ return NActors::TActorId(0, name);
+}
+
+class TQuotaManagementService : public NActors::TActorBootstrapped<TQuotaManagementService> {
+public:
+ TQuotaManagementService(
+ const NConfig::TQuotasManagerConfig& config,
+ const NYq::TYqSharedResources::TPtr& yqSharedResources,
+ const NMonitoring::TDynamicCounterPtr& counters,
+ std::vector<TQuotaDescription> quotaDesc)
+ : Config(config)
+ , ServiceCounters(counters->GetSubgroup("subsystem", "QuotaService"))
+ {
+ Y_UNUSED(yqSharedResources);
+ Y_UNUSED(quotaDesc);
+ }
+
+ static constexpr char ActorName[] = "FQ_QUOTA_SERVICE";
+
+ void Bootstrap() {
+ Become(&TQuotaManagementService::StateFunc);
+ LOG_I("STARTED");
+ }
+
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvQuotaService::TQuotaGetRequest, Handle)
+ );
+
+ void Handle(TEvQuotaService::TQuotaGetRequest::TPtr& ev) {
+ auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>();
+ for (const auto& quota : Config.GetQuotas()) {
+ if (quota.GetSubjectType() == ev->Get()->SubjectType && quota.GetSubjectId() == ev->Get()->SubjectId) {
+ for (const auto& limit : quota.GetLimit()) {
+ response->Quotas.emplace(limit.GetName(), TQuotaUsage(limit.GetLimit()));
+ }
+ }
+ }
+ Send(ev->Sender, response.Release());
+ }
+
+ NConfig::TQuotasManagerConfig Config;
+ const NMonitoring::TDynamicCounterPtr ServiceCounters;
+};
+
+NActors::IActor* CreateQuotaServiceActor(
+ const NConfig::TQuotasManagerConfig& config,
+ const NYq::TYqSharedResources::TPtr& yqSharedResources,
+ const NMonitoring::TDynamicCounterPtr& counters,
+ std::vector<TQuotaDescription> quotaDesc) {
+ return new TQuotaManagementService(config, yqSharedResources, counters, quotaDesc);
+}
+
+} /* NYq */
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.h b/ydb/core/yq/libs/quota_manager/quota_manager.h
new file mode 100644
index 0000000000..6db4d1deaf
--- /dev/null
+++ b/ydb/core/yq/libs/quota_manager/quota_manager.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <ydb/core/yq/libs/quota_manager/events/events.h>
+
+#include <ydb/core/yq/libs/config/protos/quotas_manager.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NYq {
+
+NActors::TActorId MakeQuotaServiceActorId();
+
+NActors::IActor* CreateQuotaServiceActor(
+ const NConfig::TQuotasManagerConfig& config,
+ const NYq::TYqSharedResources::TPtr& yqSharedResources,
+ const NMonitoring::TDynamicCounterPtr& counters,
+ std::vector<TQuotaDescription> quotaDesc);
+
+} /* NYq */
diff --git a/ydb/core/yq/libs/test_connection/events/events.h b/ydb/core/yq/libs/test_connection/events/events.h
index 7961cefdc7..7cbb61bdcc 100644
--- a/ydb/core/yq/libs/test_connection/events/events.h
+++ b/ydb/core/yq/libs/test_connection/events/events.h
@@ -29,13 +29,15 @@ struct TEvTestConnection {
const TString& user,
const TString& token,
const TString& cloudId,
- const TPermissions& permissions)
+ const TPermissions& permissions,
+ const TQuotaMap& quotas)
: CloudId(cloudId)
, Scope(scope)
, Request(request)
, User(user)
, Token(token)
, Permissions(permissions)
+ , Quotas(quotas)
{
}
@@ -45,6 +47,7 @@ struct TEvTestConnection {
TString User;
TString Token;
TPermissions Permissions;
+ const TQuotaMap Quotas;
};
struct TEvTestConnectionResponse : NActors::TEventLocal<TEvTestConnectionResponse, EvTestConnectionResponse> {
diff --git a/ydb/library/folder_service/mock/mock_folder_service.cpp b/ydb/library/folder_service/mock/mock_folder_service.cpp
index 68f6d1d64d..12e825fed8 100644
--- a/ydb/library/folder_service/mock/mock_folder_service.cpp
+++ b/ydb/library/folder_service/mock/mock_folder_service.cpp
@@ -18,10 +18,16 @@ public:
}
void Handle(TEvListFolderRequest::TPtr& ev) {
+ auto folderId = ev.Get()->Get()->Request.folder_id();
auto result = std::make_unique<TEvListFolderResponse>();
auto* fakeFolder = result->Response.mutable_folder();
- fakeFolder->set_id(ev.Get()->Get()->Request.folder_id());
- fakeFolder->set_cloud_id("mock_cloud");
+ TString cloudId = "mock_cloud";
+ auto p = folderId.find('@');
+ if (p != folderId.npos) {
+ cloudId = folderId.substr(p + 1);
+ }
+ fakeFolder->set_id(folderId);
+ fakeFolder->set_cloud_id(cloudId);
result->Status = NGrpc::TGrpcStatus();
Send(ev->Sender, result.release());
}