aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/persqueue/partition_monitoring.cpp
blob: 460582d8b88f9076e0201606fb4890625200e508 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
#include "event_helpers.h"
#include "common_app.h"
#include "mirrorer.h"
#include "partition_util.h"
#include "partition.h"
#include "read.h"
#include "transaction.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/base/counters.h>
#include <ydb/core/base/path.h>
#include <ydb/core/quoter/public/quoter.h>
#include <ydb/core/protos/counters_pq.pb.h>
#include <ydb/core/protos/msgbus.pb.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <ydb/library/protobuf_printer/security_printer.h>
#include <ydb/public/lib/base/msgbus.h>
#include <library/cpp/html/pcdata/pcdata.h>
#include <library/cpp/monlib/service/pages/templates.h>
#include <library/cpp/time_provider/time_provider.h>
#include <util/folder/path.h>
#include <util/string/escape.h>
#include <util/system/byteorder.h>

namespace NKikimr::NPQ {

// Renders this partition's monitoring page as an HTML fragment (property
// panels, configuration dump, data key table, gaps, source ids and users info)
// and sends it to the sender of the request wrapped in a TEvMonResponse.
void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx) {
    // Translates the value of CurrentStateFunc() into a display name.
    // Only StateInit and StateIdle are recognised; anything else aborts.
    const auto partitionStateName = [this]() -> const char* {
        const auto stateFunc = CurrentStateFunc();
        if (stateFunc == &TThis::StateInit) {
            return "Init";
        }
        if (stateFunc == &TThis::StateIdle) {
            return "Idle";
        }
        Y_ABORT("");
    };

    TStringStream out;
    HTML_PART(out) {
        NAVIGATION_TAB_CONTENT_PART("partition_" << Partition.InternalPartitionId) {
            // Row 1: property panels in two columns.
            LAYOUT_ROW() {
                LAYOUT_COLUMN() {
                    PROPERTIES("General") {
                        PROPERTY("Partition", Partition);
                        PROPERTY("State", partitionStateName());
                        PROPERTY("CreationTime", CreationTime.ToStringLocalUpToSeconds());
                        PROPERTY("InitDuration", InitDuration.ToString());
                    }

                    PROPERTIES("Status") {
                        PROPERTY("Disk", DiskIsFull ? "Full" : "Normal");
                        PROPERTY("Quota", WaitingForSubDomainQuota(ctx) ? "Out of space" : "Normal");
                    }

                    PROPERTIES("Information") {
                        PROPERTY("Total partition size, bytes", Size());
                        PROPERTY("Total message count", (Head.GetNextOffset() - StartOffset));
                        PROPERTY("StartOffset", StartOffset);
                        PROPERTY("EndOffset", EndOffset);
                        PROPERTY("LastOffset", Head.GetNextOffset());
                        PROPERTY("Last message WriteTimestamp", EndWriteTimestamp.ToRfc822String());
                        PROPERTY("HeadOffset", Head.Offset << ", count: " << Head.GetCount());
                    }
                }

                LAYOUT_COLUMN() {
                    PROPERTIES("Runtime information") {
                        PROPERTY("WriteInflightSize", WriteInflightSize);
                        PROPERTY("ReservedBytesSize", ReservedSize);
                        PROPERTY("OwnerPipes", OwnerPipes.size());
                        PROPERTY("Owners", Owners.size());
                        PROPERTY("Currently writing", Responses.size());
                        PROPERTY("MaxCurrently writing", MaxWriteResponsesSize);
                        PROPERTY("DataKeysBody size", DataKeysBody.size());
                    }

                    // One property per entry of DataKeysHead, captioned by its index.
                    PROPERTIES("DataKeysHead size") {
                        for (ui32 levelIdx = 0; levelIdx < DataKeysHead.size(); ++levelIdx) {
                            auto& level = DataKeysHead[levelIdx];
                            PROPERTY(TStringBuilder() << levelIdx, level.KeysCount() << " sum: " << level.Sum()
                                << " border: " << level.Border() << " recs: " << level.RecsCount()
                                << " intCount: " << level.InternalPartsCount());
                        }
                    }

                    PROPERTIES("AvgWriteSize, bytes") {
                        for (auto& window : AvgWriteBytes) {
                            PROPERTY(window.GetDuration().ToString(), window.GetValue());
                        }
                    }
                }
            }

            // Row 2: configuration dump.
            LAYOUT_ROW() {
                LAYOUT_COLUMN() {
                    CONFIGURATION(SecureDebugStringMultiline(Config));
                }
            }

            // Row 3: detail tables.
            LAYOUT_ROW() {
                LAYOUT_COLUMN() {
                    // Data keys: body entries first, then head entries; the "Pos"
                    // counter runs continuously across both groups.
                    TABLE_CLASS("table") {
                        TABLEHEAD() {
                            TABLER() {
                                TABLEH() { out << "Type"; }
                                TABLEH() { out << "Pos"; }
                                TABLEH() { out << "timestamp"; }
                                TABLEH() { out << "Offset"; }
                                TABLEH() { out << "PartNo"; }
                                TABLEH() { out << "Count"; }
                                TABLEH() { out << "InternalPartsCount"; }
                                TABLEH() { out << "Size"; }
                            }
                        }
                        TABLEBODY() {
                            ui32 rowPos = 0;
                            for (auto& bodyEntry : DataKeysBody) {
                                auto& key = bodyEntry.Key;
                                TABLER() {
                                    TABLED() { out << "DataBody"; }
                                    TABLED() { out << rowPos++; }
                                    TABLED() { out << ToStringLocalTimeUpToSeconds(bodyEntry.Timestamp); }
                                    TABLED() { out << key.GetOffset(); }
                                    TABLED() { out << key.GetPartNo(); }
                                    TABLED() { out << key.GetCount(); }
                                    TABLED() { out << key.GetInternalPartsCount(); }
                                    TABLED() { out << bodyEntry.Size; }
                                }
                            }

                            ui32 currentLevel = 0;
                            for (auto& headEntry : HeadKeys) {
                                const ui32 blobSize = headEntry.Size;
                                // Step to the next level while the blob is still below that
                                // level's border; the level is never reset between entries.
                                while (currentLevel + 1 < TotalLevels && blobSize < CompactLevelBorder[currentLevel + 1]) {
                                    ++currentLevel;
                                }
                                Y_ABORT_UNLESS(blobSize < CompactLevelBorder[currentLevel]);

                                auto& key = headEntry.Key;
                                TABLER() {
                                    TABLED() { out << "DataHead[" << currentLevel << "]"; }
                                    TABLED() { out << rowPos++; }
                                    TABLED() { out << ToStringLocalTimeUpToSeconds(headEntry.Timestamp); }
                                    TABLED() { out << key.GetOffset(); }
                                    TABLED() { out << key.GetPartNo(); }
                                    TABLED() { out << key.GetCount(); }
                                    TABLED() { out << key.GetInternalPartsCount(); }
                                    TABLED() { out << blobSize; }
                                }
                            }
                        }
                    }

                    TABLE_CLASS("table") {
                        CAPTION() { out << "Gaps"; }
                        TABLEHEAD() {
                            TABLER() {
                                TABLEH() { out << "GapStartOffset"; }
                                TABLEH() { out << "GapEndOffset"; }
                                TABLEH() { out << "GapSize"; }
                                TABLEH() { out << "id"; }
                            }
                        }
                        ui32 gapId = 0;
                        TABLEBODY() {
                            for (auto& gap : GapOffsets) {
                                auto& gapBegin = gap.first;
                                auto& gapEnd = gap.second;
                                TABLER() {
                                    TABLED() { out << gapBegin; }
                                    TABLED() { out << gapEnd; }
                                    TABLED() { out << (gapEnd - gapBegin); }
                                    TABLED() { out << (gapId++); }
                                }
                            }

                            // Extra row when the last body key ends before Head.Offset.
                            if (!DataKeysBody.empty()) {
                                auto& tailKey = DataKeysBody.back().Key;
                                const auto bodyEnd = tailKey.GetOffset() + tailKey.GetCount();
                                if (bodyEnd < Head.Offset) {
                                    TABLER() {
                                        TABLED() { out << bodyEnd; }
                                        TABLED() { out << Head.Offset; }
                                        TABLED() { out << (Head.Offset - bodyEnd); }
                                        TABLED() { out << (gapId++); }
                                    }
                                }
                            }
                        }
                    }

                    TABLE_CLASS("table") {
                        CAPTION() { out << "Source ids"; }
                        TABLEHEAD() {
                            TABLER() {
                                TABLEH() { out << "SourceId"; }
                                TABLEH() { out << "SeqNo"; }
                                TABLEH() { out << "Offset"; }
                                TABLEH() { out << "WriteTimestamp"; }
                                TABLEH() { out << "CreateTimestamp"; }
                                TABLEH() { out << "Explicit"; }
                                TABLEH() { out << "State"; }
                                TABLEH() { out << "LastHeartbeat"; }
                            }
                        }
                        TABLEBODY() {
                            for (const auto& [id, info] : SourceIdStorage.GetInMemorySourceIds()) {
                                TABLER() {
                                    // Source id is C-escaped and then HTML-encoded before output.
                                    TABLED() { out << EncodeHtmlPcdata(EscapeC(id)); }
                                    TABLED() { out << info.SeqNo; }
                                    TABLED() { out << info.Offset; }
                                    TABLED() { out << ToStringLocalTimeUpToSeconds(info.WriteTimestamp); }
                                    TABLED() { out << ToStringLocalTimeUpToSeconds(info.CreateTimestamp); }
                                    TABLED() { out << (info.Explicit ? "true" : "false"); }
                                    TABLED() { out << info.State; }
                                    if (const auto& heartbeat = info.LastHeartbeat) {
                                        TABLED() { out << heartbeat->Version; }
                                    } else {
                                        TABLED() { out << "null"; }
                                    }
                                }
                            }
                        }
                    }

                    TABLE_CLASS("table") {
                        CAPTION() { out << "UsersInfo"; }
                        TABLEHEAD() {
                            TABLER() {
                                TABLEH() { out << "user"; }
                                TABLEH() { out << "offset"; }
                                TABLEH() { out << "lag"; }
                                TABLEH() { out << "ReadFromTimestamp"; }
                                TABLEH() { out << "WriteTimestamp"; }
                                TABLEH() { out << "CreateTimestamp"; }
                                TABLEH() { out << "ReadOffset"; }
                                TABLEH() { out << "ReadWriteTimestamp"; }
                                TABLEH() { out << "ReadCreateTimestamp"; }
                                TABLEH() { out << "ReadOffsetRewindSum"; }
                                TABLEH() { out << "ActiveReads"; }
                                TABLEH() { out << "Subscriptions"; }
                            }
                        }
                        TABLEBODY() {
                            for (auto& userEntry : UsersInfoStorage->GetAll()) {
                                auto& userName = userEntry.first;
                                auto& userInfo = userEntry.second;
                                TABLER() {
                                    TABLED() { out << EncodeHtmlPcdata(userName); }
                                    TABLED() { out << userInfo.Offset; }
                                    // Lag is shown as EndOffset minus the user's offset.
                                    TABLED() { out << (EndOffset - userInfo.Offset); }
                                    TABLED() { out << ToStringLocalTimeUpToSeconds(userInfo.ReadFromTimestamp); }
                                    TABLED() { out << ToStringLocalTimeUpToSeconds(userInfo.WriteTimestamp); }
                                    TABLED() { out << ToStringLocalTimeUpToSeconds(userInfo.CreateTimestamp); }
                                    TABLED() { out << (userInfo.GetReadOffset()); }
                                    TABLED() { out << ToStringLocalTimeUpToSeconds(userInfo.GetReadWriteTimestamp()); }
                                    TABLED() { out << ToStringLocalTimeUpToSeconds(userInfo.GetReadCreateTimestamp()); }
                                    TABLED() { out << (userInfo.ReadOffsetRewindSum); }
                                    TABLED() { out << userInfo.ActiveReads; }
                                    TABLED() { out << userInfo.Subscriptions; }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    // Reply to whoever asked with the rendered fragment.
    ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(Partition, out.Str()));
}

} // namespace NKikimr::NPQ