diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-07-08 10:56:19 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-07-08 10:56:19 +0300 |
commit | 3f5056deffaa871aa49c77d6bc14d0c49a6b60c2 (patch) | |
tree | fd0dbf540b1ea0a38fc5184ec9fccfe1e2c4b37f | |
parent | d783e97de9fbfbbcec1ff6ef52c8c2515d9b3576 (diff) | |
download | ydb-3f5056deffaa871aa49c77d6bc14d0c49a6b60c2.tar.gz |
Create kesuses for rate limiting in YQ
Fill config in kikimr runner
Create coordination nodes
Pass to config
Protos
24 files changed, 353 insertions, 35 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 9b6aa23a80..4a495cabf1 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -752,8 +752,9 @@ add_subdirectory(ydb/core/yq/libs/ydb) add_subdirectory(library/cpp/retry) add_subdirectory(library/cpp/retry/protos) add_subdirectory(ydb/library/security) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination) add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme) add_subdirectory(ydb/core/yq/libs/db_schema) add_subdirectory(ydb/core/yq/libs/shared_resources) add_subdirectory(ydb/core/yq/libs/shared_resources/interface) @@ -891,6 +892,7 @@ add_subdirectory(ydb/core/yq/libs/tasks_packer) add_subdirectory(ydb/core/yq/libs/health) add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery) add_subdirectory(ydb/core/yq/libs/quota_manager) +add_subdirectory(ydb/core/yq/libs/rate_limiter/control_plane_service) add_subdirectory(ydb/core/yq/libs/test_connection) add_subdirectory(ydb/core/yq/libs/test_connection/events) add_subdirectory(ydb/library/yql/providers/solomon/async_io) @@ -1087,7 +1089,6 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl) add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut) add_subdirectory(ydb/services/rate_limiter/ut) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination) add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter) add_subdirectory(ydb/services/ydb/index_ut) add_subdirectory(ydb/services/ydb/sdk_credprovider_ut) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 81e3793715..1e8c6079cc 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -756,8 +756,9 @@ add_subdirectory(ydb/core/yq/libs/ydb) add_subdirectory(library/cpp/retry) add_subdirectory(library/cpp/retry/protos) add_subdirectory(ydb/library/security) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination) add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme) add_subdirectory(ydb/core/yq/libs/db_schema) add_subdirectory(ydb/core/yq/libs/shared_resources) add_subdirectory(ydb/core/yq/libs/shared_resources/interface) @@ -895,6 +896,7 @@ add_subdirectory(ydb/core/yq/libs/tasks_packer) add_subdirectory(ydb/core/yq/libs/health) add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery) add_subdirectory(ydb/core/yq/libs/quota_manager) +add_subdirectory(ydb/core/yq/libs/rate_limiter/control_plane_service) add_subdirectory(ydb/core/yq/libs/test_connection) add_subdirectory(ydb/core/yq/libs/test_connection/events) add_subdirectory(ydb/library/yql/providers/solomon/async_io) @@ -1107,7 +1109,6 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl) add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut) add_subdirectory(ydb/services/rate_limiter/ut) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination) add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter) add_subdirectory(ydb/services/ydb/index_ut) add_subdirectory(ydb/services/ydb/sdk_credprovider_ut) diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h index 318000389c..295b8f97b5 100644 --- a/library/cpp/actors/http/http.h +++ b/library/cpp/actors/http/http.h @@ -54,6 +54,27 @@ struct TEqNoCase { } }; +struct TSensors { + TString Direction; + TString Host; + TString Url; + TString Status; + TDuration Time; + + TSensors( + TStringBuf direction, + TStringBuf host, + TStringBuf url, + TStringBuf status, + TDuration time) + : Direction(direction) + , Host(host) + , Url(url) + , Status(status) + , Time(time) + {} +}; + struct TUrlParameters { THashMap<TStringBuf, TStringBuf> Parameters; @@ -840,6 +861,7 @@ public: // it's temporary accessible for cleanup //protected: THttpIncomingRequestPtr Request; + std::unique_ptr<TSensors> Sensors; }; } diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp index 394cb17bd3..7cf83d38d8 100644 --- a/library/cpp/actors/http/http_proxy.cpp +++ b/library/cpp/actors/http/http_proxy.cpp @@ -260,6 +260,10 @@ TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingR } TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) { + const auto& sensors = response->Sensors; + if (sensors) { + return new TEvHttpProxy::TEvReportSensors(*sensors); + } return new TEvHttpProxy::TEvReportSensors( "in", request->Host, diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h index b95954e561..0ed09119e4 100644 --- a/library/cpp/actors/http/http_proxy.h +++ b/library/cpp/actors/http/http_proxy.h @@ -20,7 +20,7 @@ struct TSocketDescriptor : NActors::TSharedDescriptor, THttpConfig { SocketType Socket; TSocketDescriptor() = default; - + TSocketDescriptor(int af) : Socket(af) { @@ -213,24 +213,11 @@ struct TEvHttpProxy { {} }; - struct TEvReportSensors : NActors::TEventLocal<TEvReportSensors, EvReportSensors> { - TString Direction; - TString Host; - TString Url; - TString Status; - TDuration Time; - - TEvReportSensors( - TStringBuf direction, - TStringBuf host, - TStringBuf url, - TStringBuf status, - TDuration time) - : Direction(direction) - , Host(host) - , Url(url) - , Status(status) - , Time(time) + struct TEvReportSensors : TSensors, NActors::TEventLocal<TEvReportSensors, EvReportSensors> { + using TSensors::TSensors; + + TEvReportSensors(const TSensors& sensors) + : TSensors(sensors) {} }; }; diff --git a/library/python/pytest/plugins/conftests.py b/library/python/pytest/plugins/conftests.py index 3c855c9c11..2ea36ae4c2 100644 --- a/library/python/pytest/plugins/conftests.py +++ b/library/python/pytest/plugins/conftests.py @@ -3,7 +3,10 @@ import importlib import sys import inspect +import yatest.common as yc + from pytest import hookimpl +from yatest_lib.ya import Ya from .fixtures import metrics, links # noqa @@ -23,6 +26,7 @@ conftest_modules = [] @hookimpl(trylast=True) def pytest_load_initial_conftests(early_config, parser, args): + yc.runtime._set_ya_config(ya=Ya()) conftests = filter(lambda name: name.endswith(".conftest"), sys.extra_modules) def conftest_key(name): diff --git a/library/python/pytest/plugins/ya.py b/library/python/pytest/plugins/ya.py index 5b48ab775d..715d24ce5b 100644 --- a/library/python/pytest/plugins/ya.py +++ b/library/python/pytest/plugins/ya.py @@ -30,6 +30,7 @@ import _pytest.skipping from _pytest.warning_types import PytestUnhandledCoroutineWarning from yatest_lib import test_splitter +import yatest.common as yatest_common try: import resource @@ -283,6 +284,8 @@ def pytest_configure(config): if config.option.pdb_on_sigusr1: configure_pdb_on_demand() + yatest_common.runtime._set_ya_config(config=config) + # Dump python backtrace in case of any errors faulthandler.enable() if hasattr(signal, "SIGQUIT"): diff --git a/library/python/testing/import_test/import_test.py b/library/python/testing/import_test/import_test.py index 5290ae3cae..114723a4c6 100644 --- a/library/python/testing/import_test/import_test.py +++ b/library/python/testing/import_test/import_test.py @@ -11,6 +11,15 @@ import __res from __res import importer +def setup_test_environment(): + try: + from yatest_lib.ya import Ya + import yatest.common as yc + yc.runtime._set_ya_config(ya=Ya()) + except ImportError: + pass + + def check_imports(no_check=(), extra=(), skip_func=None, py_main=None): """ tests all bundled modules are importable @@ -18,6 +27,7 @@ def check_imports(no_check=(), extra=(), skip_func=None, py_main=None): "PEERDIR(library/python/import_test)" to your CMakeLists.txt and "from import_test import test_imports" to your python test source file. """ + str_ = lambda s: s if not isinstance(b'', str): str_ = lambda s: s.decode('UTF-8') @@ -99,6 +109,8 @@ test_imports = check_imports def main(): + setup_test_environment() + skip_names = sys.argv[1:] try: diff --git a/library/python/testing/recipe/__init__.py b/library/python/testing/recipe/__init__.py index 5ef9c5c189..5f48628ad3 100644 --- a/library/python/testing/recipe/__init__.py +++ b/library/python/testing/recipe/__init__.py @@ -8,6 +8,8 @@ import argparse from yatest_lib.ya import Ya +import yatest.common as yc + RECIPE_START_OPTION = "start" RECIPE_STOP_OPTION = "stop" @@ -16,6 +18,13 @@ collect_cores = None sanitizer_extra_checks = None +class Config: + def __init__(self): + self.ya = ya + self.collect_cores = collect_cores + self.sanitizer_extra_checks = sanitizer_extra_checks + + def _setup_logging(level=logging.DEBUG): root_logger = logging.getLogger() root_logger.setLevel(level) @@ -60,6 +69,7 @@ def get_options(): os.environ[envvar] = os.environ[envvar + '_ORIGINAL'] collect_cores = args.collect_cores + yc.runtime._set_ya_config(config=Config()) for recipe_option in RECIPE_START_OPTION, RECIPE_STOP_OPTION: if recipe_option in opts: return args, opts[opts.index(recipe_option):] diff --git a/library/python/testing/yatest_common/yatest/common/runtime.py b/library/python/testing/yatest_common/yatest/common/runtime.py index 6e6c3c8759..b8d5964d1d 100644 --- a/library/python/testing/yatest_common/yatest/common/runtime.py +++ b/library/python/testing/yatest_common/yatest/common/runtime.py @@ -9,22 +9,30 @@ import six _lock = threading.Lock() +_config = None + + +def _set_ya_config(config=None, ya=None): + global _config + if config: + _config = config + elif ya: + class Config: + def __init__(self): + self.ya = None + _config = Config() + _config.ya = ya + def _get_ya_config(): - try: - import library.python.pytest.plugins.ya as ya_plugin - if ya_plugin.pytest_config is not None: - return ya_plugin.pytest_config - import pytest - return pytest.config - except (ImportError, AttributeError): + if _config: + return _config + else: try: - import library.python.testing.recipe - if library.python.testing.recipe.ya: - return library.python.testing.recipe + import pytest + return pytest.config except (ImportError, AttributeError): - pass - raise NotImplementedError("yatest.common.* is only available from the testing runtime") + raise NotImplementedError("yatest.common.* is only available from the testing runtime") def _get_ya_plugin_instance(): diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 5fc0856fc5..44c76870dc 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -301,6 +301,7 @@ enum EServiceKikimr { YQ_AUDIT = 1150; YQ_AUDIT_EVENT_SENDER = 1151; YQ_HEALTH = 1152; + YQ_RATE_LIMITER = 1155; FQ_INTERNAL_SERVICE = 1153; FQ_QUOTA_SERVICE = 1154; diff --git a/ydb/core/yq/libs/config/protos/CMakeLists.txt b/ydb/core/yq/libs/config/protos/CMakeLists.txt index ce03e7b21f..c399db4b32 100644 --- a/ydb/core/yq/libs/config/protos/CMakeLists.txt +++ b/ydb/core/yq/libs/config/protos/CMakeLists.txt @@ -33,6 +33,7 @@ target_proto_messages(libs-config-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/private_api.proto ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/private_proxy.proto ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/quotas_manager.proto + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/rate_limiter.proto ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/read_actors_factory.proto ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/resource_manager.proto ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/storage.proto diff --git a/ydb/core/yq/libs/config/protos/rate_limiter.proto b/ydb/core/yq/libs/config/protos/rate_limiter.proto new file mode 100644 index 0000000000..3a82632b30 --- /dev/null +++ b/ydb/core/yq/libs/config/protos/rate_limiter.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package NYq.NConfig; +option java_package = "ru.yandex.kikimr.proto"; + +import "ydb/core/yq/libs/config/protos/storage.proto"; + +//////////////////////////////////////////////////////////// + +message TLimiter { + string CoordinationNodePath = 1; +} + +message TRateLimiterConfig { + bool Enabled = 1; + bool ControlPlaneEnabled = 2; + bool DataPlaneEnabled = 3; + + NYq.NConfig.TYdbStorageConfig Database = 4; + repeated TLimiter Limiters = 5; +} diff --git a/ydb/core/yq/libs/config/protos/yq_config.proto b/ydb/core/yq/libs/config/protos/yq_config.proto index 74344d2ccf..c7c7deb102 100644 --- a/ydb/core/yq/libs/config/protos/yq_config.proto +++ b/ydb/core/yq/libs/config/protos/yq_config.proto @@ -18,6 +18,7 @@ import "ydb/core/yq/libs/config/protos/pinger.proto"; import "ydb/core/yq/libs/config/protos/private_api.proto"; import "ydb/core/yq/libs/config/protos/private_proxy.proto"; import "ydb/core/yq/libs/config/protos/quotas_manager.proto"; +import "ydb/core/yq/libs/config/protos/rate_limiter.proto"; import "ydb/core/yq/libs/config/protos/read_actors_factory.proto"; import "ydb/core/yq/libs/config/protos/resource_manager.proto"; import "ydb/core/yq/libs/config/protos/test_connection.proto"; @@ -48,4 +49,5 @@ message TConfig { TReadActorsFactoryConfig ReadActorsFactoryConfig = 19; THealthConfig Health = 20; TQuotasManagerConfig QuotasManager = 21; + TRateLimiterConfig RateLimiter = 22; } diff --git a/ydb/core/yq/libs/init/CMakeLists.txt b/ydb/core/yq/libs/init/CMakeLists.txt index 272bb515f0..e052e99586 100644 --- a/ydb/core/yq/libs/init/CMakeLists.txt +++ b/ydb/core/yq/libs/init/CMakeLists.txt @@ -29,6 +29,7 @@ target_link_libraries(yq-libs-init PUBLIC yq-libs-gateway yq-libs-health yq-libs-quota_manager + libs-rate_limiter-control_plane_service yq-libs-shared_resources yq-libs-test_connection ydb-library-folder_service diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 15a0524d12..0f1431b575 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -11,6 +11,7 @@ #include <ydb/core/yq/libs/private_client/internal_service.h> #include <ydb/core/yq/libs/private_client/loopback_service.h> #include <ydb/core/yq/libs/quota_manager/quota_manager.h> +#include <ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h> #include <ydb/core/yq/libs/shared_resources/shared_resources.h> #include <ydb/library/folder_service/folder_service.h> #include <ydb/library/yql/providers/common/metrics/service_counters.h> @@ -84,6 +85,11 @@ void Init( actorRegistrator(NYq::ControlPlaneProxyActorId(), controlPlaneProxy); } + if (protoConfig.GetRateLimiter().GetControlPlaneEnabled()) { + NActors::IActor* rateLimiterService = NYq::CreateRateLimiterControlPlaneService(protoConfig.GetRateLimiter(), yqSharedResources, credentialsProviderFactory); + actorRegistrator(NYq::RateLimiterControlPlaneServiceId(), rateLimiterService); + } + if (protoConfig.GetAudit().GetEnabled()) { auto* auditSerive = auditServiceFactory( protoConfig.GetAudit(), diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt new file mode 100644 index 0000000000..da96f328b5 --- /dev/null +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt @@ -0,0 +1,27 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(libs-rate_limiter-control_plane_service) +target_compile_options(libs-rate_limiter-control_plane_service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-rate_limiter-control_plane_service PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-protos + libs-config-protos + yq-libs-events + yq-libs-shared_resources + yq-libs-ydb + ydb-library-security +) +target_sources(libs-rate_limiter-control_plane_service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp +) diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp new file mode 100644 index 0000000000..d52cb03cb3 --- /dev/null +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp @@ -0,0 +1,128 @@ +#include "rate_limiter_control_plane_service.h" + +#include <ydb/core/protos/services.pb.h> +#include <ydb/core/yq/libs/events/events.h> +#include <ydb/core/yq/libs/ydb/create_schema.h> +#include <ydb/core/yq/libs/ydb/util.h> +#include <ydb/core/yq/libs/ydb/ydb.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +#define LOG_D(stream) LOG_DEBUG_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream) +#define LOG_I(stream) LOG_INFO_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream) +#define LOG_W(stream) LOG_WARN_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream) +#define LOG_E(stream) LOG_ERROR_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream) + +namespace NYq { + +class TRateLimiterControlPlaneService : public NActors::TActorBootstrapped<TRateLimiterControlPlaneService> { +public: + static constexpr char ActorName[] = "YQ_RATE_LIMITER_CONTROL_PLANE"; + + TRateLimiterControlPlaneService( + const NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, + const NYq::TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) + : Config(rateLimiterConfig) + , YqSharedResources(yqSharedResources) + , CredProviderFactory(credentialsProviderFactory) + { + } + + void Bootstrap() { + Y_VERIFY(Config.GetControlPlaneEnabled()); + if (!Config.GetEnabled()) { + Become(&TRateLimiterControlPlaneService::RateLimiterOffStateFunc); + return; + } + + StartInit(); + } + + static ERetryErrorClass RetryFunc(const NYdb::TStatus& status) { + return status.GetStatus() == NYdb::EStatus::OVERLOADED ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry; + } + + TYdbSdkRetryPolicy::TPtr MakeCreateSchemaRetryPolicy() { + return TYdbSdkRetryPolicy::GetExponentialBackoffPolicy(RetryFunc, TDuration::MilliSeconds(10), TDuration::Seconds(1), TDuration::Seconds(5)); + } + + void RunCreateCoordinationNodeActor(const TString& path) { + Register(MakeCreateCoordinationNodeActor(SelfId(), NKikimrServices::YQ_RATE_LIMITER, YdbConnection, path, MakeCreateSchemaRetryPolicy())); + ++CreatingCoordinationNodes; + } + + void StartInit() { + Become(&TRateLimiterControlPlaneService::InitStateFunc); + + YdbConnection = NewYdbConnection(Config.GetDatabase(), CredProviderFactory, YqSharedResources->CoreYdbDriver); + + for (const auto& limiterConfig : Config.GetLimiters()) { + auto coordinationNodePath = JoinPath(YdbConnection->TablePathPrefix, limiterConfig.GetCoordinationNodePath()); + RunCreateCoordinationNodeActor(coordinationNodePath); + } + + TryStartWorking(); + } + + void TryStartWorking() { + Y_VERIFY(CreatingCoordinationNodes >= 0); + if (CreatingCoordinationNodes > 0) { + return; + } + + Become(&TRateLimiterControlPlaneService::WorkingStateFunc); + + // Start processing deferred queries + } + + void HandleInit(TEvents::TEvSchemaCreated::TPtr&) { + --CreatingCoordinationNodes; + + TryStartWorking(); + } + + // State func that does nothing. Rate limiting is turned off. + // Answers "OK" responses and does nothing. + STRICT_STFUNC( + RateLimiterOffStateFunc, + ) + + // State func that inits limiters. + // Puts all incoming requests to queue. + STRICT_STFUNC( + InitStateFunc, + hFunc(TEvents::TEvSchemaCreated, HandleInit); + ) + + // Working + STRICT_STFUNC( + WorkingStateFunc, + ) + +private: + const NYq::NConfig::TRateLimiterConfig Config; + const NYq::TYqSharedResources::TPtr YqSharedResources; + NKikimr::TYdbCredentialsProviderFactory CredProviderFactory; + TYdbConnectionPtr YdbConnection; + + // Init + size_t CreatingCoordinationNodes = 0; +}; + +NActors::TActorId RateLimiterControlPlaneServiceId() { + constexpr TStringBuf name = "RATE_LIM_CP"; + return NActors::TActorId(0, name); +} + +NActors::IActor* CreateRateLimiterControlPlaneService( + const NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, + const NYq::TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) +{ + return new TRateLimiterControlPlaneService(rateLimiterConfig, yqSharedResources, credentialsProviderFactory); +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h new file mode 100644 index 0000000000..96a9d34715 --- /dev/null +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h @@ -0,0 +1,18 @@ +#pragma once +#include <ydb/core/yq/libs/config/protos/rate_limiter.pb.h> +#include <ydb/core/yq/libs/shared_resources/shared_resources.h> +#include <ydb/library/security/ydb_credentials_provider_factory.h> + +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/actor.h> + +namespace NYq { + +NActors::TActorId RateLimiterControlPlaneServiceId(); + +NActors::IActor* CreateRateLimiterControlPlaneService( + const NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, + const NYq::TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory); + +} // namespace NYq diff --git a/ydb/core/yq/libs/ydb/CMakeLists.txt b/ydb/core/yq/libs/ydb/CMakeLists.txt index 3e9c460c51..e3773d6dc7 100644 --- a/ydb/core/yq/libs/ydb/CMakeLists.txt +++ b/ydb/core/yq/libs/ydb/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries(yq-libs-ydb PUBLIC yq-libs-config yq-libs-events ydb-library-security + cpp-client-ydb_coordination cpp-client-ydb_scheme cpp-client-ydb_table tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/yq/libs/ydb/create_schema.cpp b/ydb/core/yq/libs/ydb/create_schema.cpp index a0ce2efe22..675bc474e8 100644 --- a/ydb/core/yq/libs/ydb/create_schema.cpp +++ b/ydb/core/yq/libs/ydb/create_schema.cpp @@ -251,6 +251,34 @@ private: const TString DirectoryPath; }; +class TCreateCoordinationNodeActor : public TCreateActorBase { +public: + TCreateCoordinationNodeActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& coordinationNodePath, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) + : TCreateActorBase(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie) + , CoordinationNodePath(coordinationNodePath) + { + } + +private: + TString GetEntityName() const override { + return TStringBuilder() << "coordination node \"" << CoordinationNodePath << "\""; + } + + NYdb::TAsyncStatus CallYdbSdk() override { + SCHEMA_LOG_DEBUG("Call create coordination node \"" << CoordinationNodePath << "\""); + return Connection->CoordinationClient.CreateNode(CoordinationNodePath); + } + +private: + const TString CoordinationNodePath; +}; + NActors::IActor* MakeCreateTableActor( NActors::TActorId parent, ui64 logComponent, @@ -289,4 +317,22 @@ NActors::IActor* MakeCreateDirectoryActor( ); } +NActors::IActor* MakeCreateCoordinationNodeActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& coordinationNodePath, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) +{ + return new TCreateCoordinationNodeActor( + parent, + logComponent, + std::move(connection), + coordinationNodePath, + std::move(retryPolicy), + cookie + ); +} + } // namespace NYq diff --git a/ydb/core/yq/libs/ydb/create_schema.h b/ydb/core/yq/libs/ydb/create_schema.h index b05c0ec01b..d390431caf 100644 --- a/ydb/core/yq/libs/ydb/create_schema.h +++ b/ydb/core/yq/libs/ydb/create_schema.h @@ -30,4 +30,14 @@ NActors::IActor* MakeCreateDirectoryActor( TYdbSdkRetryPolicy::TPtr retryPolicy, ui64 cookie = 0); +// Actor that creates coordination node. +// Send TEvSchemaCreated to parent (if any). +NActors::IActor* MakeCreateCoordinationNodeActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& coordinationNodePath, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie = 0); + } // namespace NYq diff --git a/ydb/core/yq/libs/ydb/ydb.cpp b/ydb/core/yq/libs/ydb/ydb.cpp index 190efa13df..ac791c01be 100644 --- a/ydb/core/yq/libs/ydb/ydb.cpp +++ b/ydb/core/yq/libs/ydb/ydb.cpp @@ -221,6 +221,7 @@ TYdbConnection::TYdbConnection(const NConfig::TYdbStorageConfig& config, : Driver(driver) , TableClient(Driver, GetClientSettings<NYdb::NTable::TClientSettings>(config, credProviderFactory)) , SchemeClient(Driver, GetClientSettings<NYdb::TCommonClientSettings>(config, credProviderFactory)) + , CoordinationClient(Driver, GetClientSettings<NYdb::TCommonClientSettings>(config, credProviderFactory)) , DB(config.GetDatabase()) , TablePathPrefix(JoinPath(DB, config.GetTablePrefix())) { diff --git a/ydb/core/yq/libs/ydb/ydb.h b/ydb/core/yq/libs/ydb/ydb.h index 9675dd8181..6903722e6c 100644 --- a/ydb/core/yq/libs/ydb/ydb.h +++ b/ydb/core/yq/libs/ydb/ydb.h @@ -3,6 +3,7 @@ #include <ydb/library/security/ydb_credentials_provider_factory.h> #include <ydb/core/yq/libs/config/protos/storage.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_coordination/coordination.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> @@ -14,6 +15,7 @@ struct TYdbConnection : public TThrRefBase { NYdb::TDriver Driver; NYdb::NTable::TTableClient TableClient; NYdb::NScheme::TSchemeClient SchemeClient; + NYdb::NCoordination::TClient CoordinationClient; const TString DB; const TString TablePathPrefix; |