diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-02-15 17:22:38 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-02-15 17:22:38 +0300 |
commit | 43f8c0b7c5ffc473bc7d12e19cce5c0f45bbd15d (patch) | |
tree | 95c33ac0d639a6d218af93a7bb0dc51078e02948 | |
parent | a21f85ad7d3ea9b211498f3ff32968ae7da66f9a (diff) | |
download | ydb-43f8c0b7c5ffc473bc7d12e19cce5c0f45bbd15d.tar.gz |
Discovery fixes
-rw-r--r-- | ydb/core/discovery/discovery.cpp | 39 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.cpp | 10 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_discovery.cpp | 21 |
3 files changed, 47 insertions, 23 deletions
diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp index 41ac6421ca0..d513f2a3b38 100644 --- a/ydb/core/discovery/discovery.cpp +++ b/ydb/core/discovery/discovery.cpp @@ -51,6 +51,22 @@ namespace NDiscoveryPrivate { THashMap<TString, TVector<TWaiter>> Requested; bool Scheduled = false; + auto Request(const TString& database, ui32 groupId) { + auto result = Requested.emplace(database, TVector<TWaiter>()); + if (result.second) { + CLOG_D("Lookup" + << ": path# " << database); + Register(CreateBoardLookupActor(database, SelfId(), groupId, EBoardLookupMode::Second, false, false)); + } + + return result.first; + } + + void Request(const TString& database, ui32 groupId, const TWaiter& waiter) { + auto it = Request(database, groupId); + it->second.push_back(waiter); + } + void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) { CLOG_T("Handle " << ev->Get()->ToString()); @@ -65,6 +81,7 @@ namespace NDiscoveryPrivate { Requested.erase(it); } + OldInfo.erase(path); NewInfo.emplace(path, std::move(msg)); if (!Scheduled) { @@ -90,25 +107,18 @@ namespace NDiscoveryPrivate { const auto* msg = ev->Get(); - if (const auto* x = OldInfo.FindPtr(msg->Database)) { + if (const auto* x = NewInfo.FindPtr(msg->Database)) { Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie); return; } - if (const auto* x = NewInfo.FindPtr(msg->Database)) { + if (const auto* x = OldInfo.FindPtr(msg->Database)) { + Request(msg->Database, msg->StateStorageId); Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie); return; } - auto& requested = Requested[msg->Database]; - if (requested.empty()) { - CLOG_D("Lookup" - << ": path# " << msg->Database); - - Register(CreateBoardLookupActor(msg->Database, SelfId(), msg->StateStorageId, EBoardLookupMode::Second, false, false)); - } - - requested.push_back({ev->Sender, ev->Cookie}); + Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie}); } public: @@ -335,10 +345,3 @@ IActor* CreateDiscoveryCache() { } } - -#undef DLOG_T -#undef DLOG_D -#undef CLOG_T -#undef CLOG_D -#undef LOG_T -#undef LOG_D diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index c52bf5fc46c..60e203c2c17 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -266,14 +266,20 @@ private: } virtual void PassAway() override { - for (auto& [database, queue] : DeferredEvents) { + for (auto& [_, queue] : DeferredEvents) { for (TEventReqHolder& req : queue) { req.Ctx->ReplyUnavaliable(); } } - for (const auto& [database, actor] : Subscribers) { + + for (const auto& [_, actor] : Subscribers) { Send(actor, new TEvents::TEvPoisonPill()); } + + if (DiscoveryCacheActorID) { + Send(DiscoveryCacheActorID, new TEvents::TEvPoisonPill()); + } + TBase::PassAway(); } diff --git a/ydb/core/grpc_services/rpc_discovery.cpp b/ydb/core/grpc_services/rpc_discovery.cpp index 3cf14e7c437..ee1285e4ab0 100644 --- a/ydb/core/grpc_services/rpc_discovery.cpp +++ b/ydb/core/grpc_services/rpc_discovery.cpp @@ -50,6 +50,14 @@ public: Become(&TThis::StateWait); } + void PassAway() override { + if (Discoverer) { + Send(Discoverer, new TEvents::TEvPoisonPill()); + } + + TActorBootstrapped<TListEndpointsRPC>::PassAway(); + } + STATEFN(StateWait) { switch (ev->GetTypeRewrite()) { hFunc(TEvStateStorage::TEvBoardInfo, Handle); @@ -59,6 +67,8 @@ public: } void Handle(TEvStateStorage::TEvBoardInfo::TPtr &ev) { + Discoverer = {}; + LookupResponse.Reset(ev->Release().Release()); TryReplyAndDie(); } @@ -69,11 +79,12 @@ public: } void Handle(TEvDiscovery::TEvError::TPtr &ev) { + Discoverer = {}; + auto issue = MakeIssue(ErrorToIssueCode(ev->Get()->Status), ev->Get()->Error); google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages; NYql::IssueToMessage(issue, issueMessages.Add()); - Request->SendResult(ErrorToStatusCode(ev->Get()->Status), issueMessages); - PassAway(); + Reply(ErrorToStatusCode(ev->Get()->Status), issueMessages); } static NKikimrIssues::TIssuesIds::EIssueCode ErrorToIssueCode(TEvDiscovery::TEvError::EStatus status) { @@ -202,8 +213,12 @@ public: result->set_self_location(location); } + Reply(*result, Ydb::StatusIds::SUCCESS); + } - Request->SendResult(*result, Ydb::StatusIds::SUCCESS); + template <typename... Args> + void Reply(Args&&... args) { + Request->SendResult(std::forward<Args>(args)...); PassAway(); } |