aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/blobstorage/dsproxy/dsproxy_responsiveness.h
blob: c0b01b06d17f4e6ca21839a2d9a8628e2a97305d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#pragma once

#include "defs.h"

namespace NKikimr {

    // Collects recent response-time samples per disk and summarizes them on demand.
    //
    // Samples arrive through Register() and are stored in a per-disk queue that Register() keeps from growing
    // beyond MaxItems entries. Update() produces a responsiveness figure for every disk that currently has a
    // queue and afterwards discards the samples whose timestamp is older than Window.
    class TDiskResponsivenessTracker {
    public:
        using TDiskId = ui32; // VDisk order number inside group

        // Snapshot returned by Update(): one TInfo entry per disk that had a queue at the time of the call.
        struct TPerDiskStats : TThrRefBase {
            struct TInfo {
                // the larger of the 90-th percentile of the queued values and the largest of the last three values
                TDuration Responsiveness;
                // number of samples the figure was computed from; defaults to zero so that a default-constructed
                // TInfo never exposes an indeterminate value
                ui32 NumRequests = 0;
                // timestamp of the oldest sample that was in the queue
                TInstant Since;
                // timestamp of the newest sample that was in the queue
                TInstant Until;

                // Renders "<responsiveness>/<number of requests>/<since>-<until>"; <since> and <until> are printed
                // as the durations between the stored instants and the current time (TInstant::Now()).
                TString ToString() const {
                    const TInstant now = TInstant::Now();
                    const TDuration since = now - Since;
                    const TDuration until = now - Until;
                    return Sprintf("%s/%" PRIu32 "/%s-%s", Responsiveness.ToString().data(), NumRequests,
                        since.ToString().data(), until.ToString().data());
                }
            };
            THashMap<TDiskId, TInfo> DiskData;
        };

        using TPerDiskStatsPtr = TIntrusivePtr<TPerDiskStats>;

    private:
        // A single registered sample.
        struct TItem {
            TInstant  Timestamp;
            TDuration Value;

            TItem(TInstant timestamp, TDuration value)
                : Timestamp(timestamp)
                , Value(value)
            {}
        };

        const ui32 MaxItems;
        const TDuration Window;
        THashMap<TDiskId, TDeque<TItem>> PerDiskQ;

        // Returns the larger of (a) the 90-th percentile of all values in the queue and (b) the largest value among
        // its last three items. Yields TDuration::Zero() for an empty queue.
        static TDuration CalculateResponsiveness(const TDeque<TItem> &q) {
            TDuration value = TDuration::Zero();

            if (!q.empty()) {
                TVector<TDuration> values;
                values.reserve(q.size());
                for (const TItem &item : q) {
                    values.push_back(item.Value);
                }
                // only one order statistic is needed, so a partial selection replaces a full sort; the element
                // that ends up at 'pos' is exactly the one a complete sort would have put there
                const size_t pos = values.size() * 9 / 10;
                std::nth_element(values.begin(), values.begin() + pos, values.end());
                value = values[pos];
            }

            // never report less than any of the last three items
            auto recent = q.rbegin();
            for (ui32 i = 0; i < 3 && recent != q.rend(); ++i, ++recent) {
                if (recent->Value > value) {
                    value = recent->Value;
                }
            }

            return value;
        }

    public:
        // maxItems -- upper bound on the number of samples kept per disk
        // window   -- samples older than this (relative to the timestamp given to Update) are dropped by Update
        TDiskResponsivenessTracker(ui32 maxItems, TDuration window)
            : MaxItems(maxItems)
            , Window(window)
        {}

        // Appends a sample for the given disk. A timestamp earlier than that of the newest queued sample is raised
        // to match it, so timestamps inside a queue never decrease. When the queue grows beyond MaxItems, the
        // oldest sample is dropped.
        void Register(const TDiskId &id, TInstant timestamp, TDuration value) {
            auto &q = PerDiskQ[id];
            if (!q.empty() && timestamp < q.back().Timestamp) {
                timestamp = q.back().Timestamp;
            }
            q.emplace_back(timestamp, value);
            if (q.size() > MaxItems) {
                q.pop_front();
            }
        }

        // Builds a statistics snapshot for every disk that has a queue, then removes samples with a timestamp
        // earlier than (timestamp - Window) and forgets disks whose queue became empty. The snapshot is computed
        // BEFORE the removal, so it still reflects the samples that are about to be dropped.
        TPerDiskStatsPtr Update(TInstant timestamp) {
            const TInstant barrier = timestamp - Window;
            TPerDiskStatsPtr result = MakeIntrusive<TPerDiskStats>();

            // timestamps in a queue are non-decreasing (enforced by Register), which makes binary search valid
            const auto olderThan = [](const TItem &item, const TInstant &bound) {
                return item.Timestamp < bound;
            };

            for (auto diskIt = PerDiskQ.begin(); diskIt != PerDiskQ.end(); ) {
                // step to the next entry right away: the current one may get erased at the end of the iteration
                auto current = diskIt;
                ++diskIt;

                auto &q = current->second;

                // store the figures in the result
                TPerDiskStats::TInfo info;
                info.Responsiveness = CalculateResponsiveness(q);
                info.NumRequests = static_cast<ui32>(q.size());
                info.Since = !q.empty() ? q.front().Timestamp : TInstant();
                info.Until = !q.empty() ? q.back().Timestamp : TInstant();
                result->DiskData.emplace(current->first, info);

                // cut items that fell out of the window
                q.erase(q.begin(), std::lower_bound(q.begin(), q.end(), barrier, olderThan));

                // delete the queue if it became empty
                if (q.empty()) {
                    PerDiskQ.erase(current);
                }
            }

            return result;
        }

        // Returns true when no disk has a queue.
        bool IsEmpty() const {
            return PerDiskQ.empty();
        }
    };

} // NKikimr