1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
#pragma once
#include "defs.h"
namespace NKikimr {
// Keeps a bounded, time-ordered queue of response-time samples per disk and
// periodically condenses each queue into a single "responsiveness" figure.
//
// Usage, as far as this class itself shows: call Register() for every observed
// sample, then call Update() to obtain a snapshot and to drop samples that have
// fallen out of the time window.
class TDiskResponsivenessTracker {
public:
    using TDiskId = ui32; // VDisk order number inside group

    // Snapshot returned by Update(); holds one TInfo per disk that had a queue
    // at the moment Update() was called.
    struct TPerDiskStats : TThrRefBase {
        struct TInfo {
            // Aggregated response time for the disk (see Update() for how it is derived).
            TDuration Responsiveness;
            // Number of samples the aggregate was computed from. Initialised so that a
            // default-constructed TInfo never carries an indeterminate counter.
            ui32 NumRequests = 0;
            // Timestamp of the oldest sample that contributed to the aggregate.
            TInstant Since;
            // Timestamp of the newest sample that contributed to the aggregate.
            TInstant Until;

            // Renders "<responsiveness>/<num requests>/<now - Since>-<now - Until>";
            // both trailing fields are ages relative to TInstant::Now() at call time.
            TString ToString() const {
                const TInstant now = TInstant::Now();
                const TDuration since = now - Since;
                const TDuration until = now - Until;
                return Sprintf("%s/%" PRIu32 "/%s-%s", Responsiveness.ToString().data(), NumRequests,
                    since.ToString().data(), until.ToString().data());
            }
        };

        THashMap<TDiskId, TInfo> DiskData;
    };

    using TPerDiskStatsPtr = TIntrusivePtr<TPerDiskStats>;

private:
    // A single response-time sample.
    struct TItem {
        TInstant Timestamp;
        TDuration Value;

        TItem(TInstant timestamp, TDuration value)
            : Timestamp(timestamp)
            , Value(value)
        {}
    };

    // Upper bound on the number of samples kept per disk.
    const ui32 MaxItems;
    // Samples with a timestamp older than (Update timestamp - Window) are dropped by Update().
    const TDuration Window;
    // Per-disk samples, oldest first; Register() keeps timestamps non-decreasing.
    THashMap<TDiskId, TDeque<TItem>> PerDiskQ;

public:
    // maxItems -- maximum number of samples retained per disk
    // window   -- age beyond which Update() discards samples
    TDiskResponsivenessTracker(ui32 maxItems, TDuration window)
        : MaxItems(maxItems)
        , Window(window)
    {}

    // Appends a sample for disk `id`, evicting the oldest sample when the queue
    // would exceed MaxItems.
    void Register(const TDiskId &id, TInstant timestamp, TDuration value) {
        auto &q = PerDiskQ[id];
        // Never let timestamps go backwards: Update() binary-searches the queue by
        // timestamp, which requires it to stay sorted.
        if (!q.empty() && timestamp < q.back().Timestamp) {
            timestamp = q.back().Timestamp;
        }
        q.emplace_back(timestamp, value);
        if (q.size() > MaxItems) {
            q.pop_front();
        }
    }

    // Builds a snapshot of per-disk statistics and then trims every queue to the
    // window ending at `timestamp`. Queues that become empty are removed.
    //
    // Note that the statistics are computed from the samples present *before*
    // trimming, so a disk still appears in the snapshot produced by the very call
    // that empties its queue.
    TPerDiskStatsPtr Update(TInstant timestamp) {
        const TInstant barrier = timestamp - Window;
        TPerDiskStatsPtr result = MakeIntrusive<TPerDiskStats>();

        // Scratch buffer reused for every disk to avoid one allocation per queue.
        TVector<TDuration> values;

        for (auto diskIt = PerDiskQ.begin(); diskIt != PerDiskQ.end(); ) {
            const TDiskId &id = diskIt->first;
            auto &q = diskIt->second;

            // find the 90-th percentile for this disk
            values.clear();
            values.reserve(q.size());
            for (const TItem &item : q) {
                values.push_back(item.Value);
            }
            TDuration value = TDuration::Zero();
            if (!values.empty()) {
                // size * 9 / 10 is always < size for size >= 1, so the index is valid.
                const size_t pos = values.size() * 9 / 10;
                // Only a single order statistic is needed, so a full sort is unnecessary.
                std::nth_element(values.begin(), values.begin() + pos, values.end());
                value = values[pos];
            }

            // raise it to at least the largest of the last (up to) 3 samples, so that
            // a recent slowdown is reflected immediately
            auto qIter = q.rbegin();
            for (ui32 i = 0; i < 3 && qIter != q.rend(); ++i, ++qIter) {
                if (qIter->Value > value) {
                    value = qIter->Value;
                }
            }

            // store it in the result
            TPerDiskStats::TInfo info;
            info.Responsiveness = value;
            info.NumRequests = static_cast<ui32>(q.size());
            info.Since = !q.empty() ? q.front().Timestamp : TInstant();
            info.Until = !q.empty() ? q.back().Timestamp : TInstant();
            result->DiskData.emplace(id, info);

            // cut excessive items: everything strictly older than the barrier
            auto comp = [](const TItem &x, const TInstant &y) {
                return x.Timestamp < y;
            };
            auto it = std::lower_bound(q.begin(), q.end(), barrier, comp);
            q.erase(q.begin(), it);

            // advance iterator first, then delete the queue if it became empty;
            // `id` and `q` must not be used after the erase
            auto current = diskIt;
            ++diskIt;
            if (q.empty()) {
                PerDiskQ.erase(current);
            }
        }

        return result;
    }

    // True when no disk currently has a queue.
    bool IsEmpty() const {
        return PerDiskQ.empty();
    }
};
} // NKikimr
|