diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-02-14 21:01:51 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-02-14 21:01:51 +0300 |
commit | bd1053d7928808e8c89aecfb118bfdfd81bc9b5d (patch) | |
tree | 207708473c436641271d959c693c5c8f74c1230c | |
parent | 41d7e17179cfdd2d4479b8dafc0793566acf3c9a (diff) | |
download | ydb-bd1053d7928808e8c89aecfb118bfdfd81bc9b5d.tar.gz |
Common discovery actor
-rw-r--r-- | ydb/core/base/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/base/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/base/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/base/discovery.cpp | 342 | ||||
-rw-r--r-- | ydb/core/base/discovery.h | 41 | ||||
-rw-r--r-- | ydb/core/base/events.h | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_discovery.cpp | 332 | ||||
-rw-r--r-- | ydb/core/protos/services.proto | 6 |
8 files changed, 429 insertions, 296 deletions
diff --git a/ydb/core/base/CMakeLists.darwin.txt b/ydb/core/base/CMakeLists.darwin.txt index 0d68a17abe..620438b37f 100644 --- a/ydb/core/base/CMakeLists.darwin.txt +++ b/ydb/core/base/CMakeLists.darwin.txt @@ -47,6 +47,7 @@ target_sources(ydb-core-base PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/base/board_replica.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/blobstorage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/base/discovery.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/event_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/group_stat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/kikimr_issue.cpp diff --git a/ydb/core/base/CMakeLists.linux-aarch64.txt b/ydb/core/base/CMakeLists.linux-aarch64.txt index b7db9bc79f..2ae07134f3 100644 --- a/ydb/core/base/CMakeLists.linux-aarch64.txt +++ b/ydb/core/base/CMakeLists.linux-aarch64.txt @@ -48,6 +48,7 @@ target_sources(ydb-core-base PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/base/board_replica.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/blobstorage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/base/discovery.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/event_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/group_stat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/kikimr_issue.cpp diff --git a/ydb/core/base/CMakeLists.linux.txt b/ydb/core/base/CMakeLists.linux.txt index b7db9bc79f..2ae07134f3 100644 --- a/ydb/core/base/CMakeLists.linux.txt +++ b/ydb/core/base/CMakeLists.linux.txt @@ -48,6 +48,7 @@ target_sources(ydb-core-base PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/base/board_replica.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/blobstorage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/base/discovery.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/event_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/group_stat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/base/kikimr_issue.cpp diff --git a/ydb/core/base/discovery.cpp b/ydb/core/base/discovery.cpp new file mode 100644 index 0000000000..e504f34ef3 --- /dev/null +++ b/ydb/core/base/discovery.cpp @@ -0,0 +1,342 @@ +#include "discovery.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/path.h> +#include <ydb/core/base/statestorage.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> + +#define LOG_T(service, stream) LOG_TRACE_S(*TlsActivationContext, service, stream) +#define LOG_D(service, stream) LOG_DEBUG_S(*TlsActivationContext, service, stream) + +#define CLOG_T(stream) LOG_T(NKikimrServices::DISCOVERY_CACHE, stream) +#define CLOG_D(stream) LOG_D(NKikimrServices::DISCOVERY_CACHE, stream) + +#define DLOG_T(stream) LOG_T(NKikimrServices::DISCOVERY, stream) +#define DLOG_D(stream) LOG_D(NKikimrServices::DISCOVERY, stream) + +namespace NKikimr { + +namespace NDiscoveryPrivate { + struct TEvPrivate { + enum EEv { + EvRequest = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvEnd + }; + + struct TEvRequest: public TEventLocal<TEvRequest, EvRequest> { + const TString Database; + const ui32 StateStorageId; + + TEvRequest(const TString& db, ui32 stateStorageId) + : Database(db) + , StateStorageId(stateStorageId) + { + } + }; + }; + + class TDiscoveryCache: public TActor<TDiscoveryCache> { + THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> OldInfo; + THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> NewInfo; + + struct TWaiter { + TActorId ActorId; + ui64 Cookie; + }; + + THashMap<TString, TVector<TWaiter>> Requested; + bool Scheduled = false; + + void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) { + CLOG_T("Handle " << ev->Get()->ToString()); + + THolder<TEvStateStorage::TEvBoardInfo> msg = ev->Release(); + const auto& path = msg->Path; + + if (auto it = Requested.find(path); it != Requested.end()) { + for (const auto& waiter : it->second) { + Send(waiter.ActorId, new TEvStateStorage::TEvBoardInfo(*msg), 0, waiter.Cookie); + } + + Requested.erase(it); + } + + NewInfo.emplace(path, std::move(msg)); + + if (!Scheduled) { + Scheduled = true; + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); + } + } + + void Wakeup() { + OldInfo.swap(NewInfo); + NewInfo.clear(); + + if (!OldInfo.empty()) { + Scheduled = true; + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); + } else { + Scheduled = false; + } + } + + void Handle(TEvPrivate::TEvRequest::TPtr& ev) { + CLOG_T("Handle " << ev->Get()->ToString()); + + const auto* msg = ev->Get(); + + if (const auto* x = OldInfo.FindPtr(msg->Database)) { + Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie); + return; + } + + if (const auto* x = NewInfo.FindPtr(msg->Database)) { + Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie); + return; + } + + auto& requested = Requested[msg->Database]; + if (requested.empty()) { + CLOG_D("Lookup" + << ": path# " << msg->Database); + + Register(CreateBoardLookupActor(msg->Database, SelfId(), msg->StateStorageId, EBoardLookupMode::Second, false, false)); + } + + requested.push_back({ev->Sender, ev->Cookie}); + } + + public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::DISCOVERY_CACHE_ACTOR; + } + + TDiscoveryCache() + : TActor(&TThis::StateWork) + { + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPrivate::TEvRequest, Handle); + hFunc(TEvStateStorage::TEvBoardInfo, Handle); + sFunc(TEvents::TEvWakeup, Wakeup); + sFunc(TEvents::TEvPoison, PassAway); + } + } + }; +} + +class TDiscoverer: public TActorBootstrapped<TDiscoverer> { + const TString Database; + const TActorId ReplyTo; + const TActorId CacheId; + + THolder<TEvStateStorage::TEvBoardInfo> LookupResponse; + THolder<TEvTxProxySchemeCache::TEvNavigateKeySetResult> SchemeCacheResponse; + + bool ResolveResources = false; + ui64 LookupCookie = 0; + + void Reply(IEventBase* ev) { + Send(ReplyTo, ev); + PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::DISCOVERY_ACTOR; + } + + explicit TDiscoverer(const TString& database, const TActorId& replyTo, const TActorId& cacheId) + : Database(database) + , ReplyTo(replyTo) + , CacheId(cacheId) + { + } + + void Bootstrap() { + Lookup(Database); + Navigate(Database); + + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvStateStorage::TEvBoardInfo, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + + void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) { + DLOG_T("Handle " << LookupResponse->ToString() + << ": cookie# " << ev->Cookie); + + if (ev->Cookie != LookupCookie) { + DLOG_D("Stale lookup response" + << ": got# " << ev->Cookie + << ", expected# " << LookupCookie); + return; + } + + LookupResponse.Reset(ev->Release().Release()); + + MaybeReply(); + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + SchemeCacheResponse.Reset(ev->Release().Release()); + + const auto* response = SchemeCacheResponse.Get()->Request.Get(); + + Y_VERIFY(response->ResultSet.size() == 1); + const auto& entry = response->ResultSet.front(); + + DLOG_T("Handle " << SchemeCacheResponse->ToString() + << ": entry# " << entry.ToString()); + + if (response->ErrorCount > 0) { + switch (entry.Status) { + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: + case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown: + return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::DATABASE_NOT_EXIST, + "Requested database not exists")); + default: + DLOG_D("Unexpected status" + << ": entry# " << entry.ToString()); + return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR, + "Database resolve failed with no certain result")); + } + } + + if (!entry.DomainInfo) { + DLOG_D("Empty domain info" + << ": entry# " << entry.ToString()); + return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR, + "Database resolve failed with no certain result")); + } + + auto info = entry.DomainInfo; + if (info->DomainKey != info->ResourcesDomainKey) { + DLOG_D("Resolve resources domain" + << ": domain key# " << info->DomainKey + << ", resources domain key# " << info->ResourcesDomainKey); + + Navigate(info->ResourcesDomainKey); + ResolveResources = true; + } else if (ResolveResources) { + Lookup(CanonizePath(entry.Path)); + } + + MaybeReply(); + } + + void MaybeReply() { + if (!LookupResponse || !SchemeCacheResponse) { + return; + } + + { + // check presence of database (acl should be checked here too) + const auto& entry = SchemeCacheResponse->Request->ResultSet.front(); + const auto isDomain = entry.Path.size() == 1; + const auto isSubDomain = entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindSubdomain + || entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindExtSubdomain; + + if (!isDomain && !isSubDomain) { + DLOG_D("Path is not database" + << ": entry# " << entry.ToString()); + return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::ACCESS_DENIED, + "Requested path is not database name")); + } + } + + if (LookupResponse->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) { + DLOG_D("Lookup error" + << ": status# " << ui64(LookupResponse->Status)); + return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR, + "Database nodes resolve failed with no certain result")); + } + + Reply(new TEvStateStorage::TEvBoardInfo(*LookupResponse)); + } + + void Lookup(const TString& db) { + DLOG_T("Lookup" + << ": path# " << db); + + const auto path = NKikimr::SplitPath(db); + const auto domainName = path ? path[0] : TString(); + auto* domainInfo = AppData()->DomainsInfo->GetDomainByName(domainName); + if (!domainInfo) { + return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::DATABASE_NOT_EXIST, + "Database " + domainName + " not exists")); + } + + TString database; + for (const auto& token : path) { + if (token.size() > 4100) { + return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::KEY_PARSE_ERROR, + "Requested database name too long")); + } + + database.append("/").append(token); + } + + const auto stateStorageGroupId = domainInfo->DefaultStateStorageGroup; + const auto reqPath = MakeEndpointsBoardPath(database); + + Send(CacheId, new NDiscoveryPrivate::TEvPrivate::TEvRequest(reqPath, stateStorageGroupId), 0, ++LookupCookie); + LookupResponse.Reset(); + } + + static void FillNavigateKey(const TString& path, NSchemeCache::TSchemeCacheNavigate::TEntry& entry) { + entry.Path = NKikimr::SplitPath(path); + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath; + } + + static void FillNavigateKey(const TPathId& pathId, NSchemeCache::TSchemeCacheNavigate::TEntry& entry) { + entry.TableId = TTableId(pathId.OwnerId, pathId.LocalPathId); + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + } + + template <typename T> + void Navigate(const T& id) { + DLOG_T("Navigate" + << ": path# " << id); + + auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + + auto& entry = request->ResultSet.emplace_back(); + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + entry.RedirectRequired = false; + FillNavigateKey(id, entry); + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); + SchemeCacheResponse.Reset(); + } +}; + +IActor* CreateDiscoverer(const TString& database, const TActorId& replyTo, const TActorId& cacheId) { + return new TDiscoverer(database, replyTo, cacheId); +} + +IActor* CreateDiscoveryCache() { + return new NDiscoveryPrivate::TDiscoveryCache(); +} + +} + +#undef DLOG_T +#undef DLOG_D +#undef CLOG_T +#undef CLOG_D +#undef LOG_T +#undef LOG_D diff --git a/ydb/core/base/discovery.h b/ydb/core/base/discovery.h new file mode 100644 index 0000000000..c9b8ad0547 --- /dev/null +++ b/ydb/core/base/discovery.h @@ -0,0 +1,41 @@ +#pragma once + +#include "defs.h" +#include "events.h" + +namespace NKikimr { + +struct TEvDiscovery { + enum EEv { + EvError = EventSpaceBegin(TKikimrEvents::ES_DISCOVERY), + EvEnd + }; + + struct TEvError: public TEventLocal<TEvError, EvError> { + enum EStatus { + KEY_PARSE_ERROR, + RESOLVE_ERROR, + DATABASE_NOT_EXIST, + ACCESS_DENIED, + }; + + EStatus Status; + TString Error; + + explicit TEvError(EStatus status, const TString& error) + : Status(status) + , Error(error) + { + } + }; +}; + +// Reply with: +// - in case of success: TEvStateStorage::TEvBoardInfo +// - otherwise: TEvDiscovery::TEvError +IActor* CreateDiscoverer(const TString& database, const TActorId& replyTo, const TActorId& cacheId); + +// Used to reduce number of requests to Board +IActor* CreateDiscoveryCache(); + +} diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 6123921917..7ee00f8332 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -158,6 +158,7 @@ struct TKikimrEvents : TEvents { ES_METADATA_SECRET, ES_TEST_LOAD, ES_GRPC_CANCELATION, + ES_DISCOVERY, }; }; diff --git a/ydb/core/grpc_services/rpc_discovery.cpp b/ydb/core/grpc_services/rpc_discovery.cpp index 75550f92ac..088eed6a2f 100644 --- a/ydb/core/grpc_services/rpc_discovery.cpp +++ b/ydb/core/grpc_services/rpc_discovery.cpp @@ -3,11 +3,8 @@ #include "rpc_calls.h" #include "rpc_kqp_base.h" -#include <ydb/core/base/statestorage.h> -#include <ydb/core/base/appdata.h> -#include <ydb/core/base/path.h> +#include <ydb/core/base/discovery.h> #include <ydb/core/base/location.h> -#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -18,126 +15,19 @@ #include <util/random/shuffle.h> -namespace NKikimr { -namespace NGRpcService { +namespace NKikimr::NGRpcService { using namespace NActors; using namespace Ydb; using namespace NKqp; -namespace NDiscoveryPrivate { - struct TEvPrivate { - enum EEv { - EvRequest = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), - EvEnd - }; - - struct TEvRequest : public TEventLocal<TEvRequest, EvRequest> { - const TString Database; - const ui32 StateStorageId; - - TEvRequest(const TString &db, ui32 stateStorageId) - : Database(db) - , StateStorageId(stateStorageId) - {} - }; - }; - - class TDiscoveryCache : public TActor<TDiscoveryCache> { - THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> OldInfo; - THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> NewInfo; - - struct TWaiter { - TActorId ActorId; - ui64 Cookie; - }; - - THashMap<TString, TVector<TWaiter>> Requested; - bool Scheduled; - - void Handle(TEvStateStorage::TEvBoardInfo::TPtr &ev) { - THolder<TEvStateStorage::TEvBoardInfo> msg = ev->Release(); - const TString &path = msg->Path; - - auto vecIt = Requested.find(path); - if (vecIt != Requested.end()) { - for (auto &x : vecIt->second) - Send(x.ActorId, new TEvStateStorage::TEvBoardInfo(*msg), 0, x.Cookie); - Requested.erase(vecIt); - } - - NewInfo.emplace(path, std::move(msg)); - - if (!Scheduled) { - Scheduled = true; - Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); - } - } - - void Wakeup() { - OldInfo.swap(NewInfo); - NewInfo.clear(); - - if (!OldInfo.empty()) { - Scheduled = true; - Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); - } else { - Scheduled = false; - } - } - - void Handle(TEvPrivate::TEvRequest::TPtr &ev) { - auto *msg = ev->Get(); - if (auto *x = OldInfo.FindPtr(msg->Database)) { - Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie); - return; - } - - if (auto *x = NewInfo.FindPtr(msg->Database)) { - Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie); - return; - } - - auto &rqstd = Requested[msg->Database]; - if (rqstd.empty()) { - Register(CreateBoardLookupActor(msg->Database, SelfId(), msg->StateStorageId, EBoardLookupMode::Second, false, false)); - } - - rqstd.push_back({ev->Sender, ev->Cookie}); - } - public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - TDiscoveryCache() - : TActor(&TThis::StateWork) - , Scheduled(false) - {} - - STFUNC(StateWork) { - Y_UNUSED(ctx); - switch (ev->GetTypeRewrite()) { - hFunc(TEvPrivate::TEvRequest, Handle); - hFunc(TEvStateStorage::TEvBoardInfo, Handle); - cFunc(TEvents::TEvWakeup::EventType, Wakeup); - } - } - }; -} - class TListEndpointsRPC : public TActorBootstrapped<TListEndpointsRPC> { THolder<TEvListEndpointsRequest> Request; const TActorId CacheId; - - const bool RequestScheme = true; + TActorId Discoverer; THolder<TEvStateStorage::TEvBoardInfo> LookupResponse; THolder<TEvInterconnect::TEvNodeInfo> NameserviceResponse; - THolder<TEvTxProxySchemeCache::TEvNavigateKeySetResult> SchemeCacheResponse; - - bool ResolveResources = false; - ui64 LookupCookie = 0; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -151,119 +41,58 @@ public: void Bootstrap() { // request endpoints - Lookup(Request->GetProtoRequest()->database()); + Discoverer = Register(CreateDiscoverer(Request->GetProtoRequest()->database(), SelfId(), CacheId)); // request self node info Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(SelfId().NodeId())); - // request path info - if (RequestScheme) { - Navigate(Request->GetProtoRequest()->database()); - } - Become(&TThis::StateWait); } - STFUNC(StateWait) { - Y_UNUSED(ctx); + STATEFN(StateWait) { switch (ev->GetTypeRewrite()) { hFunc(TEvStateStorage::TEvBoardInfo, Handle); hFunc(TEvInterconnect::TEvNodeInfo, Handle); - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvDiscovery::TEvError, Handle); } } void Handle(TEvStateStorage::TEvBoardInfo::TPtr &ev) { - if (ev->Cookie != LookupCookie) { - return; - } - - LookupResponse = THolder<TEvStateStorage::TEvBoardInfo>(ev->Release().Release()); - + LookupResponse.Reset(ev->Release().Release()); TryReplyAndDie(); } void Handle(TEvInterconnect::TEvNodeInfo::TPtr &ev) { - NameserviceResponse = THolder<TEvInterconnect::TEvNodeInfo>(ev->Release().Release()); - + NameserviceResponse.Reset(ev->Release().Release()); TryReplyAndDie(); } - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) { - SchemeCacheResponse = THolder<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(ev->Release().Release()); - - TEvTxProxySchemeCache::TEvNavigateKeySetResult *msg = SchemeCacheResponse.Get(); - NSchemeCache::TSchemeCacheNavigate *navigate = msg->Request.Get(); - - Y_VERIFY(navigate->ResultSet.size() == 1); - const auto &entry = navigate->ResultSet.front(); - - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, - "TListEndpointsRPC: handle TEvNavigateKeySetResult" - << ", entry: " << entry.ToString()); - - if (navigate->ErrorCount > 0) { - switch (entry.Status) { - case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: - case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown: - { - auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DATABASE_NOT_EXIST, "Requested database not exists"); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; - NYql::IssueToMessage(issue, issueMessages.Add()); - Request->SendResult(Ydb::StatusIds::NOT_FOUND, issueMessages); - return PassAway(); - } - default: - { - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, - "TListEndpointsRPC: GENERIC_RESOLVE_ERROR" - << ", entry: " << entry.ToString()); - - auto issue = MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, "Database resolve failed with no certain result"); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; - NYql::IssueToMessage(issue, issueMessages.Add()); - Request->SendResult(Ydb::StatusIds::UNAVAILABLE, issueMessages); - return PassAway(); - } - } - } - - if (!entry.DomainInfo) { - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, - "TListEndpointsRPC: GENERIC_RESOLVE_ERROR (empty domain info)" - << ", entry: " << entry.ToString()); - - auto issue = MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, "Database resolve failed with no certain result"); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; - NYql::IssueToMessage(issue, issueMessages.Add()); - Request->SendResult(Ydb::StatusIds::UNAVAILABLE, issueMessages); - return PassAway(); - } + void Handle(TEvDiscovery::TEvError::TPtr &ev) { + auto issue = MakeIssue(ErrorToIssueCode(ev->Get()->Status), ev->Get()->Error); + google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; + NYql::IssueToMessage(issue, issueMessages.Add()); + Request->SendResult(ErrorToStatusCode(ev->Get()->Status), issueMessages); + PassAway(); + } - auto info = entry.DomainInfo; - if (info->DomainKey != info->ResourcesDomainKey) { - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, - "TListEndpointsRPC: domain key differs from resources domain key" - << ", domain key: " << info->DomainKey - << ", resources domain key: " << info->ResourcesDomainKey); - - Navigate(info->ResourcesDomainKey); - ResolveResources = true; - } else if (ResolveResources) { - Lookup(CanonizePath(entry.Path)); + static NKikimrIssues::TIssuesIds::EIssueCode ErrorToIssueCode(TEvDiscovery::TEvError::EStatus status) { + switch (status) { + case TEvDiscovery::TEvError::KEY_PARSE_ERROR: return NKikimrIssues::TIssuesIds::KEY_PARSE_ERROR; + case TEvDiscovery::TEvError::RESOLVE_ERROR: return NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR; + case TEvDiscovery::TEvError::DATABASE_NOT_EXIST: return NKikimrIssues::TIssuesIds::DATABASE_NOT_EXIST; + case TEvDiscovery::TEvError::ACCESS_DENIED: return NKikimrIssues::TIssuesIds::ACCESS_DENIED; + default: return NKikimrIssues::TIssuesIds::DEFAULT_ERROR; } - - TryReplyAndDie(); } - void Handle(TEvents::TEvUndelivered::TPtr &ev) { - Y_UNUSED(ev); - auto issue = MakeIssue(NKikimrIssues::TIssuesIds::UNEXPECTED, "Unexpected error while resolving database"); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; - NYql::IssueToMessage(issue, issueMessages.Add()); - Request->SendResult(Ydb::StatusIds::INTERNAL_ERROR, issueMessages); - return PassAway(); + static Ydb::StatusIds::StatusCode ErrorToStatusCode(TEvDiscovery::TEvError::EStatus status) { + switch (status) { + case TEvDiscovery::TEvError::KEY_PARSE_ERROR: return Ydb::StatusIds::BAD_REQUEST; + case TEvDiscovery::TEvError::RESOLVE_ERROR: return Ydb::StatusIds::UNAVAILABLE; + case TEvDiscovery::TEvError::DATABASE_NOT_EXIST: return Ydb::StatusIds::NOT_FOUND; + case TEvDiscovery::TEvError::ACCESS_DENIED: return Ydb::StatusIds::NOT_FOUND; + default: return Ydb::StatusIds::BAD_REQUEST; + } } bool CheckServices(const TSet<TString> &req, const NKikimrStateStorage::TEndpointBoardEntry &entry) { @@ -278,39 +107,10 @@ public: } void TryReplyAndDie() { - if (!NameserviceResponse || !LookupResponse || (RequestScheme && !SchemeCacheResponse)) + if (!NameserviceResponse || !LookupResponse) return; - if (RequestScheme) { - // check presence of database (acl should be checked here too) - const auto &entry = SchemeCacheResponse->Request->ResultSet.front(); - if (entry.Path.size() != 1 - && (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindSubdomain && entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindExtSubdomain)) - { - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, - "TListEndpointsRPC: SchemeCacheResponse path is not a database" - << ", entry.Path: " << CanonizePath(entry.Path) - << ", entry.Kind: " << (ui64)entry.Kind); - - auto issue = MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, "Requested path is not database name"); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; - NYql::IssueToMessage(issue, issueMessages.Add()); - Request->SendResult(Ydb::StatusIds::NOT_FOUND, issueMessages); - return PassAway(); - } - } - - if (LookupResponse->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) { - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, - "TListEndpointsRPC: LookupResponse in not OK" - << ", LookupResponse->Status: " << ui64(LookupResponse->Status)); - - auto issue = MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, "Database nodes resolve failed with no certain result"); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; - NYql::IssueToMessage(issue, issueMessages.Add()); - Request->SendResult(Ydb::StatusIds::UNAVAILABLE, issueMessages); - return PassAway(); - } + Y_VERIFY(LookupResponse->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); TStackVec<const TString*> entries; entries.reserve(LookupResponse->InfoEntries.size()); @@ -414,74 +214,14 @@ public: } return true; } - - void Lookup(const TString& db) { - TVector<TString> path = NKikimr::SplitPath(db); - auto domainName = path ? path[0] : TString(); - auto *appdata = AppData(); - auto *domainInfo = appdata->DomainsInfo->GetDomainByName(domainName); - if (!domainInfo) { - auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DATABASE_NOT_EXIST, "Database " + domainName + " not exists"); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; - NYql::IssueToMessage(issue, issueMessages.Add()); - Request->SendResult(Ydb::StatusIds::BAD_REQUEST, issueMessages); - return PassAway(); - } - - TString database; - for (auto &x : path) { - if (x.size() > 4100) { - auto issue = MakeIssue(NKikimrIssues::TIssuesIds::KEY_PARSE_ERROR, "Requested database name too long"); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; - NYql::IssueToMessage(issue, issueMessages.Add()); - Request->SendResult(Ydb::StatusIds::BAD_REQUEST, issueMessages); - return PassAway(); - } - database.append("/").append(x); - } - - // request endpoints - auto stateStorageGroupId = domainInfo->DefaultStateStorageGroup; - auto reqPath = MakeEndpointsBoardPath(database); - - Send(CacheId, new NDiscoveryPrivate::TEvPrivate::TEvRequest(reqPath, stateStorageGroupId), 0, ++LookupCookie); - LookupResponse.Reset(); - } - - void FillNavigateKey(const TString& path, NSchemeCache::TSchemeCacheNavigate::TEntry& entry) { - entry.Path = NKikimr::SplitPath(path); - entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath; - } - - void FillNavigateKey(const TPathId& pathId, NSchemeCache::TSchemeCacheNavigate::TEntry& entry) { - entry.TableId = TTableId(pathId.OwnerId, pathId.LocalPathId); - entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; - } - - template <typename T> - void Navigate(const T& id) { - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, - "TListEndpointsRPC: make TEvNavigateKeySet request" - << ", path: " << id); - - auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); - - request->ResultSet.emplace_back(); - FillNavigateKey(id, request->ResultSet.back()); - request->ResultSet.back().Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; - request->ResultSet.back().RedirectRequired = false; - - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), IEventHandle::FlagTrackDelivery); - SchemeCacheResponse.Reset(); - } }; void TGRpcRequestProxy::Handle(TEvListEndpointsRequest::TPtr& ev, const TActorContext& ctx) { - if (!DiscoveryCacheActorID) - DiscoveryCacheActorID = ctx.Register(new NDiscoveryPrivate::TDiscoveryCache()); + if (!DiscoveryCacheActorID) { + DiscoveryCacheActorID = ctx.Register(CreateDiscoveryCache()); + } ctx.Register(new TListEndpointsRPC(ev, DiscoveryCacheActorID)); } -} // namespace NGRpcService -} // namespace NKikimr +} diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 029eac76d8..b2e2f52c48 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -355,6 +355,10 @@ enum EServiceKikimr { // Background tasks BG_TASKS = 1700; + + // Discovery + DISCOVERY = 1800; + DISCOVERY_CACHE = 1801; }; message TActivity { @@ -966,5 +970,7 @@ message TActivity { SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER = 603; REPLICATION_CONTROLLER_STREAM_REMOVER = 604; REPLICATION_CONTROLLER_DST_REMOVER = 605; + DISCOVERY_ACTOR = 607; + DISCOVERY_CACHE_ACTOR = 608; }; }; |