aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-02-14 21:01:51 +0300
committerilnaz <ilnaz@ydb.tech>2023-02-14 21:01:51 +0300
commitbd1053d7928808e8c89aecfb118bfdfd81bc9b5d (patch)
tree207708473c436641271d959c693c5c8f74c1230c
parent41d7e17179cfdd2d4479b8dafc0793566acf3c9a (diff)
downloadydb-bd1053d7928808e8c89aecfb118bfdfd81bc9b5d.tar.gz
Common discovery actor
-rw-r--r--ydb/core/base/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/base/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/base/CMakeLists.linux.txt1
-rw-r--r--ydb/core/base/discovery.cpp342
-rw-r--r--ydb/core/base/discovery.h41
-rw-r--r--ydb/core/base/events.h1
-rw-r--r--ydb/core/grpc_services/rpc_discovery.cpp332
-rw-r--r--ydb/core/protos/services.proto6
8 files changed, 429 insertions, 296 deletions
diff --git a/ydb/core/base/CMakeLists.darwin.txt b/ydb/core/base/CMakeLists.darwin.txt
index 0d68a17abe..620438b37f 100644
--- a/ydb/core/base/CMakeLists.darwin.txt
+++ b/ydb/core/base/CMakeLists.darwin.txt
@@ -47,6 +47,7 @@ target_sources(ydb-core-base PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/base/board_replica.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/blobstorage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/base/discovery.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/event_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/group_stat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/kikimr_issue.cpp
diff --git a/ydb/core/base/CMakeLists.linux-aarch64.txt b/ydb/core/base/CMakeLists.linux-aarch64.txt
index b7db9bc79f..2ae07134f3 100644
--- a/ydb/core/base/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/base/CMakeLists.linux-aarch64.txt
@@ -48,6 +48,7 @@ target_sources(ydb-core-base PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/base/board_replica.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/blobstorage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/base/discovery.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/event_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/group_stat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/kikimr_issue.cpp
diff --git a/ydb/core/base/CMakeLists.linux.txt b/ydb/core/base/CMakeLists.linux.txt
index b7db9bc79f..2ae07134f3 100644
--- a/ydb/core/base/CMakeLists.linux.txt
+++ b/ydb/core/base/CMakeLists.linux.txt
@@ -48,6 +48,7 @@ target_sources(ydb-core-base PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/base/board_replica.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/blobstorage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/base/discovery.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/event_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/group_stat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/base/kikimr_issue.cpp
diff --git a/ydb/core/base/discovery.cpp b/ydb/core/base/discovery.cpp
new file mode 100644
index 0000000000..e504f34ef3
--- /dev/null
+++ b/ydb/core/base/discovery.cpp
@@ -0,0 +1,342 @@
+#include "discovery.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/path.h>
+#include <ydb/core/base/statestorage.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+#define LOG_T(service, stream) LOG_TRACE_S(*TlsActivationContext, service, stream)
+#define LOG_D(service, stream) LOG_DEBUG_S(*TlsActivationContext, service, stream)
+
+#define CLOG_T(stream) LOG_T(NKikimrServices::DISCOVERY_CACHE, stream)
+#define CLOG_D(stream) LOG_D(NKikimrServices::DISCOVERY_CACHE, stream)
+
+#define DLOG_T(stream) LOG_T(NKikimrServices::DISCOVERY, stream)
+#define DLOG_D(stream) LOG_D(NKikimrServices::DISCOVERY, stream)
+
+namespace NKikimr {
+
+namespace NDiscoveryPrivate {
+ struct TEvPrivate {
+ enum EEv {
+ EvRequest = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvEnd
+ };
+
+ struct TEvRequest: public TEventLocal<TEvRequest, EvRequest> {
+ const TString Database;
+ const ui32 StateStorageId;
+
+ TEvRequest(const TString& db, ui32 stateStorageId)
+ : Database(db)
+ , StateStorageId(stateStorageId)
+ {
+ }
+ };
+ };
+
+ class TDiscoveryCache: public TActor<TDiscoveryCache> {
+ THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> OldInfo;
+ THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> NewInfo;
+
+ struct TWaiter {
+ TActorId ActorId;
+ ui64 Cookie;
+ };
+
+ THashMap<TString, TVector<TWaiter>> Requested;
+ bool Scheduled = false;
+
+ void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
+ CLOG_T("Handle " << ev->Get()->ToString());
+
+ THolder<TEvStateStorage::TEvBoardInfo> msg = ev->Release();
+ const auto& path = msg->Path;
+
+ if (auto it = Requested.find(path); it != Requested.end()) {
+ for (const auto& waiter : it->second) {
+ Send(waiter.ActorId, new TEvStateStorage::TEvBoardInfo(*msg), 0, waiter.Cookie);
+ }
+
+ Requested.erase(it);
+ }
+
+ NewInfo.emplace(path, std::move(msg));
+
+ if (!Scheduled) {
+ Scheduled = true;
+ Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
+ }
+ }
+
+ void Wakeup() {
+ OldInfo.swap(NewInfo);
+ NewInfo.clear();
+
+ if (!OldInfo.empty()) {
+ Scheduled = true;
+ Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
+ } else {
+ Scheduled = false;
+ }
+ }
+
+ void Handle(TEvPrivate::TEvRequest::TPtr& ev) {
+ CLOG_T("Handle " << ev->Get()->ToString());
+
+ const auto* msg = ev->Get();
+
+ if (const auto* x = OldInfo.FindPtr(msg->Database)) {
+ Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie);
+ return;
+ }
+
+ if (const auto* x = NewInfo.FindPtr(msg->Database)) {
+ Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie);
+ return;
+ }
+
+ auto& requested = Requested[msg->Database];
+ if (requested.empty()) {
+ CLOG_D("Lookup"
+ << ": path# " << msg->Database);
+
+ Register(CreateBoardLookupActor(msg->Database, SelfId(), msg->StateStorageId, EBoardLookupMode::Second, false, false));
+ }
+
+ requested.push_back({ev->Sender, ev->Cookie});
+ }
+
+ public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::DISCOVERY_CACHE_ACTOR;
+ }
+
+ TDiscoveryCache()
+ : TActor(&TThis::StateWork)
+ {
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvPrivate::TEvRequest, Handle);
+ hFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ sFunc(TEvents::TEvWakeup, Wakeup);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+ };
+}
+
+class TDiscoverer: public TActorBootstrapped<TDiscoverer> {
+ const TString Database;
+ const TActorId ReplyTo;
+ const TActorId CacheId;
+
+ THolder<TEvStateStorage::TEvBoardInfo> LookupResponse;
+ THolder<TEvTxProxySchemeCache::TEvNavigateKeySetResult> SchemeCacheResponse;
+
+ bool ResolveResources = false;
+ ui64 LookupCookie = 0;
+
+ void Reply(IEventBase* ev) {
+ Send(ReplyTo, ev);
+ PassAway();
+ }
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::DISCOVERY_ACTOR;
+ }
+
+ explicit TDiscoverer(const TString& database, const TActorId& replyTo, const TActorId& cacheId)
+ : Database(database)
+ , ReplyTo(replyTo)
+ , CacheId(cacheId)
+ {
+ }
+
+ void Bootstrap() {
+ Lookup(Database);
+ Navigate(Database);
+
+ Become(&TThis::StateWork);
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+ void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
+ DLOG_T("Handle " << LookupResponse->ToString()
+ << ": cookie# " << ev->Cookie);
+
+ if (ev->Cookie != LookupCookie) {
+ DLOG_D("Stale lookup response"
+ << ": got# " << ev->Cookie
+ << ", expected# " << LookupCookie);
+ return;
+ }
+
+ LookupResponse.Reset(ev->Release().Release());
+
+ MaybeReply();
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ SchemeCacheResponse.Reset(ev->Release().Release());
+
+ const auto* response = SchemeCacheResponse.Get()->Request.Get();
+
+ Y_VERIFY(response->ResultSet.size() == 1);
+ const auto& entry = response->ResultSet.front();
+
+ DLOG_T("Handle " << SchemeCacheResponse->ToString()
+ << ": entry# " << entry.ToString());
+
+ if (response->ErrorCount > 0) {
+ switch (entry.Status) {
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
+ return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::DATABASE_NOT_EXIST,
+ "Requested database not exists"));
+ default:
+ DLOG_D("Unexpected status"
+ << ": entry# " << entry.ToString());
+ return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR,
+ "Database resolve failed with no certain result"));
+ }
+ }
+
+ if (!entry.DomainInfo) {
+ DLOG_D("Empty domain info"
+ << ": entry# " << entry.ToString());
+ return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR,
+ "Database resolve failed with no certain result"));
+ }
+
+ auto info = entry.DomainInfo;
+ if (info->DomainKey != info->ResourcesDomainKey) {
+ DLOG_D("Resolve resources domain"
+ << ": domain key# " << info->DomainKey
+ << ", resources domain key# " << info->ResourcesDomainKey);
+
+ Navigate(info->ResourcesDomainKey);
+ ResolveResources = true;
+ } else if (ResolveResources) {
+ Lookup(CanonizePath(entry.Path));
+ }
+
+ MaybeReply();
+ }
+
+ void MaybeReply() {
+ if (!LookupResponse || !SchemeCacheResponse) {
+ return;
+ }
+
+ {
+ // check presence of database (acl should be checked here too)
+ const auto& entry = SchemeCacheResponse->Request->ResultSet.front();
+ const auto isDomain = entry.Path.size() == 1;
+ const auto isSubDomain = entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindSubdomain
+ || entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindExtSubdomain;
+
+ if (!isDomain && !isSubDomain) {
+ DLOG_D("Path is not database"
+ << ": entry# " << entry.ToString());
+ return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::ACCESS_DENIED,
+ "Requested path is not database name"));
+ }
+ }
+
+ if (LookupResponse->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
+ DLOG_D("Lookup error"
+ << ": status# " << ui64(LookupResponse->Status));
+ return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR,
+ "Database nodes resolve failed with no certain result"));
+ }
+
+ Reply(new TEvStateStorage::TEvBoardInfo(*LookupResponse));
+ }
+
+ void Lookup(const TString& db) {
+ DLOG_T("Lookup"
+ << ": path# " << db);
+
+ const auto path = NKikimr::SplitPath(db);
+ const auto domainName = path ? path[0] : TString();
+ auto* domainInfo = AppData()->DomainsInfo->GetDomainByName(domainName);
+ if (!domainInfo) {
+ return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::DATABASE_NOT_EXIST,
+ "Database " + domainName + " not exists"));
+ }
+
+ TString database;
+ for (const auto& token : path) {
+ if (token.size() > 4100) {
+ return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::KEY_PARSE_ERROR,
+ "Requested database name too long"));
+ }
+
+ database.append("/").append(token);
+ }
+
+ const auto stateStorageGroupId = domainInfo->DefaultStateStorageGroup;
+ const auto reqPath = MakeEndpointsBoardPath(database);
+
+ Send(CacheId, new NDiscoveryPrivate::TEvPrivate::TEvRequest(reqPath, stateStorageGroupId), 0, ++LookupCookie);
+ LookupResponse.Reset();
+ }
+
+ static void FillNavigateKey(const TString& path, NSchemeCache::TSchemeCacheNavigate::TEntry& entry) {
+ entry.Path = NKikimr::SplitPath(path);
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath;
+ }
+
+ static void FillNavigateKey(const TPathId& pathId, NSchemeCache::TSchemeCacheNavigate::TEntry& entry) {
+ entry.TableId = TTableId(pathId.OwnerId, pathId.LocalPathId);
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
+ }
+
+ template <typename T>
+ void Navigate(const T& id) {
+ DLOG_T("Navigate"
+ << ": path# " << id);
+
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
+ entry.RedirectRequired = false;
+ FillNavigateKey(id, entry);
+
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
+ SchemeCacheResponse.Reset();
+ }
+};
+
+IActor* CreateDiscoverer(const TString& database, const TActorId& replyTo, const TActorId& cacheId) {
+ return new TDiscoverer(database, replyTo, cacheId);
+}
+
+IActor* CreateDiscoveryCache() {
+ return new NDiscoveryPrivate::TDiscoveryCache();
+}
+
+}
+
+#undef DLOG_T
+#undef DLOG_D
+#undef CLOG_T
+#undef CLOG_D
+#undef LOG_T
+#undef LOG_D
diff --git a/ydb/core/base/discovery.h b/ydb/core/base/discovery.h
new file mode 100644
index 0000000000..c9b8ad0547
--- /dev/null
+++ b/ydb/core/base/discovery.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "defs.h"
+#include "events.h"
+
+namespace NKikimr {
+
+struct TEvDiscovery {
+ enum EEv {
+ EvError = EventSpaceBegin(TKikimrEvents::ES_DISCOVERY),
+ EvEnd
+ };
+
+ struct TEvError: public TEventLocal<TEvError, EvError> {
+ enum EStatus {
+ KEY_PARSE_ERROR,
+ RESOLVE_ERROR,
+ DATABASE_NOT_EXIST,
+ ACCESS_DENIED,
+ };
+
+ EStatus Status;
+ TString Error;
+
+ explicit TEvError(EStatus status, const TString& error)
+ : Status(status)
+ , Error(error)
+ {
+ }
+ };
+};
+
+// Reply with:
+// - in case of success: TEvStateStorage::TEvBoardInfo
+// - otherwise: TEvDiscovery::TEvError
+IActor* CreateDiscoverer(const TString& database, const TActorId& replyTo, const TActorId& cacheId);
+
+// Used to reduce number of requests to Board
+IActor* CreateDiscoveryCache();
+
+}
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index 6123921917..7ee00f8332 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -158,6 +158,7 @@ struct TKikimrEvents : TEvents {
ES_METADATA_SECRET,
ES_TEST_LOAD,
ES_GRPC_CANCELATION,
+ ES_DISCOVERY,
};
};
diff --git a/ydb/core/grpc_services/rpc_discovery.cpp b/ydb/core/grpc_services/rpc_discovery.cpp
index 75550f92ac..088eed6a2f 100644
--- a/ydb/core/grpc_services/rpc_discovery.cpp
+++ b/ydb/core/grpc_services/rpc_discovery.cpp
@@ -3,11 +3,8 @@
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
-#include <ydb/core/base/statestorage.h>
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/base/path.h>
+#include <ydb/core/base/discovery.h>
#include <ydb/core/base/location.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -18,126 +15,19 @@
#include <util/random/shuffle.h>
-namespace NKikimr {
-namespace NGRpcService {
+namespace NKikimr::NGRpcService {
using namespace NActors;
using namespace Ydb;
using namespace NKqp;
-namespace NDiscoveryPrivate {
- struct TEvPrivate {
- enum EEv {
- EvRequest = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
- EvEnd
- };
-
- struct TEvRequest : public TEventLocal<TEvRequest, EvRequest> {
- const TString Database;
- const ui32 StateStorageId;
-
- TEvRequest(const TString &db, ui32 stateStorageId)
- : Database(db)
- , StateStorageId(stateStorageId)
- {}
- };
- };
-
- class TDiscoveryCache : public TActor<TDiscoveryCache> {
- THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> OldInfo;
- THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> NewInfo;
-
- struct TWaiter {
- TActorId ActorId;
- ui64 Cookie;
- };
-
- THashMap<TString, TVector<TWaiter>> Requested;
- bool Scheduled;
-
- void Handle(TEvStateStorage::TEvBoardInfo::TPtr &ev) {
- THolder<TEvStateStorage::TEvBoardInfo> msg = ev->Release();
- const TString &path = msg->Path;
-
- auto vecIt = Requested.find(path);
- if (vecIt != Requested.end()) {
- for (auto &x : vecIt->second)
- Send(x.ActorId, new TEvStateStorage::TEvBoardInfo(*msg), 0, x.Cookie);
- Requested.erase(vecIt);
- }
-
- NewInfo.emplace(path, std::move(msg));
-
- if (!Scheduled) {
- Scheduled = true;
- Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
- }
- }
-
- void Wakeup() {
- OldInfo.swap(NewInfo);
- NewInfo.clear();
-
- if (!OldInfo.empty()) {
- Scheduled = true;
- Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
- } else {
- Scheduled = false;
- }
- }
-
- void Handle(TEvPrivate::TEvRequest::TPtr &ev) {
- auto *msg = ev->Get();
- if (auto *x = OldInfo.FindPtr(msg->Database)) {
- Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie);
- return;
- }
-
- if (auto *x = NewInfo.FindPtr(msg->Database)) {
- Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie);
- return;
- }
-
- auto &rqstd = Requested[msg->Database];
- if (rqstd.empty()) {
- Register(CreateBoardLookupActor(msg->Database, SelfId(), msg->StateStorageId, EBoardLookupMode::Second, false, false));
- }
-
- rqstd.push_back({ev->Sender, ev->Cookie});
- }
- public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::GRPC_REQ;
- }
-
- TDiscoveryCache()
- : TActor(&TThis::StateWork)
- , Scheduled(false)
- {}
-
- STFUNC(StateWork) {
- Y_UNUSED(ctx);
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvPrivate::TEvRequest, Handle);
- hFunc(TEvStateStorage::TEvBoardInfo, Handle);
- cFunc(TEvents::TEvWakeup::EventType, Wakeup);
- }
- }
- };
-}
-
class TListEndpointsRPC : public TActorBootstrapped<TListEndpointsRPC> {
THolder<TEvListEndpointsRequest> Request;
const TActorId CacheId;
-
- const bool RequestScheme = true;
+ TActorId Discoverer;
THolder<TEvStateStorage::TEvBoardInfo> LookupResponse;
THolder<TEvInterconnect::TEvNodeInfo> NameserviceResponse;
- THolder<TEvTxProxySchemeCache::TEvNavigateKeySetResult> SchemeCacheResponse;
-
- bool ResolveResources = false;
- ui64 LookupCookie = 0;
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -151,119 +41,58 @@ public:
void Bootstrap() {
// request endpoints
- Lookup(Request->GetProtoRequest()->database());
+ Discoverer = Register(CreateDiscoverer(Request->GetProtoRequest()->database(), SelfId(), CacheId));
// request self node info
Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(SelfId().NodeId()));
- // request path info
- if (RequestScheme) {
- Navigate(Request->GetProtoRequest()->database());
- }
-
Become(&TThis::StateWait);
}
- STFUNC(StateWait) {
- Y_UNUSED(ctx);
+ STATEFN(StateWait) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvBoardInfo, Handle);
hFunc(TEvInterconnect::TEvNodeInfo, Handle);
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvDiscovery::TEvError, Handle);
}
}
void Handle(TEvStateStorage::TEvBoardInfo::TPtr &ev) {
- if (ev->Cookie != LookupCookie) {
- return;
- }
-
- LookupResponse = THolder<TEvStateStorage::TEvBoardInfo>(ev->Release().Release());
-
+ LookupResponse.Reset(ev->Release().Release());
TryReplyAndDie();
}
void Handle(TEvInterconnect::TEvNodeInfo::TPtr &ev) {
- NameserviceResponse = THolder<TEvInterconnect::TEvNodeInfo>(ev->Release().Release());
-
+ NameserviceResponse.Reset(ev->Release().Release());
TryReplyAndDie();
}
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) {
- SchemeCacheResponse = THolder<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(ev->Release().Release());
-
- TEvTxProxySchemeCache::TEvNavigateKeySetResult *msg = SchemeCacheResponse.Get();
- NSchemeCache::TSchemeCacheNavigate *navigate = msg->Request.Get();
-
- Y_VERIFY(navigate->ResultSet.size() == 1);
- const auto &entry = navigate->ResultSet.front();
-
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY,
- "TListEndpointsRPC: handle TEvNavigateKeySetResult"
- << ", entry: " << entry.ToString());
-
- if (navigate->ErrorCount > 0) {
- switch (entry.Status) {
- case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
- case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
- {
- auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DATABASE_NOT_EXIST, "Requested database not exists");
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
- NYql::IssueToMessage(issue, issueMessages.Add());
- Request->SendResult(Ydb::StatusIds::NOT_FOUND, issueMessages);
- return PassAway();
- }
- default:
- {
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY,
- "TListEndpointsRPC: GENERIC_RESOLVE_ERROR"
- << ", entry: " << entry.ToString());
-
- auto issue = MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, "Database resolve failed with no certain result");
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
- NYql::IssueToMessage(issue, issueMessages.Add());
- Request->SendResult(Ydb::StatusIds::UNAVAILABLE, issueMessages);
- return PassAway();
- }
- }
- }
-
- if (!entry.DomainInfo) {
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY,
- "TListEndpointsRPC: GENERIC_RESOLVE_ERROR (empty domain info)"
- << ", entry: " << entry.ToString());
-
- auto issue = MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, "Database resolve failed with no certain result");
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
- NYql::IssueToMessage(issue, issueMessages.Add());
- Request->SendResult(Ydb::StatusIds::UNAVAILABLE, issueMessages);
- return PassAway();
- }
+ void Handle(TEvDiscovery::TEvError::TPtr &ev) {
+ auto issue = MakeIssue(ErrorToIssueCode(ev->Get()->Status), ev->Get()->Error);
+ google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
+ NYql::IssueToMessage(issue, issueMessages.Add());
+ Request->SendResult(ErrorToStatusCode(ev->Get()->Status), issueMessages);
+ PassAway();
+ }
- auto info = entry.DomainInfo;
- if (info->DomainKey != info->ResourcesDomainKey) {
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY,
- "TListEndpointsRPC: domain key differs from resources domain key"
- << ", domain key: " << info->DomainKey
- << ", resources domain key: " << info->ResourcesDomainKey);
-
- Navigate(info->ResourcesDomainKey);
- ResolveResources = true;
- } else if (ResolveResources) {
- Lookup(CanonizePath(entry.Path));
+ static NKikimrIssues::TIssuesIds::EIssueCode ErrorToIssueCode(TEvDiscovery::TEvError::EStatus status) {
+ switch (status) {
+ case TEvDiscovery::TEvError::KEY_PARSE_ERROR: return NKikimrIssues::TIssuesIds::KEY_PARSE_ERROR;
+ case TEvDiscovery::TEvError::RESOLVE_ERROR: return NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR;
+ case TEvDiscovery::TEvError::DATABASE_NOT_EXIST: return NKikimrIssues::TIssuesIds::DATABASE_NOT_EXIST;
+ case TEvDiscovery::TEvError::ACCESS_DENIED: return NKikimrIssues::TIssuesIds::ACCESS_DENIED;
+ default: return NKikimrIssues::TIssuesIds::DEFAULT_ERROR;
}
-
- TryReplyAndDie();
}
- void Handle(TEvents::TEvUndelivered::TPtr &ev) {
- Y_UNUSED(ev);
- auto issue = MakeIssue(NKikimrIssues::TIssuesIds::UNEXPECTED, "Unexpected error while resolving database");
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
- NYql::IssueToMessage(issue, issueMessages.Add());
- Request->SendResult(Ydb::StatusIds::INTERNAL_ERROR, issueMessages);
- return PassAway();
+ static Ydb::StatusIds::StatusCode ErrorToStatusCode(TEvDiscovery::TEvError::EStatus status) {
+ switch (status) {
+ case TEvDiscovery::TEvError::KEY_PARSE_ERROR: return Ydb::StatusIds::BAD_REQUEST;
+ case TEvDiscovery::TEvError::RESOLVE_ERROR: return Ydb::StatusIds::UNAVAILABLE;
+ case TEvDiscovery::TEvError::DATABASE_NOT_EXIST: return Ydb::StatusIds::NOT_FOUND;
+ case TEvDiscovery::TEvError::ACCESS_DENIED: return Ydb::StatusIds::NOT_FOUND;
+ default: return Ydb::StatusIds::BAD_REQUEST;
+ }
}
bool CheckServices(const TSet<TString> &req, const NKikimrStateStorage::TEndpointBoardEntry &entry) {
@@ -278,39 +107,10 @@ public:
}
void TryReplyAndDie() {
- if (!NameserviceResponse || !LookupResponse || (RequestScheme && !SchemeCacheResponse))
+ if (!NameserviceResponse || !LookupResponse)
return;
- if (RequestScheme) {
- // check presence of database (acl should be checked here too)
- const auto &entry = SchemeCacheResponse->Request->ResultSet.front();
- if (entry.Path.size() != 1
- && (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindSubdomain && entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindExtSubdomain))
- {
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY,
- "TListEndpointsRPC: SchemeCacheResponse path is not a database"
- << ", entry.Path: " << CanonizePath(entry.Path)
- << ", entry.Kind: " << (ui64)entry.Kind);
-
- auto issue = MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, "Requested path is not database name");
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
- NYql::IssueToMessage(issue, issueMessages.Add());
- Request->SendResult(Ydb::StatusIds::NOT_FOUND, issueMessages);
- return PassAway();
- }
- }
-
- if (LookupResponse->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY,
- "TListEndpointsRPC: LookupResponse in not OK"
- << ", LookupResponse->Status: " << ui64(LookupResponse->Status));
-
- auto issue = MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, "Database nodes resolve failed with no certain result");
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
- NYql::IssueToMessage(issue, issueMessages.Add());
- Request->SendResult(Ydb::StatusIds::UNAVAILABLE, issueMessages);
- return PassAway();
- }
+ Y_VERIFY(LookupResponse->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
TStackVec<const TString*> entries;
entries.reserve(LookupResponse->InfoEntries.size());
@@ -414,74 +214,14 @@ public:
}
return true;
}
-
- void Lookup(const TString& db) {
- TVector<TString> path = NKikimr::SplitPath(db);
- auto domainName = path ? path[0] : TString();
- auto *appdata = AppData();
- auto *domainInfo = appdata->DomainsInfo->GetDomainByName(domainName);
- if (!domainInfo) {
- auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DATABASE_NOT_EXIST, "Database " + domainName + " not exists");
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
- NYql::IssueToMessage(issue, issueMessages.Add());
- Request->SendResult(Ydb::StatusIds::BAD_REQUEST, issueMessages);
- return PassAway();
- }
-
- TString database;
- for (auto &x : path) {
- if (x.size() > 4100) {
- auto issue = MakeIssue(NKikimrIssues::TIssuesIds::KEY_PARSE_ERROR, "Requested database name too long");
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
- NYql::IssueToMessage(issue, issueMessages.Add());
- Request->SendResult(Ydb::StatusIds::BAD_REQUEST, issueMessages);
- return PassAway();
- }
- database.append("/").append(x);
- }
-
- // request endpoints
- auto stateStorageGroupId = domainInfo->DefaultStateStorageGroup;
- auto reqPath = MakeEndpointsBoardPath(database);
-
- Send(CacheId, new NDiscoveryPrivate::TEvPrivate::TEvRequest(reqPath, stateStorageGroupId), 0, ++LookupCookie);
- LookupResponse.Reset();
- }
-
- void FillNavigateKey(const TString& path, NSchemeCache::TSchemeCacheNavigate::TEntry& entry) {
- entry.Path = NKikimr::SplitPath(path);
- entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath;
- }
-
- void FillNavigateKey(const TPathId& pathId, NSchemeCache::TSchemeCacheNavigate::TEntry& entry) {
- entry.TableId = TTableId(pathId.OwnerId, pathId.LocalPathId);
- entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
- }
-
- template <typename T>
- void Navigate(const T& id) {
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY,
- "TListEndpointsRPC: make TEvNavigateKeySet request"
- << ", path: " << id);
-
- auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
-
- request->ResultSet.emplace_back();
- FillNavigateKey(id, request->ResultSet.back());
- request->ResultSet.back().Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
- request->ResultSet.back().RedirectRequired = false;
-
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), IEventHandle::FlagTrackDelivery);
- SchemeCacheResponse.Reset();
- }
};
void TGRpcRequestProxy::Handle(TEvListEndpointsRequest::TPtr& ev, const TActorContext& ctx) {
- if (!DiscoveryCacheActorID)
- DiscoveryCacheActorID = ctx.Register(new NDiscoveryPrivate::TDiscoveryCache());
+ if (!DiscoveryCacheActorID) {
+ DiscoveryCacheActorID = ctx.Register(CreateDiscoveryCache());
+ }
ctx.Register(new TListEndpointsRPC(ev, DiscoveryCacheActorID));
}
-} // namespace NGRpcService
-} // namespace NKikimr
+}
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 029eac76d8..b2e2f52c48 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -355,6 +355,10 @@ enum EServiceKikimr {
// Background tasks
BG_TASKS = 1700;
+
+ // Discovery
+ DISCOVERY = 1800;
+ DISCOVERY_CACHE = 1801;
};
message TActivity {
@@ -966,5 +970,7 @@ message TActivity {
SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER = 603;
REPLICATION_CONTROLLER_STREAM_REMOVER = 604;
REPLICATION_CONTROLLER_DST_REMOVER = 605;
+ DISCOVERY_ACTOR = 607;
+ DISCOVERY_CACHE_ACTOR = 608;
};
};