aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-11-09 09:23:26 +0300
committeralexvru <alexvru@ydb.tech>2022-11-09 09:23:26 +0300
commita2e4ed1195515c99e93377ea2e7804e80882e920 (patch)
tree223d82ad4f07b4d9c3de6cca2fe7cd8fc663c586
parent8a9b85e341bf74e07dec920080119a0a25c82a9e (diff)
downloadydb-a2e4ed1195515c99e93377ea2e7804e80882e920.tar.gz
Improve BlobDepot agent queueing logic
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h10
-rw-r--r--ydb/core/blob_depot/agent/query.cpp47
-rw-r--r--ydb/core/blob_depot/agent/storage_get.cpp3
3 files changed, 42 insertions, 18 deletions
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 289c266fd8..8745e873ab 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -86,6 +86,7 @@ namespace NKikimr::NBlobDepot {
struct TEvPrivate {
enum {
EvQueryWatchdog = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvProcessPendingEvent,
};
};
@@ -122,6 +123,7 @@ namespace NKikimr::NBlobDepot {
ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY)
hFunc(TEvBlobStorage::TEvBunchOfEvents, Handle);
+ cFunc(TEvPrivate::EvProcessPendingEvent, HandlePendingEvent);
cFunc(TEvPrivate::EvQueryWatchdog, HandleQueryWatchdog);
);
@@ -140,10 +142,7 @@ namespace NKikimr::NBlobDepot {
if (TabletId && TabletId != Max<ui64>()) {
ConnectToBlobDepot();
}
-
- for (auto& ev : std::exchange(PendingEventQ, {})) {
- TActivationContext::Send(ev.release());
- }
+ HandlePendingEvent();
}
if (!info->GetTotalVDisksNum()) {
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, ProxyId, {}, nullptr, 0));
@@ -275,8 +274,9 @@ namespace NKikimr::NBlobDepot {
void HandleQueryWatchdog();
void HandleStorageProxy(TAutoPtr<IEventHandle> ev);
+ void HandlePendingEvent();
+ void ProcessStorageEvent(std::unique_ptr<IEventHandle> ev);
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
- TQuery *CreateQuery(TAutoPtr<IEventHandle> ev);
template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp
index ef077d21fc..8cdf457bcd 100644
--- a/ydb/core/blob_depot/agent/query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -3,34 +3,55 @@
namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::HandleStorageProxy(TAutoPtr<IEventHandle> ev) {
- if (TabletId == Max<ui64>()) {
+ if (TabletId == Max<ui64>() || !PendingEventQ.empty()) {
// TODO: memory usage control
PendingEventQ.emplace_back(ev.Release());
} else {
- auto *query = CreateQuery(ev);
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId),
- (QueryId, query->GetQueryId()), (TabletId, query->GetTabletId()), (Name, query->GetName()));
- if (!TabletId) {
- query->EndWithError(NKikimrProto::ERROR, "group is in error state");
+ ProcessStorageEvent(std::unique_ptr<IEventHandle>(ev.Release()));
+ }
+ }
+
+ void TBlobDepotAgent::HandlePendingEvent() {
+ THPTimer timer;
+
+ do {
+ if (!PendingEventQ.empty()) {
+ ProcessStorageEvent(std::move(PendingEventQ.front()));
+ PendingEventQ.pop_front();
} else {
- query->Initiate();
+ break;
}
+ } while (TDuration::Seconds(timer.Passed()) <= TDuration::MicroSeconds(100));
+
+ if (!PendingEventQ.empty()) {
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0));
}
}
- void TBlobDepotAgent::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) {
- ev->Get()->Process(this);
- }
+ void TBlobDepotAgent::ProcessStorageEvent(std::unique_ptr<IEventHandle> ev) {
+ TQuery *query = nullptr;
- TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(TAutoPtr<IEventHandle> ev) {
switch (ev->GetTypeRewrite()) {
#define XX(TYPE) \
- case TEvBlobStorage::TYPE: return CreateQuery<TEvBlobStorage::TYPE>(std::unique_ptr<IEventHandle>(ev.Release()));
+ case TEvBlobStorage::TYPE: query = CreateQuery<TEvBlobStorage::TYPE>(std::move(ev)); break;
ENUMERATE_INCOMING_EVENTS(XX)
#undef XX
}
- Y_FAIL();
+
+ Y_VERIFY(query);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId),
+ (QueryId, query->GetQueryId()), (TabletId, query->GetTabletId()), (Name, query->GetName()));
+ if (!TabletId) {
+ query->EndWithError(NKikimrProto::ERROR, "group is in error state");
+ } else {
+ query->Initiate();
+ }
+ }
+
+ void TBlobDepotAgent::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) {
+ ev->Get()->Process(this);
}
void TBlobDepotAgent::HandleQueryWatchdog() {
diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp
index d4b08c1305..7e4e3adf46 100644
--- a/ydb/core/blob_depot/agent/storage_get.cpp
+++ b/ydb/core/blob_depot/agent/storage_get.cpp
@@ -57,6 +57,9 @@ namespace NKikimr::NBlobDepot {
if (!ProcessSingleResult(i, value)) {
return;
}
+ } else {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA29, "resolve pending", (VirtualGroupId, Agent.VirtualGroupId),
+ (QueryId, GetQueryId()), (QueryIdx, i), (BlobId, query.Id));
}
}
}