diff options
author | gvit <gvit@ydb.tech> | 2022-10-27 12:44:59 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-10-27 12:44:59 +0300 |
commit | 4581bf276d80c35411dd0c3bb19a65337f50d8fb (patch) | |
tree | 08d9479b1732926dd2de241b8135558fd51f5cb6 | |
parent | 2419d757a143493a853a1aacc3152e5682617468 (diff) | |
download | ydb-4581bf276d80c35411dd0c3bb19a65337f50d8fb.tar.gz |
cleanup endpoint publisher actor
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 16 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp | 120 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_publisher_service_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 3 |
4 files changed, 37 insertions, 108 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index ecdb212a4e..37119176f8 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1587,13 +1587,6 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se ); } - TVector<TString> rootDomains; - for (auto &domain : appData->DomainsInfo->Domains) - rootDomains.emplace_back("/" + domain.second->Name); - - const bool ignoreServeRootDomain = ScopeId.IsTenantScope(); - const bool serveRootDomain = !ignoreServeRootDomain && config.GetServeRootDomains(); - auto stringsFromProto = [](TVector<TString>& vec, const auto& proto) { if (!proto.empty()) { vec.reserve(proto.size()); @@ -1614,8 +1607,6 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se stringsFromProto(desc->AddressesV4, config.GetPublicAddressesV4()); stringsFromProto(desc->AddressesV6, config.GetPublicAddressesV6()); - if (serveRootDomain) - desc->ServedDatabases.insert(desc->ServedDatabases.end(), rootDomains.begin(), rootDomains.end()); desc->ServedServices.insert(desc->ServedServices.end(), config.GetServices().begin(), config.GetServices().end()); endpoints.push_back(std::move(desc)); } @@ -1630,9 +1621,6 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se stringsFromProto(desc->AddressesV6, config.GetPublicAddressesV6()); desc->TargetNameOverride = config.GetPublicTargetNameOverride(); - if (serveRootDomain) - desc->ServedDatabases.insert(desc->ServedDatabases.end(), rootDomains.begin(), rootDomains.end()); - desc->ServedServices.insert(desc->ServedServices.end(), config.GetServices().begin(), config.GetServices().end()); endpoints.push_back(std::move(desc)); } @@ -1648,8 +1636,6 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se stringsFromProto(desc->AddressesV4, sx.GetPublicAddressesV4()); stringsFromProto(desc->AddressesV6, sx.GetPublicAddressesV6()); - if (serveRootDomain) - desc->ServedDatabases.insert(desc->ServedDatabases.end(), rootDomains.begin(), rootDomains.end()); desc->ServedServices.insert(desc->ServedServices.end(), sx.GetServices().begin(), sx.GetServices().end()); endpoints.push_back(std::move(desc)); } @@ -1664,8 +1650,6 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se stringsFromProto(desc->AddressesV6, sx.GetPublicAddressesV6()); desc->TargetNameOverride = sx.GetPublicTargetNameOverride(); - if (serveRootDomain) - desc->ServedDatabases.insert(desc->ServedDatabases.end(), rootDomains.begin(), rootDomains.end()); desc->ServedServices.insert(desc->ServedServices.end(), sx.GetServices().begin(), sx.GetServices().end()); endpoints.push_back(std::move(desc)); } diff --git a/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp b/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp index ce65619641..031c7911b7 100644 --- a/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp +++ b/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp @@ -1,107 +1,65 @@ #include "grpc_endpoint.h" -#include <util/generic/map.h> -#include <util/generic/set.h> - #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/interconnect/interconnect.h> +#include <ydb/core/base/path.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/location.h> #include <ydb/core/base/statestorage.h> -#include <ydb/core/mind/tenant_pool.h> -namespace NKikimr { -namespace NGRpcService { +namespace NKikimr::NGRpcService { using namespace NActors; class TGRpcEndpointPublishActor : public TActorBootstrapped<TGRpcEndpointPublishActor> { TIntrusivePtr<TGrpcEndpointDescription> Description; - TString SelfDatacenter; + TActorId PublishActor; - bool resolvedState; - TMap<TActorId, TSet<TString>> ServedDatabases; - TMap<TString, TActorId> PublishedDatabases; + void CreatePublishActor() { + ui32 nodeId = SelfId().NodeId(); + TString database = AppData()->TenantName; - TActorId CreatePublishActor(const TString &database, TString &payload, ui32 nodeId) { auto *domains = AppData()->DomainsInfo.Get(); auto domainName = ExtractDomain(database); auto *domainInfo = domains->GetDomainByName(domainName); if (!domainInfo) - return TActorId(); + return; auto statestorageGroupId = domainInfo->DefaultStateStorageGroup; auto assignedPath = MakeEndpointsBoardPath(database); - TActorId &aid = PublishedDatabases[database]; - if (!aid) { - if (!payload) { - NKikimrStateStorage::TEndpointBoardEntry entry; - entry.SetAddress(Description->Address); - entry.SetPort(Description->Port); - entry.SetLoad(0.0f); - entry.SetSsl(Description->Ssl); - entry.MutableServices()->Reserve(Description->ServedServices.size()); - entry.SetDataCenter(SelfDatacenter); - entry.SetNodeId(nodeId); - for (const auto& addr : Description->AddressesV4) { - entry.AddAddressesV4(addr); - } - for (const auto& addr : Description->AddressesV6) { - entry.AddAddressesV6(addr); - } - if (Description->TargetNameOverride) { - entry.SetTargetNameOverride(Description->TargetNameOverride); - } - for (const auto &service : Description->ServedServices) - entry.AddServices(service); - Y_VERIFY(entry.SerializeToString(&payload)); - } - - aid = Register(CreateBoardPublishActor(assignedPath, payload, SelfId(), statestorageGroupId, 0, true)); - } - return aid; - } - - void Handle(TEvTenantPool::TEvTenantPoolStatus::TPtr &ev) { - const auto &record = ev->Get()->Record; - - auto &served = ServedDatabases[ev->Sender]; - auto toRemove = std::move(served); - served = TSet<TString>(); - TString payload; - - for (auto &x : record.GetSlots()) { - if (const TString &assignedDatabase = x.GetAssignedTenant()) { - if (toRemove.erase(assignedDatabase) == 0) - CreatePublishActor(assignedDatabase, payload, SelfId().NodeId()); - - served.insert(assignedDatabase); - } + NKikimrStateStorage::TEndpointBoardEntry entry; + entry.SetAddress(Description->Address); + entry.SetPort(Description->Port); + entry.SetLoad(0.0f); + entry.SetSsl(Description->Ssl); + entry.MutableServices()->Reserve(Description->ServedServices.size()); + entry.SetDataCenter(SelfDatacenter); + entry.SetNodeId(nodeId); + for (const auto& addr : Description->AddressesV4) { + entry.AddAddressesV4(addr); } - - for (auto &x : toRemove) { - auto it = PublishedDatabases.find(x); - if (it != PublishedDatabases.end()) { - LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Stop publish endpoints for database: " << x); - Send(it->second, new TEvents::TEvPoisonPill()); - PublishedDatabases.erase(it); - } else { - LOG_CRIT_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "multiple eviction of " << x << " database. Ignoring"); - } + for (const auto& addr : Description->AddressesV6) { + entry.AddAddressesV6(addr); + } + if (Description->TargetNameOverride) { + entry.SetTargetNameOverride(Description->TargetNameOverride); } + for (const auto &service : Description->ServedServices) + entry.AddServices(service); + + Y_VERIFY(entry.SerializeToString(&payload)); + + PublishActor = Register(CreateBoardPublishActor(assignedPath, payload, SelfId(), statestorageGroupId, 0, true)); } void PassAway() override { - if (resolvedState) { - Send(MakeTenantPoolRootID(), new TEvents::TEvUnsubscribe()); - for(const auto& x: PublishedDatabases) { - LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Stop publish endpoints for database: " << x.first); - Send(x.second, new TEvents::TEvPoisonPill()); - } + if (PublishActor) { + LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Stop publish endpoints for database: " << AppData()->TenantName); + Send(PublishActor, new TEvents::TEvPoisonPill()); } TActor::PassAway(); @@ -112,17 +70,8 @@ class TGRpcEndpointPublishActor : public TActorBootstrapped<TGRpcEndpointPublish if (msg->Node && msg->Node->Location.GetDataCenterId()) SelfDatacenter = msg->Node->Location.GetDataCenterId(); - if (Description->ServedDatabases) { - TString payload; - for (auto &x : Description->ServedDatabases) { - ServedDatabases[TActorId()].insert(x); - CreatePublishActor(x, payload, SelfId().NodeId()); - } - } - - Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe()); + CreatePublishActor(); Become(&TThis::StateWork); - resolvedState = true; } public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -131,7 +80,6 @@ public: TGRpcEndpointPublishActor(TGrpcEndpointDescription *desc) : Description(desc) - , resolvedState(false) {} void Bootstrap() { @@ -153,7 +101,6 @@ public: STFUNC(StateWork) { Y_UNUSED(ctx); switch (ev->GetTypeRewrite()) { - hFunc(TEvTenantPool::TEvTenantPoolStatus, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } @@ -163,5 +110,4 @@ IActor* CreateGrpcEndpointPublishActor(TGrpcEndpointDescription *description) { return new TGRpcEndpointPublishActor(description); } -} -} +} // NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/grpc_publisher_service_actor.cpp b/ydb/core/grpc_services/grpc_publisher_service_actor.cpp index f73de3a89c..0cd44095bd 100644 --- a/ydb/core/grpc_services/grpc_publisher_service_actor.cpp +++ b/ydb/core/grpc_services/grpc_publisher_service_actor.cpp @@ -3,8 +3,7 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/actor_bootstrapped.h> -namespace NKikimr { -namespace NGRpcService { +namespace NKikimr::NGRpcService { using namespace NActors; @@ -51,5 +50,4 @@ IActor* CreateGrpcPublisherServiceActor(TVector<TIntrusivePtr<TGrpcEndpointDescr return new TGrpcPublisherServiceActor(std::move(endpoints)); } -} -}
\ No newline at end of file +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index c8c8bfac98..77c99770b9 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1,6 +1,7 @@ #include "test_client.h" #include <ydb/core/testlib/basics/runtime.h> +#include <ydb/core/base/path.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/hive.h> #include <ydb/public/lib/base/msgbus.h> @@ -614,7 +615,7 @@ namespace Tests { TTenantPoolConfig::TPtr tenantPoolConfig = new TTenantPoolConfig(localConfig); tenantPoolConfig->AddStaticSlot(domainName); - appData.TenantName = domainName; + appData.TenantName = CanonizePath(domainName); auto poolId = Runtime->Register(CreateTenantPool(tenantPoolConfig), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); |