aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-11-13 20:46:20 +0300
committeralexvru <alexvru@ydb.tech>2022-11-13 20:46:20 +0300
commitb10c413747afc8d9e64c182ac9dbbeebcf767eab (patch)
tree2d69f2e0fa01afc31fc0b311ab38d0cd1b995952
parent8160efc4de0d94701b85b71fe88fb953053f638c (diff)
downloadydb-b10c413747afc8d9e64c182ac9dbbeebcf767eab.tar.gz
Fix agent reconnect bug
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h3
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp13
-rw-r--r--ydb/core/blob_depot/agent/query.cpp3
3 files changed, 15 insertions, 4 deletions
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 4b7a9b70865..70e33e18c8b 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -85,6 +85,7 @@ namespace NKikimr::NBlobDepot {
const ui64 AgentInstanceId;
ui64 TabletId = Max<ui64>();
TActorId PipeId;
+ bool IsConnected = false;
private:
struct TEvPrivate {
@@ -148,7 +149,6 @@ namespace NKikimr::NBlobDepot {
if (TabletId && TabletId != Max<ui64>()) {
ConnectToBlobDepot();
}
- HandlePendingEvent();
}
if (!info->GetTotalVDisksNum()) {
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, ProxyId, {}, nullptr, 0));
@@ -204,6 +204,7 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
void ConnectToBlobDepot();
+ void OnConnect();
void OnDisconnect();
void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override;
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 622b993a8a8..63ce937f32d 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -78,6 +78,8 @@ namespace NKikimr::NBlobDepot {
SpaceColor = msg.GetSpaceColor();
ApproximateFreeSpaceShare = msg.GetApproximateFreeSpaceShare();
+
+ OnConnect();
}
void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) {
@@ -110,8 +112,13 @@ namespace NKikimr::NBlobDepot {
kind.IssueGivenIdRange(msg.GetGivenIdRange());
}
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
- (Msg, msg), (NumAvailableItems, kind.GetNumAvailableItems()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), (Msg, msg),
+ (NumAvailableItems, kind.GetNumAvailableItems()));
+ }
+
+ void TBlobDepotAgent::OnConnect() {
+ IsConnected = true;
+ HandlePendingEvent();
}
void TBlobDepotAgent::OnDisconnect() {
@@ -124,6 +131,8 @@ namespace NKikimr::NBlobDepot {
for (auto& [_, kind] : ChannelKinds) {
kind.IdAllocInFlight = false;
}
+
+ IsConnected = false;
}
void TBlobDepotAgent::ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) {
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp
index 6969dda9bb3..1832688f3ca 100644
--- a/ydb/core/blob_depot/agent/query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -15,7 +15,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::HandleStorageProxy(TAutoPtr<IEventHandle> ev) {
std::unique_ptr<IEventHandle> p(ev.Release());
- if (TabletId == Max<ui64>() || !PendingEventQ.empty()) {
+ if (!IsConnected || !PendingEventQ.empty()) {
size_t size = Max<size_t>();
switch (p->GetTypeRewrite()) {
#define XX(TYPE) case TEvBlobStorage::TYPE: size = p->Get<TEvBlobStorage::T##TYPE>()->CalculateSize(); break;
@@ -67,6 +67,7 @@ namespace NKikimr::NBlobDepot {
std::deque<TPendingEvent>::iterator it;
for (it = PendingEventQ.begin(); it != PendingEventQ.end() && it->ExpirationTimestamp <= now; ++it) {
CreateQuery<0>(std::move(it->Event))->EndWithError(NKikimrProto::ERROR, "pending event queue timeout");
+ PendingEventBytes -= it->Size;
}
PendingEventQ.erase(PendingEventQ.begin(), it);