diff options
author | alexvru <alexvru@ydb.tech> | 2022-11-09 09:23:26 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-11-09 09:23:26 +0300 |
commit | a2e4ed1195515c99e93377ea2e7804e80882e920 (patch) | |
tree | 223d82ad4f07b4d9c3de6cca2fe7cd8fc663c586 | |
parent | 8a9b85e341bf74e07dec920080119a0a25c82a9e (diff) | |
download | ydb-a2e4ed1195515c99e93377ea2e7804e80882e920.tar.gz |
Improve BlobDepot agent queueing logic
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 10 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/query.cpp | 47 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_get.cpp | 3 |
3 files changed, 42 insertions, 18 deletions
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 289c266fd8..8745e873ab 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -86,6 +86,7 @@ namespace NKikimr::NBlobDepot { struct TEvPrivate { enum { EvQueryWatchdog = EventSpaceBegin(TEvents::ES_PRIVATE), + EvProcessPendingEvent, }; }; @@ -122,6 +123,7 @@ namespace NKikimr::NBlobDepot { ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY) hFunc(TEvBlobStorage::TEvBunchOfEvents, Handle); + cFunc(TEvPrivate::EvProcessPendingEvent, HandlePendingEvent); cFunc(TEvPrivate::EvQueryWatchdog, HandleQueryWatchdog); ); @@ -140,10 +142,7 @@ namespace NKikimr::NBlobDepot { if (TabletId && TabletId != Max<ui64>()) { ConnectToBlobDepot(); } - - for (auto& ev : std::exchange(PendingEventQ, {})) { - TActivationContext::Send(ev.release()); - } + HandlePendingEvent(); } if (!info->GetTotalVDisksNum()) { TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, ProxyId, {}, nullptr, 0)); @@ -275,8 +274,9 @@ namespace NKikimr::NBlobDepot { void HandleQueryWatchdog(); void HandleStorageProxy(TAutoPtr<IEventHandle> ev); + void HandlePendingEvent(); + void ProcessStorageEvent(std::unique_ptr<IEventHandle> ev); void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev); - TQuery *CreateQuery(TAutoPtr<IEventHandle> ev); template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index ef077d21fc..8cdf457bcd 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -3,34 +3,55 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::HandleStorageProxy(TAutoPtr<IEventHandle> ev) { - if (TabletId == Max<ui64>()) { + if (TabletId == Max<ui64>() || !PendingEventQ.empty()) { // TODO: memory usage control PendingEventQ.emplace_back(ev.Release()); } else { - auto *query = CreateQuery(ev); - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId), - (QueryId, query->GetQueryId()), (TabletId, query->GetTabletId()), (Name, query->GetName())); - if (!TabletId) { - query->EndWithError(NKikimrProto::ERROR, "group is in error state"); + ProcessStorageEvent(std::unique_ptr<IEventHandle>(ev.Release())); + } + } + + void TBlobDepotAgent::HandlePendingEvent() { + THPTimer timer; + + do { + if (!PendingEventQ.empty()) { + ProcessStorageEvent(std::move(PendingEventQ.front())); + PendingEventQ.pop_front(); } else { - query->Initiate(); + break; } + } while (TDuration::Seconds(timer.Passed()) <= TDuration::MicroSeconds(100)); + + if (!PendingEventQ.empty()) { + TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0)); } } - void TBlobDepotAgent::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) { - ev->Get()->Process(this); - } + void TBlobDepotAgent::ProcessStorageEvent(std::unique_ptr<IEventHandle> ev) { + TQuery *query = nullptr; - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(TAutoPtr<IEventHandle> ev) { switch (ev->GetTypeRewrite()) { #define XX(TYPE) \ - case TEvBlobStorage::TYPE: return CreateQuery<TEvBlobStorage::TYPE>(std::unique_ptr<IEventHandle>(ev.Release())); + case TEvBlobStorage::TYPE: query = CreateQuery<TEvBlobStorage::TYPE>(std::move(ev)); break; ENUMERATE_INCOMING_EVENTS(XX) #undef XX } - Y_FAIL(); + + Y_VERIFY(query); + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId), + (QueryId, query->GetQueryId()), (TabletId, query->GetTabletId()), (Name, query->GetName())); + if (!TabletId) { + query->EndWithError(NKikimrProto::ERROR, "group is in error state"); + } else { + query->Initiate(); + } + } + + void TBlobDepotAgent::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) { + ev->Get()->Process(this); } void TBlobDepotAgent::HandleQueryWatchdog() { diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index d4b08c1305..7e4e3adf46 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -57,6 +57,9 @@ namespace NKikimr::NBlobDepot { if (!ProcessSingleResult(i, value)) { return; } + } else { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA29, "resolve pending", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, GetQueryId()), (QueryIdx, i), (BlobId, query.Id)); } } } |