aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2022-07-08 10:56:19 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2022-07-08 10:56:19 +0300
commit3f5056deffaa871aa49c77d6bc14d0c49a6b60c2 (patch)
treefd0dbf540b1ea0a38fc5184ec9fccfe1e2c4b37f
parentd783e97de9fbfbbcec1ff6ef52c8c2515d9b3576 (diff)
downloadydb-3f5056deffaa871aa49c77d6bc14d0c49a6b60c2.tar.gz
Create kesuses for rate limiting in YQ
Fill config in kikimr runner Create coordination nodes Pass to config Protos
-rw-r--r--CMakeLists.darwin.txt5
-rw-r--r--CMakeLists.linux.txt5
-rw-r--r--library/cpp/actors/http/http.h22
-rw-r--r--library/cpp/actors/http/http_proxy.cpp4
-rw-r--r--library/cpp/actors/http/http_proxy.h25
-rw-r--r--library/python/pytest/plugins/conftests.py4
-rw-r--r--library/python/pytest/plugins/ya.py3
-rw-r--r--library/python/testing/import_test/import_test.py12
-rw-r--r--library/python/testing/recipe/__init__.py10
-rw-r--r--library/python/testing/yatest_common/yatest/common/runtime.py32
-rw-r--r--ydb/core/protos/services.proto1
-rw-r--r--ydb/core/yq/libs/config/protos/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/config/protos/rate_limiter.proto22
-rw-r--r--ydb/core/yq/libs/config/protos/yq_config.proto2
-rw-r--r--ydb/core/yq/libs/init/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/init/init.cpp6
-rw-r--r--ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt27
-rw-r--r--ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp128
-rw-r--r--ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h18
-rw-r--r--ydb/core/yq/libs/ydb/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/ydb/create_schema.cpp46
-rw-r--r--ydb/core/yq/libs/ydb/create_schema.h10
-rw-r--r--ydb/core/yq/libs/ydb/ydb.cpp1
-rw-r--r--ydb/core/yq/libs/ydb/ydb.h2
24 files changed, 353 insertions, 35 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 9b6aa23a80..4a495cabf1 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -752,8 +752,9 @@ add_subdirectory(ydb/core/yq/libs/ydb)
add_subdirectory(library/cpp/retry)
add_subdirectory(library/cpp/retry/protos)
add_subdirectory(ydb/library/security)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme)
add_subdirectory(ydb/core/yq/libs/db_schema)
add_subdirectory(ydb/core/yq/libs/shared_resources)
add_subdirectory(ydb/core/yq/libs/shared_resources/interface)
@@ -891,6 +892,7 @@ add_subdirectory(ydb/core/yq/libs/tasks_packer)
add_subdirectory(ydb/core/yq/libs/health)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)
add_subdirectory(ydb/core/yq/libs/quota_manager)
+add_subdirectory(ydb/core/yq/libs/rate_limiter/control_plane_service)
add_subdirectory(ydb/core/yq/libs/test_connection)
add_subdirectory(ydb/core/yq/libs/test_connection/events)
add_subdirectory(ydb/library/yql/providers/solomon/async_io)
@@ -1087,7 +1089,6 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl)
add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut)
add_subdirectory(ydb/services/rate_limiter/ut)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter)
add_subdirectory(ydb/services/ydb/index_ut)
add_subdirectory(ydb/services/ydb/sdk_credprovider_ut)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 81e3793715..1e8c6079cc 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -756,8 +756,9 @@ add_subdirectory(ydb/core/yq/libs/ydb)
add_subdirectory(library/cpp/retry)
add_subdirectory(library/cpp/retry/protos)
add_subdirectory(ydb/library/security)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme)
add_subdirectory(ydb/core/yq/libs/db_schema)
add_subdirectory(ydb/core/yq/libs/shared_resources)
add_subdirectory(ydb/core/yq/libs/shared_resources/interface)
@@ -895,6 +896,7 @@ add_subdirectory(ydb/core/yq/libs/tasks_packer)
add_subdirectory(ydb/core/yq/libs/health)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)
add_subdirectory(ydb/core/yq/libs/quota_manager)
+add_subdirectory(ydb/core/yq/libs/rate_limiter/control_plane_service)
add_subdirectory(ydb/core/yq/libs/test_connection)
add_subdirectory(ydb/core/yq/libs/test_connection/events)
add_subdirectory(ydb/library/yql/providers/solomon/async_io)
@@ -1107,7 +1109,6 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl)
add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut)
add_subdirectory(ydb/services/rate_limiter/ut)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter)
add_subdirectory(ydb/services/ydb/index_ut)
add_subdirectory(ydb/services/ydb/sdk_credprovider_ut)
diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h
index 318000389c..295b8f97b5 100644
--- a/library/cpp/actors/http/http.h
+++ b/library/cpp/actors/http/http.h
@@ -54,6 +54,27 @@ struct TEqNoCase {
}
};
+struct TSensors {
+ TString Direction;
+ TString Host;
+ TString Url;
+ TString Status;
+ TDuration Time;
+
+ TSensors(
+ TStringBuf direction,
+ TStringBuf host,
+ TStringBuf url,
+ TStringBuf status,
+ TDuration time)
+ : Direction(direction)
+ , Host(host)
+ , Url(url)
+ , Status(status)
+ , Time(time)
+ {}
+};
+
struct TUrlParameters {
THashMap<TStringBuf, TStringBuf> Parameters;
@@ -840,6 +861,7 @@ public:
// it's temporary accessible for cleanup
//protected:
THttpIncomingRequestPtr Request;
+ std::unique_ptr<TSensors> Sensors;
};
}
diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp
index 394cb17bd3..7cf83d38d8 100644
--- a/library/cpp/actors/http/http_proxy.cpp
+++ b/library/cpp/actors/http/http_proxy.cpp
@@ -260,6 +260,10 @@ TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingR
}
TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) {
+ const auto& sensors = response->Sensors;
+ if (sensors) {
+ return new TEvHttpProxy::TEvReportSensors(*sensors);
+ }
return new TEvHttpProxy::TEvReportSensors(
"in",
request->Host,
diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h
index b95954e561..0ed09119e4 100644
--- a/library/cpp/actors/http/http_proxy.h
+++ b/library/cpp/actors/http/http_proxy.h
@@ -20,7 +20,7 @@ struct TSocketDescriptor : NActors::TSharedDescriptor, THttpConfig {
SocketType Socket;
TSocketDescriptor() = default;
-
+
TSocketDescriptor(int af)
: Socket(af)
{
@@ -213,24 +213,11 @@ struct TEvHttpProxy {
{}
};
- struct TEvReportSensors : NActors::TEventLocal<TEvReportSensors, EvReportSensors> {
- TString Direction;
- TString Host;
- TString Url;
- TString Status;
- TDuration Time;
-
- TEvReportSensors(
- TStringBuf direction,
- TStringBuf host,
- TStringBuf url,
- TStringBuf status,
- TDuration time)
- : Direction(direction)
- , Host(host)
- , Url(url)
- , Status(status)
- , Time(time)
+ struct TEvReportSensors : TSensors, NActors::TEventLocal<TEvReportSensors, EvReportSensors> {
+ using TSensors::TSensors;
+
+ TEvReportSensors(const TSensors& sensors)
+ : TSensors(sensors)
{}
};
};
diff --git a/library/python/pytest/plugins/conftests.py b/library/python/pytest/plugins/conftests.py
index 3c855c9c11..2ea36ae4c2 100644
--- a/library/python/pytest/plugins/conftests.py
+++ b/library/python/pytest/plugins/conftests.py
@@ -3,7 +3,10 @@ import importlib
import sys
import inspect
+import yatest.common as yc
+
from pytest import hookimpl
+from yatest_lib.ya import Ya
from .fixtures import metrics, links # noqa
@@ -23,6 +26,7 @@ conftest_modules = []
@hookimpl(trylast=True)
def pytest_load_initial_conftests(early_config, parser, args):
+ yc.runtime._set_ya_config(ya=Ya())
conftests = filter(lambda name: name.endswith(".conftest"), sys.extra_modules)
def conftest_key(name):
diff --git a/library/python/pytest/plugins/ya.py b/library/python/pytest/plugins/ya.py
index 5b48ab775d..715d24ce5b 100644
--- a/library/python/pytest/plugins/ya.py
+++ b/library/python/pytest/plugins/ya.py
@@ -30,6 +30,7 @@ import _pytest.skipping
from _pytest.warning_types import PytestUnhandledCoroutineWarning
from yatest_lib import test_splitter
+import yatest.common as yatest_common
try:
import resource
@@ -283,6 +284,8 @@ def pytest_configure(config):
if config.option.pdb_on_sigusr1:
configure_pdb_on_demand()
+ yatest_common.runtime._set_ya_config(config=config)
+
# Dump python backtrace in case of any errors
faulthandler.enable()
if hasattr(signal, "SIGQUIT"):
diff --git a/library/python/testing/import_test/import_test.py b/library/python/testing/import_test/import_test.py
index 5290ae3cae..114723a4c6 100644
--- a/library/python/testing/import_test/import_test.py
+++ b/library/python/testing/import_test/import_test.py
@@ -11,6 +11,15 @@ import __res
from __res import importer
+def setup_test_environment():
+ try:
+ from yatest_lib.ya import Ya
+ import yatest.common as yc
+ yc.runtime._set_ya_config(ya=Ya())
+ except ImportError:
+ pass
+
+
def check_imports(no_check=(), extra=(), skip_func=None, py_main=None):
"""
tests all bundled modules are importable
@@ -18,6 +27,7 @@ def check_imports(no_check=(), extra=(), skip_func=None, py_main=None):
"PEERDIR(library/python/import_test)" to your CMakeLists.txt and
"from import_test import test_imports" to your python test source file.
"""
+
str_ = lambda s: s
if not isinstance(b'', str):
str_ = lambda s: s.decode('UTF-8')
@@ -99,6 +109,8 @@ test_imports = check_imports
def main():
+ setup_test_environment()
+
skip_names = sys.argv[1:]
try:
diff --git a/library/python/testing/recipe/__init__.py b/library/python/testing/recipe/__init__.py
index 5ef9c5c189..5f48628ad3 100644
--- a/library/python/testing/recipe/__init__.py
+++ b/library/python/testing/recipe/__init__.py
@@ -8,6 +8,8 @@ import argparse
from yatest_lib.ya import Ya
+import yatest.common as yc
+
RECIPE_START_OPTION = "start"
RECIPE_STOP_OPTION = "stop"
@@ -16,6 +18,13 @@ collect_cores = None
sanitizer_extra_checks = None
+class Config:
+ def __init__(self):
+ self.ya = ya
+ self.collect_cores = collect_cores
+ self.sanitizer_extra_checks = sanitizer_extra_checks
+
+
def _setup_logging(level=logging.DEBUG):
root_logger = logging.getLogger()
root_logger.setLevel(level)
@@ -60,6 +69,7 @@ def get_options():
os.environ[envvar] = os.environ[envvar + '_ORIGINAL']
collect_cores = args.collect_cores
+ yc.runtime._set_ya_config(config=Config())
for recipe_option in RECIPE_START_OPTION, RECIPE_STOP_OPTION:
if recipe_option in opts:
return args, opts[opts.index(recipe_option):]
diff --git a/library/python/testing/yatest_common/yatest/common/runtime.py b/library/python/testing/yatest_common/yatest/common/runtime.py
index 6e6c3c8759..b8d5964d1d 100644
--- a/library/python/testing/yatest_common/yatest/common/runtime.py
+++ b/library/python/testing/yatest_common/yatest/common/runtime.py
@@ -9,22 +9,30 @@ import six
_lock = threading.Lock()
+_config = None
+
+
+def _set_ya_config(config=None, ya=None):
+ global _config
+ if config:
+ _config = config
+ elif ya:
+ class Config:
+ def __init__(self):
+ self.ya = None
+ _config = Config()
+ _config.ya = ya
+
def _get_ya_config():
- try:
- import library.python.pytest.plugins.ya as ya_plugin
- if ya_plugin.pytest_config is not None:
- return ya_plugin.pytest_config
- import pytest
- return pytest.config
- except (ImportError, AttributeError):
+ if _config:
+ return _config
+ else:
try:
- import library.python.testing.recipe
- if library.python.testing.recipe.ya:
- return library.python.testing.recipe
+ import pytest
+ return pytest.config
except (ImportError, AttributeError):
- pass
- raise NotImplementedError("yatest.common.* is only available from the testing runtime")
+ raise NotImplementedError("yatest.common.* is only available from the testing runtime")
def _get_ya_plugin_instance():
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 5fc0856fc5..44c76870dc 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -301,6 +301,7 @@ enum EServiceKikimr {
YQ_AUDIT = 1150;
YQ_AUDIT_EVENT_SENDER = 1151;
YQ_HEALTH = 1152;
+ YQ_RATE_LIMITER = 1155;
FQ_INTERNAL_SERVICE = 1153;
FQ_QUOTA_SERVICE = 1154;
diff --git a/ydb/core/yq/libs/config/protos/CMakeLists.txt b/ydb/core/yq/libs/config/protos/CMakeLists.txt
index ce03e7b21f..c399db4b32 100644
--- a/ydb/core/yq/libs/config/protos/CMakeLists.txt
+++ b/ydb/core/yq/libs/config/protos/CMakeLists.txt
@@ -33,6 +33,7 @@ target_proto_messages(libs-config-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/private_api.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/private_proxy.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/quotas_manager.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/rate_limiter.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/read_actors_factory.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/resource_manager.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/storage.proto
diff --git a/ydb/core/yq/libs/config/protos/rate_limiter.proto b/ydb/core/yq/libs/config/protos/rate_limiter.proto
new file mode 100644
index 0000000000..3a82632b30
--- /dev/null
+++ b/ydb/core/yq/libs/config/protos/rate_limiter.proto
@@ -0,0 +1,22 @@
+syntax = "proto3";
+option cc_enable_arenas = true;
+
+package NYq.NConfig;
+option java_package = "ru.yandex.kikimr.proto";
+
+import "ydb/core/yq/libs/config/protos/storage.proto";
+
+////////////////////////////////////////////////////////////
+
+message TLimiter {
+ string CoordinationNodePath = 1;
+}
+
+message TRateLimiterConfig {
+ bool Enabled = 1;
+ bool ControlPlaneEnabled = 2;
+ bool DataPlaneEnabled = 3;
+
+ NYq.NConfig.TYdbStorageConfig Database = 4;
+ repeated TLimiter Limiters = 5;
+}
diff --git a/ydb/core/yq/libs/config/protos/yq_config.proto b/ydb/core/yq/libs/config/protos/yq_config.proto
index 74344d2ccf..c7c7deb102 100644
--- a/ydb/core/yq/libs/config/protos/yq_config.proto
+++ b/ydb/core/yq/libs/config/protos/yq_config.proto
@@ -18,6 +18,7 @@ import "ydb/core/yq/libs/config/protos/pinger.proto";
import "ydb/core/yq/libs/config/protos/private_api.proto";
import "ydb/core/yq/libs/config/protos/private_proxy.proto";
import "ydb/core/yq/libs/config/protos/quotas_manager.proto";
+import "ydb/core/yq/libs/config/protos/rate_limiter.proto";
import "ydb/core/yq/libs/config/protos/read_actors_factory.proto";
import "ydb/core/yq/libs/config/protos/resource_manager.proto";
import "ydb/core/yq/libs/config/protos/test_connection.proto";
@@ -48,4 +49,5 @@ message TConfig {
TReadActorsFactoryConfig ReadActorsFactoryConfig = 19;
THealthConfig Health = 20;
TQuotasManagerConfig QuotasManager = 21;
+ TRateLimiterConfig RateLimiter = 22;
}
diff --git a/ydb/core/yq/libs/init/CMakeLists.txt b/ydb/core/yq/libs/init/CMakeLists.txt
index 272bb515f0..e052e99586 100644
--- a/ydb/core/yq/libs/init/CMakeLists.txt
+++ b/ydb/core/yq/libs/init/CMakeLists.txt
@@ -29,6 +29,7 @@ target_link_libraries(yq-libs-init PUBLIC
yq-libs-gateway
yq-libs-health
yq-libs-quota_manager
+ libs-rate_limiter-control_plane_service
yq-libs-shared_resources
yq-libs-test_connection
ydb-library-folder_service
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 15a0524d12..0f1431b575 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -11,6 +11,7 @@
#include <ydb/core/yq/libs/private_client/internal_service.h>
#include <ydb/core/yq/libs/private_client/loopback_service.h>
#include <ydb/core/yq/libs/quota_manager/quota_manager.h>
+#include <ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h>
#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/library/folder_service/folder_service.h>
#include <ydb/library/yql/providers/common/metrics/service_counters.h>
@@ -84,6 +85,11 @@ void Init(
actorRegistrator(NYq::ControlPlaneProxyActorId(), controlPlaneProxy);
}
+ if (protoConfig.GetRateLimiter().GetControlPlaneEnabled()) {
+ NActors::IActor* rateLimiterService = NYq::CreateRateLimiterControlPlaneService(protoConfig.GetRateLimiter(), yqSharedResources, credentialsProviderFactory);
+ actorRegistrator(NYq::RateLimiterControlPlaneServiceId(), rateLimiterService);
+ }
+
if (protoConfig.GetAudit().GetEnabled()) {
auto* auditSerive = auditServiceFactory(
protoConfig.GetAudit(),
diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt
new file mode 100644
index 0000000000..da96f328b5
--- /dev/null
+++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt
@@ -0,0 +1,27 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-rate_limiter-control_plane_service)
+target_compile_options(libs-rate_limiter-control_plane_service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-rate_limiter-control_plane_service PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-protos
+ libs-config-protos
+ yq-libs-events
+ yq-libs-shared_resources
+ yq-libs-ydb
+ ydb-library-security
+)
+target_sources(libs-rate_limiter-control_plane_service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp
+)
diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp
new file mode 100644
index 0000000000..d52cb03cb3
--- /dev/null
+++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp
@@ -0,0 +1,128 @@
+#include "rate_limiter_control_plane_service.h"
+
+#include <ydb/core/protos/services.pb.h>
+#include <ydb/core/yq/libs/events/events.h>
+#include <ydb/core/yq/libs/ydb/create_schema.h>
+#include <ydb/core/yq/libs/ydb/util.h>
+#include <ydb/core/yq/libs/ydb/ydb.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+#define LOG_D(stream) LOG_DEBUG_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream)
+#define LOG_I(stream) LOG_INFO_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream)
+#define LOG_W(stream) LOG_WARN_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream)
+#define LOG_E(stream) LOG_ERROR_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream)
+
+namespace NYq {
+
+class TRateLimiterControlPlaneService : public NActors::TActorBootstrapped<TRateLimiterControlPlaneService> {
+public:
+ static constexpr char ActorName[] = "YQ_RATE_LIMITER_CONTROL_PLANE";
+
+ TRateLimiterControlPlaneService(
+ const NYq::NConfig::TRateLimiterConfig& rateLimiterConfig,
+ const NYq::TYqSharedResources::TPtr& yqSharedResources,
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ : Config(rateLimiterConfig)
+ , YqSharedResources(yqSharedResources)
+ , CredProviderFactory(credentialsProviderFactory)
+ {
+ }
+
+ void Bootstrap() {
+ Y_VERIFY(Config.GetControlPlaneEnabled());
+ if (!Config.GetEnabled()) {
+ Become(&TRateLimiterControlPlaneService::RateLimiterOffStateFunc);
+ return;
+ }
+
+ StartInit();
+ }
+
+ static ERetryErrorClass RetryFunc(const NYdb::TStatus& status) {
+ return status.GetStatus() == NYdb::EStatus::OVERLOADED ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;
+ }
+
+ TYdbSdkRetryPolicy::TPtr MakeCreateSchemaRetryPolicy() {
+ return TYdbSdkRetryPolicy::GetExponentialBackoffPolicy(RetryFunc, TDuration::MilliSeconds(10), TDuration::Seconds(1), TDuration::Seconds(5));
+ }
+
+ void RunCreateCoordinationNodeActor(const TString& path) {
+ Register(MakeCreateCoordinationNodeActor(SelfId(), NKikimrServices::YQ_RATE_LIMITER, YdbConnection, path, MakeCreateSchemaRetryPolicy()));
+ ++CreatingCoordinationNodes;
+ }
+
+ void StartInit() {
+ Become(&TRateLimiterControlPlaneService::InitStateFunc);
+
+ YdbConnection = NewYdbConnection(Config.GetDatabase(), CredProviderFactory, YqSharedResources->CoreYdbDriver);
+
+ for (const auto& limiterConfig : Config.GetLimiters()) {
+ auto coordinationNodePath = JoinPath(YdbConnection->TablePathPrefix, limiterConfig.GetCoordinationNodePath());
+ RunCreateCoordinationNodeActor(coordinationNodePath);
+ }
+
+ TryStartWorking();
+ }
+
+ void TryStartWorking() {
+ Y_VERIFY(CreatingCoordinationNodes >= 0);
+ if (CreatingCoordinationNodes > 0) {
+ return;
+ }
+
+ Become(&TRateLimiterControlPlaneService::WorkingStateFunc);
+
+ // Start processing deferred queries
+ }
+
+ void HandleInit(TEvents::TEvSchemaCreated::TPtr&) {
+ --CreatingCoordinationNodes;
+
+ TryStartWorking();
+ }
+
+ // State func that does nothing. Rate limiting is turned off.
+ // Answers "OK" responses and does nothing.
+ STRICT_STFUNC(
+ RateLimiterOffStateFunc,
+ )
+
+ // State func that inits limiters.
+ // Puts all incoming requests to queue.
+ STRICT_STFUNC(
+ InitStateFunc,
+ hFunc(TEvents::TEvSchemaCreated, HandleInit);
+ )
+
+ // Working
+ STRICT_STFUNC(
+ WorkingStateFunc,
+ )
+
+private:
+ const NYq::NConfig::TRateLimiterConfig Config;
+ const NYq::TYqSharedResources::TPtr YqSharedResources;
+ NKikimr::TYdbCredentialsProviderFactory CredProviderFactory;
+ TYdbConnectionPtr YdbConnection;
+
+ // Init
+ size_t CreatingCoordinationNodes = 0;
+};
+
+NActors::TActorId RateLimiterControlPlaneServiceId() {
+ constexpr TStringBuf name = "RATE_LIM_CP";
+ return NActors::TActorId(0, name);
+}
+
+NActors::IActor* CreateRateLimiterControlPlaneService(
+ const NYq::NConfig::TRateLimiterConfig& rateLimiterConfig,
+ const NYq::TYqSharedResources::TPtr& yqSharedResources,
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+{
+ return new TRateLimiterControlPlaneService(rateLimiterConfig, yqSharedResources, credentialsProviderFactory);
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h
new file mode 100644
index 0000000000..96a9d34715
--- /dev/null
+++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h
@@ -0,0 +1,18 @@
+#pragma once
+#include <ydb/core/yq/libs/config/protos/rate_limiter.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
+#include <ydb/library/security/ydb_credentials_provider_factory.h>
+
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/actor.h>
+
+namespace NYq {
+
+NActors::TActorId RateLimiterControlPlaneServiceId();
+
+NActors::IActor* CreateRateLimiterControlPlaneService(
+ const NYq::NConfig::TRateLimiterConfig& rateLimiterConfig,
+ const NYq::TYqSharedResources::TPtr& yqSharedResources,
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/ydb/CMakeLists.txt b/ydb/core/yq/libs/ydb/CMakeLists.txt
index 3e9c460c51..e3773d6dc7 100644
--- a/ydb/core/yq/libs/ydb/CMakeLists.txt
+++ b/ydb/core/yq/libs/ydb/CMakeLists.txt
@@ -19,6 +19,7 @@ target_link_libraries(yq-libs-ydb PUBLIC
yq-libs-config
yq-libs-events
ydb-library-security
+ cpp-client-ydb_coordination
cpp-client-ydb_scheme
cpp-client-ydb_table
tools-enum_parser-enum_serialization_runtime
diff --git a/ydb/core/yq/libs/ydb/create_schema.cpp b/ydb/core/yq/libs/ydb/create_schema.cpp
index a0ce2efe22..675bc474e8 100644
--- a/ydb/core/yq/libs/ydb/create_schema.cpp
+++ b/ydb/core/yq/libs/ydb/create_schema.cpp
@@ -251,6 +251,34 @@ private:
const TString DirectoryPath;
};
+class TCreateCoordinationNodeActor : public TCreateActorBase {
+public:
+ TCreateCoordinationNodeActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& coordinationNodePath,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+ : TCreateActorBase(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie)
+ , CoordinationNodePath(coordinationNodePath)
+ {
+ }
+
+private:
+ TString GetEntityName() const override {
+ return TStringBuilder() << "coordination node \"" << CoordinationNodePath << "\"";
+ }
+
+ NYdb::TAsyncStatus CallYdbSdk() override {
+ SCHEMA_LOG_DEBUG("Call create coordination node \"" << CoordinationNodePath << "\"");
+ return Connection->CoordinationClient.CreateNode(CoordinationNodePath);
+ }
+
+private:
+ const TString CoordinationNodePath;
+};
+
NActors::IActor* MakeCreateTableActor(
NActors::TActorId parent,
ui64 logComponent,
@@ -289,4 +317,22 @@ NActors::IActor* MakeCreateDirectoryActor(
);
}
+NActors::IActor* MakeCreateCoordinationNodeActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& coordinationNodePath,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+{
+ return new TCreateCoordinationNodeActor(
+ parent,
+ logComponent,
+ std::move(connection),
+ coordinationNodePath,
+ std::move(retryPolicy),
+ cookie
+ );
+}
+
} // namespace NYq
diff --git a/ydb/core/yq/libs/ydb/create_schema.h b/ydb/core/yq/libs/ydb/create_schema.h
index b05c0ec01b..d390431caf 100644
--- a/ydb/core/yq/libs/ydb/create_schema.h
+++ b/ydb/core/yq/libs/ydb/create_schema.h
@@ -30,4 +30,14 @@ NActors::IActor* MakeCreateDirectoryActor(
TYdbSdkRetryPolicy::TPtr retryPolicy,
ui64 cookie = 0);
+// Actor that creates coordination node.
+// Send TEvSchemaCreated to parent (if any).
+NActors::IActor* MakeCreateCoordinationNodeActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& coordinationNodePath,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie = 0);
+
} // namespace NYq
diff --git a/ydb/core/yq/libs/ydb/ydb.cpp b/ydb/core/yq/libs/ydb/ydb.cpp
index 190efa13df..ac791c01be 100644
--- a/ydb/core/yq/libs/ydb/ydb.cpp
+++ b/ydb/core/yq/libs/ydb/ydb.cpp
@@ -221,6 +221,7 @@ TYdbConnection::TYdbConnection(const NConfig::TYdbStorageConfig& config,
: Driver(driver)
, TableClient(Driver, GetClientSettings<NYdb::NTable::TClientSettings>(config, credProviderFactory))
, SchemeClient(Driver, GetClientSettings<NYdb::TCommonClientSettings>(config, credProviderFactory))
+ , CoordinationClient(Driver, GetClientSettings<NYdb::TCommonClientSettings>(config, credProviderFactory))
, DB(config.GetDatabase())
, TablePathPrefix(JoinPath(DB, config.GetTablePrefix()))
{
diff --git a/ydb/core/yq/libs/ydb/ydb.h b/ydb/core/yq/libs/ydb/ydb.h
index 9675dd8181..6903722e6c 100644
--- a/ydb/core/yq/libs/ydb/ydb.h
+++ b/ydb/core/yq/libs/ydb/ydb.h
@@ -3,6 +3,7 @@
#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/core/yq/libs/config/protos/storage.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_coordination/coordination.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
@@ -14,6 +15,7 @@ struct TYdbConnection : public TThrRefBase {
NYdb::TDriver Driver;
NYdb::NTable::TTableClient TableClient;
NYdb::NScheme::TSchemeClient SchemeClient;
+ NYdb::NCoordination::TClient CoordinationClient;
const TString DB;
const TString TablePathPrefix;