1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
#include "agent_impl.h"
namespace NKikimr::NBlobDepot {
TRequestInFlight::TRequestInFlight(ui64 id, TRequestSender *sender, TRequestContext::TPtr context,
TCancelCallback cancelCallback, bool toBlobDepotTablet)
: Id(id)
, Sender(sender)
, Context(std::move(context))
, CancelCallback(std::move(cancelCallback))
, ToBlobDepotTablet(toBlobDepotTablet)
{
Sender->RegisterRequestInFlight(this);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TRequestSender class
// Creates a sender bound to the given agent; the reference is kept in Agent for later use.
TRequestSender::TRequestSender(TBlobDepotAgent& agent)
    : Agent(agent)
{
}
// On destruction, drops every request that is still linked to this sender
// (see ClearRequestsInFlight for what happens to each of them).
TRequestSender::~TRequestSender() {
    this->ClearRequestsInFlight();
}
void TRequestSender::ClearRequestsInFlight() {
RequestsInFlight.ForEach([this](TRequestInFlight *requestInFlight) {
auto& map = requestInFlight->ToBlobDepotTablet ? Agent.TabletRequestInFlight : Agent.OtherRequestInFlight;
auto node = map.extract(requestInFlight->Id);
Y_VERIFY(node);
if (requestInFlight->CancelCallback) {
requestInFlight->CancelCallback();
}
});
}
// Called when a response for one of this sender's requests is available.
// Unlinks the request record first, then hands its id, its context (moved out of the record)
// and the response to ProcessResponse.
void TRequestSender::OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response) {
    requestInFlight.Unlink();

    const ui64 requestId = requestInFlight.Id;
    ProcessResponse(requestId, std::move(requestInFlight.Context), std::move(response));
}
// Appends the given request record to the tail of this sender's RequestsInFlight list.
void TRequestSender::RegisterRequestInFlight(TRequestInFlight *requestInFlight) {
    this->RequestsInFlight.PushBack(requestInFlight);
}
// Renders a response as text. The TKeyResolved and TTabletDisconnected alternatives are printed
// as their type names; every other alternative is dereferenced and asked for its own ToString().
TString TRequestSender::ToString(const TResponse& response) {
    return std::visit([](const auto& alternative) -> TString {
        using TAlternative = std::decay_t<decltype(alternative)>;

        // the two tag checks are mutually exclusive, so their order does not matter
        if constexpr (std::is_same_v<TAlternative, TKeyResolved>) {
            return "TKeyResolved";
        } else if constexpr (std::is_same_v<TAlternative, TTabletDisconnected>) {
            return "TTabletDisconnected";
        } else {
            return alternative->ToString();
        }
    }, response);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TBlobDepotAgent machinery
// Records a new outstanding request in the table selected by toBlobDepotTablet
// (TabletRequestInFlight when true, OtherRequestInFlight otherwise).
// The entry is constructed in place from the arguments; insertion is verified to have succeeded.
void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context,
        TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet) {
    TRequestsInFlight *registry = nullptr;
    if (toBlobDepotTablet) {
        registry = &TabletRequestInFlight;
    } else {
        registry = &OtherRequestInFlight;
    }

    auto outcome = registry->emplace(id, sender, std::move(context), std::move(cancelCallback),
        toBlobDepotTablet);
    const bool inserted = outcome.second;
    Y_VERIFY(inserted);
}
// Generic handler for responses arriving for tablet-bound requests.
// The event is always logged. It is processed further only when its Sender equals PipeServerId;
// in that case the request identified by the event cookie is completed against TabletRequestInFlight.
template<typename TEvent>
void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
    STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA16, "HandleTabletResponse", (AgentId, LogId),
        (Id, ev->Cookie), (Type, TypeName<TEvent>()), (Sender, ev->Sender), (PipeServerId, PipeServerId),
        (Match, ev->Sender == PipeServerId));

    if (!(ev->Sender == PipeServerId)) {
        return; // sender does not match PipeServerId -- nothing more to do besides the log line above
    }

    // IsConnected must be set here, except when the event's rewritten type is EvRegisterAgentResult
    Y_VERIFY(IsConnected || ev->GetTypeRewrite() == TEvBlobDepot::EvRegisterAgentResult);
    OnRequestComplete(ev->Cookie, ev->Get(), TabletRequestInFlight);
}

// Explicit instantiations: the template is defined in this file, so each event type
// routed through HandleTabletResponse is instantiated here.
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev);
// Generic handler for responses to requests tracked in OtherRequestInFlight.
// Logs the event, then completes the request identified by the event cookie.
template<typename TEvent>
void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
    STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "HandleOtherResponse", (AgentId, LogId),
        (Id, ev->Cookie), (Type, TypeName<TEvent>()));

    const ui64 requestId = ev->Cookie;
    auto *payload = ev->Get();
    OnRequestComplete(requestId, payload, OtherRequestInFlight);
}

// Explicit instantiations: the template is defined in this file, so each event type
// routed through HandleOtherResponse is instantiated here.
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvGetResult::TPtr ev);
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvPutResult::TPtr ev);
// Completes the request with the given id, if the supplied table still contains it.
// The entry is extracted from the table and its sender is notified with the response;
// when no entry with this id exists the call does nothing.
void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) {
    auto node = map.extract(id);
    if (!node) {
        return; // no such request in this table
    }

    // 'node' stays alive until the end of the function, so the reference below remains valid
    // for the whole duration of the sender callback
    auto& request = node.value();
    request.Sender->OnRequestComplete(request, std::move(response));
}
// Removes the entry with the given id from TabletRequestInFlight.
// Exactly one entry is expected to be erased; this is checked with Y_VERIFY.
void TBlobDepotAgent::DropTabletRequest(ui64 id) {
    const size_t numErased = this->TabletRequestInFlight.erase(id);
    Y_VERIFY(numErased == 1);
}
} // NKikimr::NBlobDepot
|