1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
#include "agent_impl.h"
namespace NKikimr::NBlobDepot {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TRequestSender class
TRequestSender::TRequestSender(TBlobDepotAgent& agent)
: Agent(agent)
{}
TRequestSender::~TRequestSender() {
if (this != &Agent) {
for (const auto& [id, context] : RequestsInFlight) {
const ui64 id_ = id;
auto tryToProcess = [&](auto& map) {
if (const auto it = map.find(id_); it != map.end()) {
TBlobDepotAgent::TRequestInFlight& request = it->second;
if (request.CancelCallback) {
request.CancelCallback();
}
map.erase(it);
return true;
} else {
return false;
}
};
const bool success = tryToProcess(Agent.TabletRequestInFlight) || tryToProcess(Agent.OtherRequestInFlight);
Y_VERIFY(success);
}
}
}
void TRequestSender::RegisterRequest(ui64 id, TRequestContext::TPtr context) {
const auto [_, inserted] = RequestsInFlight.emplace(id, std::move(context));
Y_VERIFY(inserted);
}
void TRequestSender::OnRequestComplete(ui64 id, TResponse response) {
const auto it = RequestsInFlight.find(id);
Y_VERIFY(it != RequestsInFlight.end());
TRequestContext::TPtr context = std::move(it->second);
RequestsInFlight.erase(it);
ProcessResponse(id, std::move(context), std::move(response));
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TBlobDepotAgent machinery
void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context,
TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet) {
TRequestsInFlight& map = toBlobDepotTablet ? TabletRequestInFlight : OtherRequestInFlight;
const auto [_, inserted] = map.emplace(id, TRequestInFlight{sender, std::move(cancelCallback)});
Y_VERIFY(inserted);
sender->RegisterRequest(id, std::move(context));
}
template<typename TEvent>
void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId),
(Id, ev->Cookie), (Type, TypeName<TEvent>()));
auto *event = ev->Get();
OnRequestComplete(ev->Cookie, event, TabletRequestInFlight);
}
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev);
template<typename TEvent>
void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA16, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId),
(Id, ev->Cookie), (Type, TypeName<TEvent>()));
auto *event = ev->Get();
OnRequestComplete(ev->Cookie, event, OtherRequestInFlight);
}
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvGetResult::TPtr ev);
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvPutResult::TPtr ev);
void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) {
const auto it = map.find(id);
Y_VERIFY(it != map.end());
auto& [_, request] = *it;
request.Sender->OnRequestComplete(id, std::move(response));
map.erase(it);
}
} // NKikimr::NBlobDepot
|