aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/blob_depot/agent/request.cpp
blob: dc830f5cecdcd71e42ddd9cb5733a4438df768c1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include "agent_impl.h"

namespace NKikimr::NBlobDepot {

    TRequestInFlight::TRequestInFlight(ui64 id, TRequestSender *sender, TRequestContext::TPtr context,
            TCancelCallback cancelCallback, bool toBlobDepotTablet)
        : Id(id)
        , Sender(sender)
        , Context(std::move(context))
        , CancelCallback(std::move(cancelCallback))
        , ToBlobDepotTablet(toBlobDepotTablet)
    {
        Sender->RegisterRequestInFlight(this);
    }

    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    // TRequestSender class

    TRequestSender::TRequestSender(TBlobDepotAgent& agent)
        : Agent(agent)
    {}

    TRequestSender::~TRequestSender() {
        ClearRequestsInFlight();
    }

    void TRequestSender::ClearRequestsInFlight() {
        RequestsInFlight.ForEach([this](TRequestInFlight *requestInFlight) {
            auto& map = requestInFlight->ToBlobDepotTablet ? Agent.TabletRequestInFlight : Agent.OtherRequestInFlight;
            auto node = map.extract(requestInFlight->Id);
            Y_VERIFY(node);

            if (requestInFlight->CancelCallback) {
                requestInFlight->CancelCallback();
            }
        });
    }

    void TRequestSender::OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response) {
        requestInFlight.Unlink();
        ProcessResponse(requestInFlight.Id, std::move(requestInFlight.Context), std::move(response));
    }

    void TRequestSender::RegisterRequestInFlight(TRequestInFlight *requestInFlight) {
        RequestsInFlight.PushBack(requestInFlight);
    }

    TString TRequestSender::ToString(const TResponse& response) {
        auto printer = [](auto& value) -> TString {
            using T = std::decay_t<decltype(value)>;
            if constexpr (std::is_same_v<T, TTabletDisconnected>) {
                return "TTabletDisconnected";
            } else if constexpr (std::is_same_v<T, TKeyResolved>) {
                return "TKeyResolved";
            } else {
                return value->ToString();
            }
        };
        return std::visit(printer, response);
    }

    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    // TBlobDepotAgent machinery

    void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context,
            TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet) {
        TRequestsInFlight& map = toBlobDepotTablet ? TabletRequestInFlight : OtherRequestInFlight;
        const bool inserted = map.emplace(id, sender, std::move(context), std::move(cancelCallback),
            toBlobDepotTablet).second;
        Y_VERIFY(inserted);
    }

    template<typename TEvent>
    void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
        STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA16, "HandleTabletResponse", (AgentId, LogId),
            (Id, ev->Cookie), (Type, TypeName<TEvent>()), (Sender, ev->Sender), (PipeServerId, PipeServerId),
            (Match, ev->Sender == PipeServerId));
        if (ev->Sender == PipeServerId) {
            Y_VERIFY(IsConnected || ev->GetTypeRewrite() == TEvBlobDepot::EvRegisterAgentResult);
            OnRequestComplete(ev->Cookie, ev->Get(), TabletRequestInFlight);
        }
    }

    template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev);
    template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev);
    template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev);
    template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev);
    template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev);
    template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
    template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev);

    template<typename TEvent>
    void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
        STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "HandleOtherResponse", (AgentId, LogId),
            (Id, ev->Cookie), (Type, TypeName<TEvent>()));
        OnRequestComplete(ev->Cookie, ev->Get(), OtherRequestInFlight);
    }

    template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvGetResult::TPtr ev);
    template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvPutResult::TPtr ev);

    void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) {
        if (auto node = map.extract(id)) {
            auto& requestInFlight = node.value();
            requestInFlight.Sender->OnRequestComplete(requestInFlight, std::move(response));
        }
    }

    void TBlobDepotAgent::DropTabletRequest(ui64 id) {
        const size_t numErased = TabletRequestInFlight.erase(id);
        Y_VERIFY(numErased == 1);
    }

} // NKikimr::NBlobDepot