diff options
| author | galaxycrab <[email protected]> | 2022-07-08 10:56:19 +0300 | 
|---|---|---|
| committer | galaxycrab <[email protected]> | 2022-07-08 10:56:19 +0300 | 
| commit | 3f5056deffaa871aa49c77d6bc14d0c49a6b60c2 (patch) | |
| tree | fd0dbf540b1ea0a38fc5184ec9fccfe1e2c4b37f | |
| parent | d783e97de9fbfbbcec1ff6ef52c8c2515d9b3576 (diff) | |
Create kesuses for rate limiting in YQ
Fill config in kikimr runner
Create coordination nodes
Pass to config
Protos
24 files changed, 353 insertions, 35 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 9b6aa23a808..4a495cabf16 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -752,8 +752,9 @@ add_subdirectory(ydb/core/yq/libs/ydb)  add_subdirectory(library/cpp/retry)  add_subdirectory(library/cpp/retry/protos)  add_subdirectory(ydb/library/security) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)  add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme)  add_subdirectory(ydb/core/yq/libs/db_schema)  add_subdirectory(ydb/core/yq/libs/shared_resources)  add_subdirectory(ydb/core/yq/libs/shared_resources/interface) @@ -891,6 +892,7 @@ add_subdirectory(ydb/core/yq/libs/tasks_packer)  add_subdirectory(ydb/core/yq/libs/health)  add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)  add_subdirectory(ydb/core/yq/libs/quota_manager) +add_subdirectory(ydb/core/yq/libs/rate_limiter/control_plane_service)  add_subdirectory(ydb/core/yq/libs/test_connection)  add_subdirectory(ydb/core/yq/libs/test_connection/events)  add_subdirectory(ydb/library/yql/providers/solomon/async_io) @@ -1087,7 +1089,6 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)  add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl)  add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut)  add_subdirectory(ydb/services/rate_limiter/ut) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)  add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter)  add_subdirectory(ydb/services/ydb/index_ut)  add_subdirectory(ydb/services/ydb/sdk_credprovider_ut) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 81e3793715d..1e8c6079cc3 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -756,8 +756,9 @@ add_subdirectory(ydb/core/yq/libs/ydb)  add_subdirectory(library/cpp/retry)  add_subdirectory(library/cpp/retry/protos)  add_subdirectory(ydb/library/security) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)  add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme)  add_subdirectory(ydb/core/yq/libs/db_schema)  add_subdirectory(ydb/core/yq/libs/shared_resources)  add_subdirectory(ydb/core/yq/libs/shared_resources/interface) @@ -895,6 +896,7 @@ add_subdirectory(ydb/core/yq/libs/tasks_packer)  add_subdirectory(ydb/core/yq/libs/health)  add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)  add_subdirectory(ydb/core/yq/libs/quota_manager) +add_subdirectory(ydb/core/yq/libs/rate_limiter/control_plane_service)  add_subdirectory(ydb/core/yq/libs/test_connection)  add_subdirectory(ydb/core/yq/libs/test_connection/events)  add_subdirectory(ydb/library/yql/providers/solomon/async_io) @@ -1107,7 +1109,6 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)  add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl)  add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut)  add_subdirectory(ydb/services/rate_limiter/ut) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)  add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter)  add_subdirectory(ydb/services/ydb/index_ut)  add_subdirectory(ydb/services/ydb/sdk_credprovider_ut) diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h index 318000389c0..295b8f97b5c 100644 --- a/library/cpp/actors/http/http.h +++ b/library/cpp/actors/http/http.h @@ -54,6 +54,27 @@ struct TEqNoCase {      }  }; +struct TSensors { +    TString Direction; +    TString Host; +    TString Url; +    TString Status; +    TDuration Time; + +    TSensors( +            TStringBuf direction, +            TStringBuf host, +            TStringBuf url, +            TStringBuf status, +            TDuration time) +        : Direction(direction) +        , Host(host) +        , Url(url) +        , Status(status) +        , Time(time) +    {} +}; +  struct TUrlParameters {      THashMap<TStringBuf, TStringBuf> Parameters; @@ -840,6 +861,7 @@ public:  // it's temporary accessible for cleanup  //protected:      THttpIncomingRequestPtr Request; +    std::unique_ptr<TSensors> Sensors;  };  } diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp index 394cb17bd3a..7cf83d38d88 100644 --- a/library/cpp/actors/http/http_proxy.cpp +++ b/library/cpp/actors/http/http_proxy.cpp @@ -260,6 +260,10 @@ TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingR  }  TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) { +    const auto& sensors = response->Sensors; +    if (sensors) { +        return new TEvHttpProxy::TEvReportSensors(*sensors); +    }      return new TEvHttpProxy::TEvReportSensors(              "in",              request->Host, diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h index b95954e5614..0ed09119e43 100644 --- a/library/cpp/actors/http/http_proxy.h +++ b/library/cpp/actors/http/http_proxy.h @@ -20,7 +20,7 @@ struct TSocketDescriptor : NActors::TSharedDescriptor, THttpConfig {      SocketType Socket;      TSocketDescriptor() = default; -     +      TSocketDescriptor(int af)          : Socket(af)      { @@ -213,24 +213,11 @@ struct TEvHttpProxy {          {}      }; -    struct TEvReportSensors : NActors::TEventLocal<TEvReportSensors, EvReportSensors> { -        TString Direction; -        TString Host; -        TString Url; -        TString Status; -        TDuration Time; - -        TEvReportSensors( -                TStringBuf direction, -                TStringBuf host, -                TStringBuf url, -                TStringBuf status, -                TDuration time) -            : Direction(direction) -            , Host(host) -            , Url(url) -            , Status(status) -            , Time(time) +    struct TEvReportSensors : TSensors, NActors::TEventLocal<TEvReportSensors, EvReportSensors> { +        using TSensors::TSensors; + +        TEvReportSensors(const TSensors& sensors) +            : TSensors(sensors)          {}      };  }; diff --git a/library/python/pytest/plugins/conftests.py b/library/python/pytest/plugins/conftests.py index 3c855c9c113..2ea36ae4c21 100644 --- a/library/python/pytest/plugins/conftests.py +++ b/library/python/pytest/plugins/conftests.py @@ -3,7 +3,10 @@ import importlib  import sys  import inspect +import yatest.common as yc +  from pytest import hookimpl +from yatest_lib.ya import Ya  from .fixtures import metrics, links  # noqa @@ -23,6 +26,7 @@ conftest_modules = []  @hookimpl(trylast=True)  def pytest_load_initial_conftests(early_config, parser, args): +    yc.runtime._set_ya_config(ya=Ya())      conftests = filter(lambda name: name.endswith(".conftest"), sys.extra_modules)      def conftest_key(name): diff --git a/library/python/pytest/plugins/ya.py b/library/python/pytest/plugins/ya.py index 5b48ab775df..715d24ce5b9 100644 --- a/library/python/pytest/plugins/ya.py +++ b/library/python/pytest/plugins/ya.py @@ -30,6 +30,7 @@ import _pytest.skipping  from _pytest.warning_types import PytestUnhandledCoroutineWarning  from yatest_lib import test_splitter +import yatest.common as yatest_common  try:      import resource @@ -283,6 +284,8 @@ def pytest_configure(config):      if config.option.pdb_on_sigusr1:          configure_pdb_on_demand() +    yatest_common.runtime._set_ya_config(config=config) +      # Dump python backtrace in case of any errors      faulthandler.enable()      if hasattr(signal, "SIGQUIT"): diff --git a/library/python/testing/import_test/import_test.py b/library/python/testing/import_test/import_test.py index 5290ae3cae5..114723a4c6a 100644 --- a/library/python/testing/import_test/import_test.py +++ b/library/python/testing/import_test/import_test.py @@ -11,6 +11,15 @@ import __res  from __res import importer +def setup_test_environment(): +    try: +        from yatest_lib.ya import Ya +        import yatest.common as yc +        yc.runtime._set_ya_config(ya=Ya()) +    except ImportError: +        pass + +  def check_imports(no_check=(), extra=(), skip_func=None, py_main=None):      """      tests all bundled modules are importable @@ -18,6 +27,7 @@ def check_imports(no_check=(), extra=(), skip_func=None, py_main=None):      "PEERDIR(library/python/import_test)" to your CMakeLists.txt and      "from import_test import test_imports" to your python test source file.      """ +      str_ = lambda s: s      if not isinstance(b'', str):          str_ = lambda s: s.decode('UTF-8') @@ -99,6 +109,8 @@ test_imports = check_imports  def main(): +    setup_test_environment() +      skip_names = sys.argv[1:]      try: diff --git a/library/python/testing/recipe/__init__.py b/library/python/testing/recipe/__init__.py index 5ef9c5c1895..5f48628ad38 100644 --- a/library/python/testing/recipe/__init__.py +++ b/library/python/testing/recipe/__init__.py @@ -8,6 +8,8 @@ import argparse  from yatest_lib.ya import Ya +import yatest.common as yc +  RECIPE_START_OPTION = "start"  RECIPE_STOP_OPTION = "stop" @@ -16,6 +18,13 @@ collect_cores = None  sanitizer_extra_checks = None +class Config: +    def __init__(self): +        self.ya = ya +        self.collect_cores = collect_cores +        self.sanitizer_extra_checks = sanitizer_extra_checks + +  def _setup_logging(level=logging.DEBUG):      root_logger = logging.getLogger()      root_logger.setLevel(level) @@ -60,6 +69,7 @@ def get_options():                  os.environ[envvar] = os.environ[envvar + '_ORIGINAL']      collect_cores = args.collect_cores +    yc.runtime._set_ya_config(config=Config())      for recipe_option in RECIPE_START_OPTION, RECIPE_STOP_OPTION:          if recipe_option in opts:              return args, opts[opts.index(recipe_option):] diff --git a/library/python/testing/yatest_common/yatest/common/runtime.py b/library/python/testing/yatest_common/yatest/common/runtime.py index 6e6c3c87591..b8d5964d1d2 100644 --- a/library/python/testing/yatest_common/yatest/common/runtime.py +++ b/library/python/testing/yatest_common/yatest/common/runtime.py @@ -9,22 +9,30 @@ import six  _lock = threading.Lock() +_config = None + + +def _set_ya_config(config=None, ya=None): +    global _config +    if config: +        _config = config +    elif ya: +        class Config: +            def __init__(self): +                self.ya = None +        _config = Config() +        _config.ya = ya +  def _get_ya_config(): -    try: -        import library.python.pytest.plugins.ya as ya_plugin -        if ya_plugin.pytest_config is not None: -            return ya_plugin.pytest_config -        import pytest -        return pytest.config -    except (ImportError, AttributeError): +    if _config: +        return _config +    else:          try: -            import library.python.testing.recipe -            if library.python.testing.recipe.ya: -                return library.python.testing.recipe +            import pytest +            return pytest.config          except (ImportError, AttributeError): -            pass -        raise NotImplementedError("yatest.common.* is only available from the testing runtime") +            raise NotImplementedError("yatest.common.* is only available from the testing runtime")  def _get_ya_plugin_instance(): diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 5fc0856fc57..44c76870dc4 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -301,6 +301,7 @@ enum EServiceKikimr {      YQ_AUDIT = 1150;      YQ_AUDIT_EVENT_SENDER = 1151;      YQ_HEALTH = 1152; +    YQ_RATE_LIMITER = 1155;      FQ_INTERNAL_SERVICE = 1153;      FQ_QUOTA_SERVICE = 1154; diff --git a/ydb/core/yq/libs/config/protos/CMakeLists.txt b/ydb/core/yq/libs/config/protos/CMakeLists.txt index ce03e7b21f1..c399db4b329 100644 --- a/ydb/core/yq/libs/config/protos/CMakeLists.txt +++ b/ydb/core/yq/libs/config/protos/CMakeLists.txt @@ -33,6 +33,7 @@ target_proto_messages(libs-config-protos PRIVATE    ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/private_api.proto    ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/private_proxy.proto    ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/quotas_manager.proto +  ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/rate_limiter.proto    ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/read_actors_factory.proto    ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/resource_manager.proto    ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/config/protos/storage.proto diff --git a/ydb/core/yq/libs/config/protos/rate_limiter.proto b/ydb/core/yq/libs/config/protos/rate_limiter.proto new file mode 100644 index 00000000000..3a82632b308 --- /dev/null +++ b/ydb/core/yq/libs/config/protos/rate_limiter.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package NYq.NConfig; +option java_package = "ru.yandex.kikimr.proto"; + +import "ydb/core/yq/libs/config/protos/storage.proto"; + +//////////////////////////////////////////////////////////// + +message TLimiter { +    string CoordinationNodePath = 1; +} + +message TRateLimiterConfig { +    bool Enabled = 1; +    bool ControlPlaneEnabled = 2; +    bool DataPlaneEnabled = 3; + +    NYq.NConfig.TYdbStorageConfig Database = 4; +    repeated TLimiter Limiters = 5; +} diff --git a/ydb/core/yq/libs/config/protos/yq_config.proto b/ydb/core/yq/libs/config/protos/yq_config.proto index 74344d2ccf5..c7c7deb1024 100644 --- a/ydb/core/yq/libs/config/protos/yq_config.proto +++ b/ydb/core/yq/libs/config/protos/yq_config.proto @@ -18,6 +18,7 @@ import "ydb/core/yq/libs/config/protos/pinger.proto";  import "ydb/core/yq/libs/config/protos/private_api.proto";  import "ydb/core/yq/libs/config/protos/private_proxy.proto";  import "ydb/core/yq/libs/config/protos/quotas_manager.proto"; +import "ydb/core/yq/libs/config/protos/rate_limiter.proto";  import "ydb/core/yq/libs/config/protos/read_actors_factory.proto";  import "ydb/core/yq/libs/config/protos/resource_manager.proto";  import "ydb/core/yq/libs/config/protos/test_connection.proto"; @@ -48,4 +49,5 @@ message TConfig {      TReadActorsFactoryConfig ReadActorsFactoryConfig = 19;      THealthConfig Health = 20;      TQuotasManagerConfig QuotasManager = 21; +    TRateLimiterConfig RateLimiter = 22;  } diff --git a/ydb/core/yq/libs/init/CMakeLists.txt b/ydb/core/yq/libs/init/CMakeLists.txt index 272bb515f00..e052e995862 100644 --- a/ydb/core/yq/libs/init/CMakeLists.txt +++ b/ydb/core/yq/libs/init/CMakeLists.txt @@ -29,6 +29,7 @@ target_link_libraries(yq-libs-init PUBLIC    yq-libs-gateway    yq-libs-health    yq-libs-quota_manager +  libs-rate_limiter-control_plane_service    yq-libs-shared_resources    yq-libs-test_connection    ydb-library-folder_service diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 15a0524d12d..0f1431b5759 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -11,6 +11,7 @@  #include <ydb/core/yq/libs/private_client/internal_service.h>  #include <ydb/core/yq/libs/private_client/loopback_service.h>  #include <ydb/core/yq/libs/quota_manager/quota_manager.h> +#include <ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h>  #include <ydb/core/yq/libs/shared_resources/shared_resources.h>  #include <ydb/library/folder_service/folder_service.h>  #include <ydb/library/yql/providers/common/metrics/service_counters.h> @@ -84,6 +85,11 @@ void Init(          actorRegistrator(NYq::ControlPlaneProxyActorId(), controlPlaneProxy);      } +    if (protoConfig.GetRateLimiter().GetControlPlaneEnabled()) { +        NActors::IActor* rateLimiterService = NYq::CreateRateLimiterControlPlaneService(protoConfig.GetRateLimiter(), yqSharedResources, credentialsProviderFactory); +        actorRegistrator(NYq::RateLimiterControlPlaneServiceId(), rateLimiterService); +    } +      if (protoConfig.GetAudit().GetEnabled()) {          auto* auditSerive = auditServiceFactory(              protoConfig.GetAudit(), diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt new file mode 100644 index 00000000000..da96f328b5e --- /dev/null +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt @@ -0,0 +1,27 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(libs-rate_limiter-control_plane_service) +target_compile_options(libs-rate_limiter-control_plane_service PRIVATE +  -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-rate_limiter-control_plane_service PUBLIC +  contrib-libs-cxxsupp +  yutil +  cpp-actors-core +  ydb-core-protos +  libs-config-protos +  yq-libs-events +  yq-libs-shared_resources +  yq-libs-ydb +  ydb-library-security +) +target_sources(libs-rate_limiter-control_plane_service PRIVATE +  ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp +) diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp new file mode 100644 index 00000000000..d52cb03cb39 --- /dev/null +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp @@ -0,0 +1,128 @@ +#include "rate_limiter_control_plane_service.h" + +#include <ydb/core/protos/services.pb.h> +#include <ydb/core/yq/libs/events/events.h> +#include <ydb/core/yq/libs/ydb/create_schema.h> +#include <ydb/core/yq/libs/ydb/util.h> +#include <ydb/core/yq/libs/ydb/ydb.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +#define LOG_D(stream) LOG_DEBUG_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream) +#define LOG_I(stream) LOG_INFO_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream) +#define LOG_W(stream) LOG_WARN_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream) +#define LOG_E(stream) LOG_ERROR_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQ_RATE_LIMITER, stream) + +namespace NYq { + +class TRateLimiterControlPlaneService : public NActors::TActorBootstrapped<TRateLimiterControlPlaneService> { +public: +    static constexpr char ActorName[] = "YQ_RATE_LIMITER_CONTROL_PLANE"; + +    TRateLimiterControlPlaneService( +        const NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, +        const NYq::TYqSharedResources::TPtr& yqSharedResources, +        const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) +        : Config(rateLimiterConfig) +        , YqSharedResources(yqSharedResources) +        , CredProviderFactory(credentialsProviderFactory) +    { +    } + +    void Bootstrap() { +        Y_VERIFY(Config.GetControlPlaneEnabled()); +        if (!Config.GetEnabled()) { +            Become(&TRateLimiterControlPlaneService::RateLimiterOffStateFunc); +            return; +        } + +        StartInit(); +    } + +    static ERetryErrorClass RetryFunc(const NYdb::TStatus& status) { +        return status.GetStatus() == NYdb::EStatus::OVERLOADED ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry; +    } + +    TYdbSdkRetryPolicy::TPtr MakeCreateSchemaRetryPolicy() { +        return TYdbSdkRetryPolicy::GetExponentialBackoffPolicy(RetryFunc, TDuration::MilliSeconds(10), TDuration::Seconds(1), TDuration::Seconds(5)); +    } + +    void RunCreateCoordinationNodeActor(const TString& path) { +        Register(MakeCreateCoordinationNodeActor(SelfId(), NKikimrServices::YQ_RATE_LIMITER, YdbConnection, path, MakeCreateSchemaRetryPolicy())); +        ++CreatingCoordinationNodes; +    } + +    void StartInit() { +        Become(&TRateLimiterControlPlaneService::InitStateFunc); + +        YdbConnection = NewYdbConnection(Config.GetDatabase(), CredProviderFactory, YqSharedResources->CoreYdbDriver); + +        for (const auto& limiterConfig : Config.GetLimiters()) { +            auto coordinationNodePath = JoinPath(YdbConnection->TablePathPrefix, limiterConfig.GetCoordinationNodePath()); +            RunCreateCoordinationNodeActor(coordinationNodePath); +        } + +        TryStartWorking(); +    } + +    void TryStartWorking() { +        Y_VERIFY(CreatingCoordinationNodes >= 0); +        if (CreatingCoordinationNodes > 0) { +            return; +        } + +        Become(&TRateLimiterControlPlaneService::WorkingStateFunc); + +        // Start processing deferred queries +    } + +    void HandleInit(TEvents::TEvSchemaCreated::TPtr&) { +        --CreatingCoordinationNodes; + +        TryStartWorking(); +    } + +    // State func that does nothing. Rate limiting is turned off. +    // Answers "OK" responses and does nothing. +    STRICT_STFUNC( +        RateLimiterOffStateFunc, +    ) + +    // State func that inits limiters. +    // Puts all incoming requests to queue. +    STRICT_STFUNC( +        InitStateFunc, +        hFunc(TEvents::TEvSchemaCreated, HandleInit); +    ) + +    // Working +    STRICT_STFUNC( +        WorkingStateFunc, +    ) + +private: +    const NYq::NConfig::TRateLimiterConfig Config; +    const NYq::TYqSharedResources::TPtr YqSharedResources; +    NKikimr::TYdbCredentialsProviderFactory CredProviderFactory; +    TYdbConnectionPtr YdbConnection; + +    // Init +    size_t CreatingCoordinationNodes = 0; +}; + +NActors::TActorId RateLimiterControlPlaneServiceId() { +    constexpr TStringBuf name = "RATE_LIM_CP"; +    return NActors::TActorId(0, name); +} + +NActors::IActor* CreateRateLimiterControlPlaneService( +    const NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, +    const NYq::TYqSharedResources::TPtr& yqSharedResources, +    const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) +{ +    return new TRateLimiterControlPlaneService(rateLimiterConfig, yqSharedResources, credentialsProviderFactory); +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h new file mode 100644 index 00000000000..96a9d347151 --- /dev/null +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.h @@ -0,0 +1,18 @@ +#pragma once +#include <ydb/core/yq/libs/config/protos/rate_limiter.pb.h> +#include <ydb/core/yq/libs/shared_resources/shared_resources.h> +#include <ydb/library/security/ydb_credentials_provider_factory.h> + +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/actor.h> + +namespace NYq { + +NActors::TActorId RateLimiterControlPlaneServiceId(); + +NActors::IActor* CreateRateLimiterControlPlaneService( +    const NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, +    const NYq::TYqSharedResources::TPtr& yqSharedResources, +    const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory); + +} // namespace NYq diff --git a/ydb/core/yq/libs/ydb/CMakeLists.txt b/ydb/core/yq/libs/ydb/CMakeLists.txt index 3e9c460c511..e3773d6dc73 100644 --- a/ydb/core/yq/libs/ydb/CMakeLists.txt +++ b/ydb/core/yq/libs/ydb/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries(yq-libs-ydb PUBLIC    yq-libs-config    yq-libs-events    ydb-library-security +  cpp-client-ydb_coordination    cpp-client-ydb_scheme    cpp-client-ydb_table    tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/yq/libs/ydb/create_schema.cpp b/ydb/core/yq/libs/ydb/create_schema.cpp index a0ce2efe221..675bc474e88 100644 --- a/ydb/core/yq/libs/ydb/create_schema.cpp +++ b/ydb/core/yq/libs/ydb/create_schema.cpp @@ -251,6 +251,34 @@ private:      const TString DirectoryPath;  }; +class TCreateCoordinationNodeActor : public TCreateActorBase { +public: +    TCreateCoordinationNodeActor( +        NActors::TActorId parent, +        ui64 logComponent, +        TYdbConnectionPtr connection, +        const TString& coordinationNodePath, +        TYdbSdkRetryPolicy::TPtr retryPolicy, +        ui64 cookie) +        : TCreateActorBase(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie) +        , CoordinationNodePath(coordinationNodePath) +    { +    } + +private: +    TString GetEntityName() const override { +        return TStringBuilder() << "coordination node \"" << CoordinationNodePath << "\""; +    } + +    NYdb::TAsyncStatus CallYdbSdk() override { +        SCHEMA_LOG_DEBUG("Call create coordination node \"" << CoordinationNodePath << "\""); +        return Connection->CoordinationClient.CreateNode(CoordinationNodePath); +    } + +private: +    const TString CoordinationNodePath; +}; +  NActors::IActor* MakeCreateTableActor(      NActors::TActorId parent,      ui64 logComponent, @@ -289,4 +317,22 @@ NActors::IActor* MakeCreateDirectoryActor(      );  } +NActors::IActor* MakeCreateCoordinationNodeActor( +    NActors::TActorId parent, +    ui64 logComponent, +    TYdbConnectionPtr connection, +    const TString& coordinationNodePath, +    TYdbSdkRetryPolicy::TPtr retryPolicy, +    ui64 cookie) +{ +    return new TCreateCoordinationNodeActor( +        parent, +        logComponent, +        std::move(connection), +        coordinationNodePath, +        std::move(retryPolicy), +        cookie +    ); +} +  } // namespace NYq diff --git a/ydb/core/yq/libs/ydb/create_schema.h b/ydb/core/yq/libs/ydb/create_schema.h index b05c0ec01b2..d390431caff 100644 --- a/ydb/core/yq/libs/ydb/create_schema.h +++ b/ydb/core/yq/libs/ydb/create_schema.h @@ -30,4 +30,14 @@ NActors::IActor* MakeCreateDirectoryActor(      TYdbSdkRetryPolicy::TPtr retryPolicy,      ui64 cookie = 0); +// Actor that creates coordination node. +// Send TEvSchemaCreated to parent (if any). +NActors::IActor* MakeCreateCoordinationNodeActor( +    NActors::TActorId parent, +    ui64 logComponent, +    TYdbConnectionPtr connection, +    const TString& coordinationNodePath, +    TYdbSdkRetryPolicy::TPtr retryPolicy, +    ui64 cookie = 0); +  } // namespace NYq diff --git a/ydb/core/yq/libs/ydb/ydb.cpp b/ydb/core/yq/libs/ydb/ydb.cpp index 190efa13df6..ac791c01beb 100644 --- a/ydb/core/yq/libs/ydb/ydb.cpp +++ b/ydb/core/yq/libs/ydb/ydb.cpp @@ -221,6 +221,7 @@ TYdbConnection::TYdbConnection(const NConfig::TYdbStorageConfig& config,      : Driver(driver)      , TableClient(Driver, GetClientSettings<NYdb::NTable::TClientSettings>(config, credProviderFactory))      , SchemeClient(Driver, GetClientSettings<NYdb::TCommonClientSettings>(config, credProviderFactory)) +    , CoordinationClient(Driver, GetClientSettings<NYdb::TCommonClientSettings>(config, credProviderFactory))      , DB(config.GetDatabase())      , TablePathPrefix(JoinPath(DB, config.GetTablePrefix()))  { diff --git a/ydb/core/yq/libs/ydb/ydb.h b/ydb/core/yq/libs/ydb/ydb.h index 9675dd81816..6903722e6c1 100644 --- a/ydb/core/yq/libs/ydb/ydb.h +++ b/ydb/core/yq/libs/ydb/ydb.h @@ -3,6 +3,7 @@  #include <ydb/library/security/ydb_credentials_provider_factory.h>  #include <ydb/core/yq/libs/config/protos/storage.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_coordination/coordination.h>  #include <ydb/public/sdk/cpp/client/ydb_table/table.h>  #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> @@ -14,6 +15,7 @@ struct TYdbConnection : public TThrRefBase {      NYdb::TDriver Driver;      NYdb::NTable::TTableClient TableClient;      NYdb::NScheme::TSchemeClient SchemeClient; +    NYdb::NCoordination::TClient CoordinationClient;      const TString DB;      const TString TablePathPrefix;  | 
