aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-02-15 17:22:38 +0300
committerilnaz <ilnaz@ydb.tech>2023-02-15 17:22:38 +0300
commit43f8c0b7c5ffc473bc7d12e19cce5c0f45bbd15d (patch)
tree95c33ac0d639a6d218af93a7bb0dc51078e02948
parenta21f85ad7d3ea9b211498f3ff32968ae7da66f9a (diff)
downloadydb-43f8c0b7c5ffc473bc7d12e19cce5c0f45bbd15d.tar.gz
Discovery fixes
-rw-r--r--ydb/core/discovery/discovery.cpp39
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp10
-rw-r--r--ydb/core/grpc_services/rpc_discovery.cpp21
3 files changed, 47 insertions, 23 deletions
diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp
index 41ac6421ca0..d513f2a3b38 100644
--- a/ydb/core/discovery/discovery.cpp
+++ b/ydb/core/discovery/discovery.cpp
@@ -51,6 +51,22 @@ namespace NDiscoveryPrivate {
THashMap<TString, TVector<TWaiter>> Requested;
bool Scheduled = false;
+ auto Request(const TString& database, ui32 groupId) {
+ auto result = Requested.emplace(database, TVector<TWaiter>());
+ if (result.second) {
+ CLOG_D("Lookup"
+ << ": path# " << database);
+ Register(CreateBoardLookupActor(database, SelfId(), groupId, EBoardLookupMode::Second, false, false));
+ }
+
+ return result.first;
+ }
+
+ void Request(const TString& database, ui32 groupId, const TWaiter& waiter) {
+ auto it = Request(database, groupId);
+ it->second.push_back(waiter);
+ }
+
void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
CLOG_T("Handle " << ev->Get()->ToString());
@@ -65,6 +81,7 @@ namespace NDiscoveryPrivate {
Requested.erase(it);
}
+ OldInfo.erase(path);
NewInfo.emplace(path, std::move(msg));
if (!Scheduled) {
@@ -90,25 +107,18 @@ namespace NDiscoveryPrivate {
const auto* msg = ev->Get();
- if (const auto* x = OldInfo.FindPtr(msg->Database)) {
+ if (const auto* x = NewInfo.FindPtr(msg->Database)) {
Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie);
return;
}
- if (const auto* x = NewInfo.FindPtr(msg->Database)) {
+ if (const auto* x = OldInfo.FindPtr(msg->Database)) {
+ Request(msg->Database, msg->StateStorageId);
Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie);
return;
}
- auto& requested = Requested[msg->Database];
- if (requested.empty()) {
- CLOG_D("Lookup"
- << ": path# " << msg->Database);
-
- Register(CreateBoardLookupActor(msg->Database, SelfId(), msg->StateStorageId, EBoardLookupMode::Second, false, false));
- }
-
- requested.push_back({ev->Sender, ev->Cookie});
+ Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie});
}
public:
@@ -335,10 +345,3 @@ IActor* CreateDiscoveryCache() {
}
}
-
-#undef DLOG_T
-#undef DLOG_D
-#undef CLOG_T
-#undef CLOG_D
-#undef LOG_T
-#undef LOG_D
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index c52bf5fc46c..60e203c2c17 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -266,14 +266,20 @@ private:
}
virtual void PassAway() override {
- for (auto& [database, queue] : DeferredEvents) {
+ for (auto& [_, queue] : DeferredEvents) {
for (TEventReqHolder& req : queue) {
req.Ctx->ReplyUnavaliable();
}
}
- for (const auto& [database, actor] : Subscribers) {
+
+ for (const auto& [_, actor] : Subscribers) {
Send(actor, new TEvents::TEvPoisonPill());
}
+
+ if (DiscoveryCacheActorID) {
+ Send(DiscoveryCacheActorID, new TEvents::TEvPoisonPill());
+ }
+
TBase::PassAway();
}
diff --git a/ydb/core/grpc_services/rpc_discovery.cpp b/ydb/core/grpc_services/rpc_discovery.cpp
index 3cf14e7c437..ee1285e4ab0 100644
--- a/ydb/core/grpc_services/rpc_discovery.cpp
+++ b/ydb/core/grpc_services/rpc_discovery.cpp
@@ -50,6 +50,14 @@ public:
Become(&TThis::StateWait);
}
+ void PassAway() override {
+ if (Discoverer) {
+ Send(Discoverer, new TEvents::TEvPoisonPill());
+ }
+
+ TActorBootstrapped<TListEndpointsRPC>::PassAway();
+ }
+
STATEFN(StateWait) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvBoardInfo, Handle);
@@ -59,6 +67,8 @@ public:
}
void Handle(TEvStateStorage::TEvBoardInfo::TPtr &ev) {
+ Discoverer = {};
+
LookupResponse.Reset(ev->Release().Release());
TryReplyAndDie();
}
@@ -69,11 +79,12 @@ public:
}
void Handle(TEvDiscovery::TEvError::TPtr &ev) {
+ Discoverer = {};
+
auto issue = MakeIssue(ErrorToIssueCode(ev->Get()->Status), ev->Get()->Error);
google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
NYql::IssueToMessage(issue, issueMessages.Add());
- Request->SendResult(ErrorToStatusCode(ev->Get()->Status), issueMessages);
- PassAway();
+ Reply(ErrorToStatusCode(ev->Get()->Status), issueMessages);
}
static NKikimrIssues::TIssuesIds::EIssueCode ErrorToIssueCode(TEvDiscovery::TEvError::EStatus status) {
@@ -202,8 +213,12 @@ public:
result->set_self_location(location);
}
+ Reply(*result, Ydb::StatusIds::SUCCESS);
+ }
- Request->SendResult(*result, Ydb::StatusIds::SUCCESS);
+ template <typename... Args>
+ void Reply(Args&&... args) {
+ Request->SendResult(std::forward<Args>(args)...);
PassAway();
}