diff options
author | xenoxeno <xeno@ydb.tech> | 2022-12-07 11:54:18 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2022-12-07 11:54:18 +0300 |
commit | eb48b31550cde5bfcc585dc6b7a8e8ccb8eb6f25 (patch) | |
tree | 7711fd91e6f4fca3834c1115a7781119fae05fbb | |
parent | 8103c2467ceb13f1c39b8524a57189d3f556d0dc (diff) | |
download | ydb-eb48b31550cde5bfcc585dc6b7a8e8ccb8eb6f25.tar.gz |
optimize tablet info merging
-rw-r--r-- | ydb/core/node_whiteboard/node_whiteboard.h | 43 | ||||
-rw-r--r-- | ydb/core/protos/node_whiteboard.proto | 3 | ||||
-rw-r--r-- | ydb/core/protos/services.proto | 2 | ||||
-rw-r--r-- | ydb/core/tablet/node_whiteboard.cpp | 78 | ||||
-rw-r--r-- | ydb/core/viewer/json_bsgroupinfo.h | 4 | ||||
-rw-r--r-- | ydb/core/viewer/json_cluster.h | 6 | ||||
-rw-r--r-- | ydb/core/viewer/json_counters.h | 16 | ||||
-rw-r--r-- | ydb/core/viewer/json_nodeinfo.h | 4 | ||||
-rw-r--r-- | ydb/core/viewer/json_pdiskinfo.h | 4 | ||||
-rw-r--r-- | ydb/core/viewer/json_storage.h | 6 | ||||
-rw-r--r-- | ydb/core/viewer/json_sysinfo.h | 4 | ||||
-rw-r--r-- | ydb/core/viewer/json_tabletinfo.h | 44 | ||||
-rw-r--r-- | ydb/core/viewer/json_tenantinfo.h | 53 | ||||
-rw-r--r-- | ydb/core/viewer/json_vdiskinfo.h | 4 | ||||
-rw-r--r-- | ydb/core/viewer/json_wb_req.h | 39 | ||||
-rw-r--r-- | ydb/core/viewer/log.h | 23 | ||||
-rw-r--r-- | ydb/core/viewer/viewer_ut.cpp | 84 | ||||
-rw-r--r-- | ydb/core/viewer/wb_filter.h | 10 | ||||
-rw-r--r-- | ydb/core/viewer/wb_group.h | 10 | ||||
-rw-r--r-- | ydb/core/viewer/wb_merge.h | 126 |
20 files changed, 453 insertions, 110 deletions
diff --git a/ydb/core/node_whiteboard/node_whiteboard.h b/ydb/core/node_whiteboard/node_whiteboard.h index 1dd6f7bf2d8..05aa018e926 100644 --- a/ydb/core/node_whiteboard/node_whiteboard.h +++ b/ydb/core/node_whiteboard/node_whiteboard.h @@ -110,7 +110,48 @@ struct TEvWhiteboard{ struct TEvTabletStateRequest : public TEventPB<TEvTabletStateRequest, NKikimrWhiteboard::TEvTabletStateRequest, EvTabletStateRequest> {}; - struct TEvTabletStateResponse : public TEventPB<TEvTabletStateResponse, NKikimrWhiteboard::TEvTabletStateResponse, EvTabletStateResponse> {}; +#pragma pack(push, 1) + struct TEvTabletStateResponsePacked5 { + ui64 TabletId; + ui32 FollowerId; + ui32 Generation; + NKikimrTabletBase::TTabletTypes::EType Type; + NKikimrWhiteboard::TTabletStateInfo::ETabletState State; + + TEvTabletStateResponsePacked5() = default; + TEvTabletStateResponsePacked5(const NKikimrWhiteboard::TTabletStateInfo& elem) + : TabletId(elem.GetTabletId()) + , FollowerId(elem.GetFollowerId()) + , Generation(elem.GetGeneration()) + , Type(elem.GetType()) + , State(elem.GetState()) + {} + + operator NKikimrWhiteboard::TTabletStateInfo() const { + NKikimrWhiteboard::TTabletStateInfo result; + Fill(result); + return result; + } + + void Fill(NKikimrWhiteboard::TTabletStateInfo& result) const { + result.SetTabletId(TabletId); + result.SetFollowerId(FollowerId); + result.SetGeneration(Generation); + result.SetType(Type); + result.SetState(State); + } + } Y_PACKED; + + static_assert(sizeof(TEvTabletStateResponsePacked5) == 24); +#pragma pack(pop) + + struct TEvTabletStateResponse : public TEventPB<TEvTabletStateResponse, NKikimrWhiteboard::TEvTabletStateResponse, EvTabletStateResponse> { + TEvTabletStateResponsePacked5* AllocatePackedResponse(size_t count) { + auto& packed5 = *Record.MutablePacked5(); + packed5.resize(count * sizeof(TEvTabletStateResponsePacked5)); + return reinterpret_cast<TEvTabletStateResponsePacked5*>(packed5.Detach()); + } + }; struct TEvPDiskStateUpdate : TEventPB<TEvPDiskStateUpdate, NKikimrWhiteboard::TPDiskStateInfo, EvPDiskStateUpdate> { TEvPDiskStateUpdate() = default; diff --git a/ydb/core/protos/node_whiteboard.proto b/ydb/core/protos/node_whiteboard.proto index e9217b9a99c..03c9df2e868 100644 --- a/ydb/core/protos/node_whiteboard.proto +++ b/ydb/core/protos/node_whiteboard.proto @@ -68,7 +68,9 @@ message TTabletStateInfo { message TEvTabletStateRequest { optional uint64 ChangedSince = 1; + optional string Format = 5; // it could be "packed5" optional string GroupBy = 20; // it's either empty or "Type,State" for now + repeated fixed64 FilterTabletId = 22; } message TEvTabletStateResponse { @@ -76,6 +78,7 @@ message TEvTabletStateResponse { optional uint64 ResponseTime = 2; // ms, filled during processing and merging optional uint64 ResponseDuration = 3; // us, filled during collect optional uint64 ProcessDuration = 4; // us, filled during processing + optional bytes Packed5 = 5; } message TNodeStateInfo { diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index c136090061a..0810f2f59c3 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -322,6 +322,8 @@ enum EServiceKikimr { DQ_TASK_RUNNER = 1165; + VIEWER = 1166; + // 1024 - 1099 is reserved for nbs // Change exchange (async indexes & CDC) diff --git a/ydb/core/tablet/node_whiteboard.cpp b/ydb/core/tablet/node_whiteboard.cpp index 0677480cf3c..289e67c7881 100644 --- a/ydb/core/tablet/node_whiteboard.cpp +++ b/ydb/core/tablet/node_whiteboard.cpp @@ -49,13 +49,13 @@ public: auto versionCounter = GetServiceCounters(AppData(ctx)->Counters, "utils")->GetSubgroup("revision", version); *versionCounter->GetCounter("version", false) = 1; } - + // TODO(t1mursadykov): Add role for static nodes with sys tablets only if (AppData(ctx)->DynamicNameserviceConfig) { if (SelfId().NodeId() <= AppData(ctx)->DynamicNameserviceConfig->MaxStaticNodeId) ctx.Send(ctx.SelfID, new TEvWhiteboard::TEvSystemStateAddRole("Storage")); } - + SystemStateInfo.SetStartTime(ctx.Now().MilliSeconds()); ProcessStats.Fill(getpid()); if (ProcessStats.CGroupMemLim != 0) { @@ -655,34 +655,66 @@ protected: } } + static void CopyTabletStateInfo( + NKikimrWhiteboard::TTabletStateInfo& dst, + const NKikimrWhiteboard::TTabletStateInfo& src, + const NKikimrWhiteboard::TEvTabletStateRequest&) + { + dst = src; + } + void Handle(TEvWhiteboard::TEvTabletStateRequest::TPtr &ev, const TActorContext &ctx) { auto now = TMonotonic::Now(); const auto& request = ev->Get()->Record; std::unique_ptr<TEvWhiteboard::TEvTabletStateResponse> response = std::make_unique<TEvWhiteboard::TEvTabletStateResponse>(); auto& record = response->Record; - if (request.groupby().empty()) { - ui64 changedSince = request.has_changedsince() ? request.changedsince() : 0; - for (const auto& pr : TabletStateInfo) { - if (pr.second.changetime() >= changedSince) { - NKikimrWhiteboard::TTabletStateInfo& tabletStateInfo = *record.add_tabletstateinfo(); - tabletStateInfo = pr.second; - } + if (request.format() == "packed5") { + TEvWhiteboard::TEvTabletStateResponsePacked5* ptr = response->AllocatePackedResponse(TabletStateInfo.size()); + for (const auto& [tabletId, tabletInfo] : TabletStateInfo) { + ptr->TabletId = tabletInfo.tabletid(); + ptr->FollowerId = tabletInfo.followerid(); + ptr->Generation = tabletInfo.generation(); + ptr->Type = tabletInfo.type(); + ptr->State = tabletInfo.state(); + ++ptr; } - } else if (request.groupby() == "Type,State") { // the only supported group-by for now - std::unordered_map<std::pair<NKikimrTabletBase::TTabletTypes::EType, - NKikimrWhiteboard::TTabletStateInfo::ETabletState>, NKikimrWhiteboard::TTabletStateInfo> stateGroupBy; - for (const auto& [id, stateInfo] : TabletStateInfo) { - NKikimrWhiteboard::TTabletStateInfo& state = stateGroupBy[{stateInfo.type(), stateInfo.state()}]; - auto count = state.count(); - if (count == 0) { - state.set_type(stateInfo.type()); - state.set_state(stateInfo.state()); + } else { + if (request.groupby().empty()) { + ui64 changedSince = request.has_changedsince() ? request.changedsince() : 0; + if (request.filtertabletid_size() == 0) { + for (const auto& pr : TabletStateInfo) { + if (pr.second.changetime() >= changedSince) { + NKikimrWhiteboard::TTabletStateInfo& tabletStateInfo = *record.add_tabletstateinfo(); + CopyTabletStateInfo(tabletStateInfo, pr.second, request); + } + } + } else { + for (auto tabletId : request.filtertabletid()) { + auto it = TabletStateInfo.find({tabletId, 0}); + if (it != TabletStateInfo.end()) { + if (it->second.changetime() >= changedSince) { + NKikimrWhiteboard::TTabletStateInfo& tabletStateInfo = *record.add_tabletstateinfo(); + CopyTabletStateInfo(tabletStateInfo, it->second, request); + } + } + } + } + } else if (request.groupby() == "Type,State") { // the only supported group-by for now + std::unordered_map<std::pair<NKikimrTabletBase::TTabletTypes::EType, + NKikimrWhiteboard::TTabletStateInfo::ETabletState>, NKikimrWhiteboard::TTabletStateInfo> stateGroupBy; + for (const auto& [id, stateInfo] : TabletStateInfo) { + NKikimrWhiteboard::TTabletStateInfo& state = stateGroupBy[{stateInfo.type(), stateInfo.state()}]; + auto count = state.count(); + if (count == 0) { + state.set_type(stateInfo.type()); + state.set_state(stateInfo.state()); + } + state.set_count(count + 1); + } + for (auto& pr : stateGroupBy) { + NKikimrWhiteboard::TTabletStateInfo& tabletStateInfo = *record.add_tabletstateinfo(); + tabletStateInfo = std::move(pr.second); } - state.set_count(count + 1); - } - for (auto& pr : stateGroupBy) { - NKikimrWhiteboard::TTabletStateInfo& tabletStateInfo = *record.add_tabletstateinfo(); - tabletStateInfo = std::move(pr.second); } } response->Record.set_responsetime(ctx.Now().MilliSeconds()); diff --git a/ydb/core/viewer/json_bsgroupinfo.h b/ydb/core/viewer/json_bsgroupinfo.h index 7a95f443b7c..119207289f3 100644 --- a/ydb/core/viewer/json_bsgroupinfo.h +++ b/ydb/core/viewer/json_bsgroupinfo.h @@ -18,8 +18,8 @@ struct TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse> { static constexpr bool StaticNodesOnly = true; - static ::google::protobuf::RepeatedPtrField<TElementType>* GetElementsField(TResponseType* response) { - return response->Record.MutableBSGroupStateInfo(); + static ::google::protobuf::RepeatedPtrField<TElementType>& GetElementsField(TResponseType* response) { + return *response->Record.MutableBSGroupStateInfo(); } static ui32 GetElementKey(const TElementType& type) { diff --git a/ydb/core/viewer/json_cluster.h b/ydb/core/viewer/json_cluster.h index c7d15c25dec..b4ad8b64ceb 100644 --- a/ydb/core/viewer/json_cluster.h +++ b/ydb/core/viewer/json_cluster.h @@ -302,7 +302,7 @@ public: ui64 totalStorageSize = 0; ui64 availableStorageSize = 0; - for (auto& element : *TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::GetElementsField(MergedPDiskInfo.Get())) { + for (auto& element : TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::GetElementsField(MergedPDiskInfo.Get())) { if (element.HasTotalSize() && element.HasAvailableSize()) { totalStorageSize += element.GetTotalSize(); availableStorageSize += element.GetAvailableSize(); @@ -311,12 +311,12 @@ public: element.SetOverall(GetWhiteboardFlag(GetPDiskOverallFlag(element))); PDisksIndex.emplace(TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::GetElementKey(element), element); } - for (auto& element : *TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::GetElementsField(MergedVDiskInfo.Get())) { + for (auto& element : TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::GetElementsField(MergedVDiskInfo.Get())) { element.SetOverall(GetWhiteboardFlag(GetVDiskOverallFlag(element))); VDisksIndex.emplace(TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::GetElementKey(element), element); } NKikimrViewer::EFlag flag = NKikimrViewer::Grey; - for (const auto& element : *TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::GetElementsField(MergedBSGroupInfo.Get())) { + for (const auto& element : TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::GetElementsField(MergedBSGroupInfo.Get())) { flag = Max(flag, GetBSGroupOverallFlag(element, VDisksIndex, PDisksIndex)); } ui32 numberOfCpus = 0; diff --git a/ydb/core/viewer/json_counters.h b/ydb/core/viewer/json_counters.h index c56719c77d5..43a30b54a09 100644 --- a/ydb/core/viewer/json_counters.h +++ b/ydb/core/viewer/json_counters.h @@ -178,13 +178,13 @@ public: const TVector<const FieldDescriptor*>& groupFields) { THolder<ResponseType> groupedResponse = TWhiteboardGrouper<ResponseType>::GroupResponse(response, groupFields, true); - auto* stateInfo = TWhiteboardInfo<ResponseType>::GetElementsField(groupedResponse.Get()); + auto& stateInfo = TWhiteboardInfo<ResponseType>::GetElementsField(groupedResponse.Get()); TStringBuf host(nodeInfo.Host); size_t pos = host.find('.'); if (pos != TString::npos) { host = host.substr(0, pos); } - for (typename TWhiteboardInfo<ResponseType>::TElementType& info : *stateInfo) { + for (typename TWhiteboardInfo<ResponseType>::TElementType& info : stateInfo) { const Reflection& reflectionFrom = *info.GetReflection(); json << ",{\"labels\":{"; if (nodeInfo.NodeId != 0) { @@ -289,8 +289,8 @@ public: ++itPDiskInfo; if (itPDiskInfo != PDiskInfo.end() && itPDiskInfo->first == nodeInfo.NodeId && itPDiskInfo->second) { RenderStats(json, itPDiskInfo->second, nodeInfo); - auto* stateInfo = TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::GetElementsField(itPDiskInfo->second.Get()); - for (const typename TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::TElementType& info : *stateInfo) { + auto& stateInfo = TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::GetElementsField(itPDiskInfo->second.Get()); + for (const typename TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::TElementType& info : stateInfo) { if (info.GetTotalSize() > 0 && info.GetAvailableSize() > 0) { ++pDiskUserSpaceHistogram[std::min((info.GetTotalSize() - info.GetAvailableSize()) * pDiskUserSpaceHistogram.size() / info.GetTotalSize(), pDiskUserSpaceHistogram.size() - 1)]; } @@ -330,14 +330,14 @@ public: std::unordered_map<ui64, int> bsGroupGreenVDisks; std::unordered_map<ui64, int> bsGroupNotGreenVDisks; { - auto* stateInfo = TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::GetElementsField(mergedBSGroupInfo.Get()); - for (const typename TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::TElementType& info : *stateInfo) { + auto& stateInfo = TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::GetElementsField(mergedBSGroupInfo.Get()); + for (const typename TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::TElementType& info : stateInfo) { bsGroupVDisks[info.GetGroupID()] = info.VDiskIdsSize(); } } { - auto* stateInfo = TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::GetElementsField(mergedVDiskInfo.Get()); - for (const typename TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::TElementType& info : *stateInfo) { + auto& stateInfo = TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::GetElementsField(mergedVDiskInfo.Get()); + for (const typename TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::TElementType& info : stateInfo) { auto groupId = info.GetVDiskId().GetGroupID(); bsGroupVDisks[groupId]--; auto flag = GetVDiskOverallFlag(info); diff --git a/ydb/core/viewer/json_nodeinfo.h b/ydb/core/viewer/json_nodeinfo.h index 60245326fa6..acd2e64ecfa 100644 --- a/ydb/core/viewer/json_nodeinfo.h +++ b/ydb/core/viewer/json_nodeinfo.h @@ -18,8 +18,8 @@ struct TWhiteboardInfo<TEvWhiteboard::TEvNodeStateResponse> { static constexpr bool StaticNodesOnly = false; - static ::google::protobuf::RepeatedPtrField<TElementType>* GetElementsField(TResponseType* response) { - return response->Record.MutableNodeStateInfo(); + static ::google::protobuf::RepeatedPtrField<TElementType>& GetElementsField(TResponseType* response) { + return *response->Record.MutableNodeStateInfo(); } static const TString& GetElementKey(const TElementType& type) { diff --git a/ydb/core/viewer/json_pdiskinfo.h b/ydb/core/viewer/json_pdiskinfo.h index 881c108e513..f1062128438 100644 --- a/ydb/core/viewer/json_pdiskinfo.h +++ b/ydb/core/viewer/json_pdiskinfo.h @@ -17,8 +17,8 @@ struct TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse> { static constexpr bool StaticNodesOnly = true; - static ::google::protobuf::RepeatedPtrField<TElementType>* GetElementsField(TResponseType* response) { - return response->Record.MutablePDiskStateInfo(); + static ::google::protobuf::RepeatedPtrField<TElementType>& GetElementsField(TResponseType* response) { + return *response->Record.MutablePDiskStateInfo(); } static std::pair<ui32, ui32> GetElementKey(const TElementType& type) { diff --git a/ydb/core/viewer/json_storage.h b/ydb/core/viewer/json_storage.h index 140792ff1df..7df3bd871a6 100644 --- a/ydb/core/viewer/json_storage.h +++ b/ydb/core/viewer/json_storage.h @@ -510,7 +510,7 @@ public: MergedBSGroupInfo = MergeWhiteboardResponses(BSGroupInfo, TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::GetDefaultMergeField()); MergedVDiskInfo = MergeWhiteboardResponses(VDiskInfo, TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::GetDefaultMergeField()); MergedPDiskInfo = MergeWhiteboardResponses(PDiskInfo, TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::GetDefaultMergeField()); - for (auto& element : *TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::GetElementsField(MergedPDiskInfo.Get())) { + for (auto& element : TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::GetElementsField(MergedPDiskInfo.Get())) { element.SetStateFlag(GetWhiteboardFlag(GetPDiskStateFlag(element))); auto overall = NKikimrViewer::EFlag_Name(GetPDiskOverallFlag(element)); auto key = TWhiteboardInfo<TEvWhiteboard::TEvPDiskStateResponse>::GetElementKey(element); @@ -518,7 +518,7 @@ public: PDisksOverall.emplace(key, overall); PDisksIndex.emplace(key, element); } - for (auto& element : *TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::GetElementsField(MergedVDiskInfo.Get())) { + for (auto& element : TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::GetElementsField(MergedVDiskInfo.Get())) { auto overall = NKikimrViewer::EFlag_Name(GetVDiskOverallFlag(element)); auto key = TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse>::GetElementKey(element); element.ClearOverall(); @@ -533,7 +533,7 @@ public: VSlotsIndex.emplace(std::move(slotId), element); } } - for (auto& element : *TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::GetElementsField(MergedBSGroupInfo.Get())) { + for (auto& element : TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::GetElementsField(MergedBSGroupInfo.Get())) { auto state = GetBSGroupOverallState(element, VDisksIndex, PDisksIndex); auto key = ToString(TWhiteboardInfo<TEvWhiteboard::TEvBSGroupStateResponse>::GetElementKey(element)); if (state.MissingDisks > 0) { diff --git a/ydb/core/viewer/json_sysinfo.h b/ydb/core/viewer/json_sysinfo.h index 05a477c2df2..a279c48535a 100644 --- a/ydb/core/viewer/json_sysinfo.h +++ b/ydb/core/viewer/json_sysinfo.h @@ -39,8 +39,8 @@ struct TWhiteboardInfo<TEvWhiteboard::TEvSystemStateResponse> { static constexpr bool StaticNodesOnly = false; - static ::google::protobuf::RepeatedPtrField<TElementType>* GetElementsField(TResponseType* response) { - return response->Record.MutableSystemStateInfo(); + static ::google::protobuf::RepeatedPtrField<TElementType>& GetElementsField(TResponseType* response) { + return *response->Record.MutableSystemStateInfo(); } static TString GetDefaultMergeField() { diff --git a/ydb/core/viewer/json_tabletinfo.h b/ydb/core/viewer/json_tabletinfo.h index 3f273e8e95b..e972518dfda 100644 --- a/ydb/core/viewer/json_tabletinfo.h +++ b/ydb/core/viewer/json_tabletinfo.h @@ -9,24 +9,39 @@ #include <ydb/core/base/tablet_pipe.h> #include "json_pipe_req.h" #include "json_wb_req.h" +#include <span> namespace NKikimr { namespace NViewer { -template <> +template<> struct TWhiteboardInfo<TEvWhiteboard::TEvTabletStateResponse> { using TResponseType = TEvWhiteboard::TEvTabletStateResponse; using TElementType = NKikimrWhiteboard::TTabletStateInfo; + using TElementTypePacked5 = NNodeWhiteboard::TEvWhiteboard::TEvTabletStateResponsePacked5; using TElementKeyType = std::pair<ui64, ui32>; static constexpr bool StaticNodesOnly = false; - static ::google::protobuf::RepeatedPtrField<TElementType>* GetElementsField(TResponseType* response) { - return response->Record.MutableTabletStateInfo(); + static ::google::protobuf::RepeatedPtrField<TElementType>& GetElementsField(TResponseType* response) { + return *response->Record.MutableTabletStateInfo(); + } + + static std::span<const TElementTypePacked5> GetElementsFieldPacked5(TResponseType* response) { + const auto& packed5 = response->Record.GetPacked5(); + return std::span{reinterpret_cast<const TElementTypePacked5*>(packed5.data()), packed5.size() / sizeof(TElementTypePacked5)}; + } + + static size_t GetElementsCount(TResponseType* response) { + return response->Record.GetTabletStateInfo().size() + response->Record.GetPacked5().size() / sizeof(TElementTypePacked5); + } + + static TElementKeyType GetElementKey(const TElementType& type) { + return TElementKeyType(type.GetTabletId(), type.GetFollowerId()); } - static std::pair<ui64, ui32> GetElementKey(const TElementType& type) { - return std::pair<ui64, ui32>(type.GetTabletId(), type.GetFollowerId()); + static TElementKeyType GetElementKey(const TElementTypePacked5& type) { + return TElementKeyType(type.TabletId, type.FollowerId); } static TString GetDefaultMergeField() { @@ -35,9 +50,11 @@ struct TWhiteboardInfo<TEvWhiteboard::TEvTabletStateResponse> { static THolder<TResponseType> MergeResponses(TMap<ui32, THolder<TResponseType>>& responses, const TString& fields = GetDefaultMergeField()) { if (fields == GetDefaultMergeField()) { - return TWhiteboardMerger<TResponseType>::MergeResponsesElementKey(responses); + TStaticMergeKey<TResponseType> mergeKey; + return TWhiteboardMerger<TResponseType>::MergeResponsesBaseHybrid(responses, mergeKey); } else { - return TWhiteboardMerger<TResponseType>::MergeResponses(responses, fields); + TWhiteboardMerger<TResponseType>::TDynamicMergeKey mergeKey(fields); + return TWhiteboardMerger<TResponseType>::MergeResponsesBase(responses, mergeKey); } } }; @@ -49,6 +66,13 @@ struct TWhiteboardMergerComparator<NKikimrWhiteboard::TTabletStateInfo> { } }; +template <> +struct TWhiteboardMergerComparator<NNodeWhiteboard::TEvWhiteboard::TEvTabletStateResponsePacked5> { + bool operator ()(const NNodeWhiteboard::TEvWhiteboard::TEvTabletStateResponsePacked5& a, const NNodeWhiteboard::TEvWhiteboard::TEvTabletStateResponsePacked5& b) const { + return a.Generation < b.Generation; + } +}; + class TJsonTabletInfo : public TJsonWhiteboardRequest<TEvWhiteboard::TEvTabletStateRequest, TEvWhiteboard::TEvTabletStateResponse> { static const bool WithRetry = false; using TBase = TJsonWhiteboardRequest<TEvWhiteboard::TEvTabletStateRequest, TEvWhiteboard::TEvTabletStateResponse>; @@ -57,7 +81,10 @@ class TJsonTabletInfo : public TJsonWhiteboardRequest<TEvWhiteboard::TEvTabletSt public: TJsonTabletInfo(IViewer *viewer, NMon::TEvHttpInfo::TPtr &ev) : TJsonWhiteboardRequest(viewer, ev) - {} + { + static TString prefix = "json/tabletinfo "; + LogPrefix = prefix; + } static NTabletPipe::TClientConfig InitPipeClientConfig() { NTabletPipe::TClientConfig clientConfig; @@ -73,6 +100,7 @@ public: } void Bootstrap() override { + BLOG_TRACE("Bootstrap()"); const auto& params(Event->Get()->Request.GetParams()); Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 10000); if (params.Has("path")) { diff --git a/ydb/core/viewer/json_tenantinfo.h b/ydb/core/viewer/json_tenantinfo.h index 9a27f8e9682..9c506e78db6 100644 --- a/ydb/core/viewer/json_tenantinfo.h +++ b/ydb/core/viewer/json_tenantinfo.h @@ -15,6 +15,7 @@ #include "json_pipe_req.h" #include "wb_aggregate.h" #include "wb_merge.h" +#include "log.h" namespace NKikimr { namespace NViewer { @@ -31,6 +32,7 @@ class TJsonTenantInfo : public TViewerPipeClient<TJsonTenantInfo> { THashMap<TTabletId, THolder<TEvHive::TEvResponseHiveStorageStats>> HiveStorageStats; NMon::TEvHttpInfo::TPtr Event; THashSet<TNodeId> NodeIds; + THashSet<TNodeId> NodeIdsForTablets; TMap<TNodeId, THolder<TEvWhiteboard::TEvSystemStateResponse>> NodeSysInfo; TMap<TNodeId, THolder<TEvWhiteboard::TEvTabletStateResponse>> NodeTabletInfo; TJsonSettings JsonSettings; @@ -54,11 +56,17 @@ public: , Event(ev) {} + TString GetLogPrefix() { + static TString prefix = "json/tenantinfo "; + return prefix; + } + TString GetDomainId(TPathId pathId) { return TStringBuilder() << pathId.OwnerId << '-' << pathId.LocalPathId; } void Bootstrap() { + BLOG_TRACE("Bootstrap()"); const auto& params(Event->Get()->Request.GetParams()); JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), true); JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), false); @@ -103,8 +111,12 @@ public: void PassAway() override { for (const TNodeId nodeId : NodeIds) { Send(TActivationContext::InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe()); - }; + } + for (const TNodeId nodeId : NodeIdsForTablets) { + Send(TActivationContext::InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe()); + } TBase::PassAway(); + BLOG_TRACE("PassAway()"); } STATEFN(StateRequested) { @@ -124,6 +136,7 @@ public: } void Handle(NConsole::TEvConsole::TEvListTenantsResponse::TPtr& ev) { + BLOG_TRACE("Received ListTenantsResponse"); Ydb::Cms::ListDatabasesResult listTenantsResult; ev->Get()->Record.GetResponse().operation().result().UnpackTo(&listTenantsResult); for (const TString& path : listTenantsResult.paths()) { @@ -137,6 +150,7 @@ public: } void Handle(NConsole::TEvConsole::TEvGetTenantStatusResponse::TPtr& ev) { + BLOG_TRACE("Received GetTenantStatusResponse"); Ydb::Cms::GetDatabaseStatusResult getTenantStatusResult; ev->Get()->Record.GetResponse().operation().result().UnpackTo(&getTenantStatusResult); TString path = getTenantStatusResult.path(); @@ -192,15 +206,20 @@ public: tenant.SetAliveNodes(hiveStat.GetAliveNodes()); } } + + BLOG_TRACE("Received HiveDomainStats for " << tenant.GetId() << " from " << ev->Cookie); + for (TNodeId nodeId : hiveStat.GetNodeIds()) { + TActorId whiteboardServiceId = MakeNodeWhiteboardServiceId(nodeId); if (NodeIds.insert(nodeId).second) { - TActorId whiteboardServiceId = MakeNodeWhiteboardServiceId(nodeId); THolder<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest> request = MakeHolder<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest>(); SendRequest(whiteboardServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, nodeId); - if (Tablets) { - THolder<NNodeWhiteboard::TEvWhiteboard::TEvTabletStateRequest> request = MakeHolder<NNodeWhiteboard::TEvWhiteboard::TEvTabletStateRequest>(); - SendRequest(whiteboardServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, nodeId); - } + } + if (Tablets && NodeIdsForTablets.insert(nodeId).second) { + THolder<NNodeWhiteboard::TEvWhiteboard::TEvTabletStateRequest> request = MakeHolder<NNodeWhiteboard::TEvWhiteboard::TEvTabletStateRequest>(); + request->Record.SetFormat("packed5"); + BLOG_TRACE("Tenant " << tenant.GetId() << " send to " << nodeId << " TEvTabletStateRequest: " << request->Record.ShortDebugString()); + SendRequest(whiteboardServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, nodeId); } } } @@ -209,6 +228,7 @@ public: } void Handle(TEvHive::TEvResponseHiveStorageStats::TPtr& ev) { + BLOG_TRACE("Received HiveStorageStats from " << ev->Cookie); HiveStorageStats[ev->Cookie] = std::move(ev->Release()); RequestDone(); } @@ -235,6 +255,7 @@ public: } TString id = GetDomainId(domainInfo->DomainKey); TString path = CanonizePath(ev->Get()->Request->ResultSet.begin()->Path); + BLOG_TRACE("Received Navigate for " << id << " " << path); tenant.SetId(id); tenant.SetName(path); if (tenant.GetType() == NKikimrViewer::UnknownTenantType) { @@ -247,18 +268,21 @@ public: void Handle(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse::TPtr& ev) { ui32 nodeId = ev.Get()->Cookie; + BLOG_TRACE("Received TEvSystemStateResponse from " << nodeId); NodeSysInfo[nodeId] = ev->Release(); RequestDone(); } void Handle(NNodeWhiteboard::TEvWhiteboard::TEvTabletStateResponse::TPtr& ev) { ui32 nodeId = ev.Get()->Cookie; + BLOG_TRACE("Received TEvTabletStateResponse from " << nodeId << " with " << ev->Get()->Record.TabletStateInfoSize() << " tablets"); NodeTabletInfo[nodeId] = ev->Release(); RequestDone(); } void Undelivered(TEvents::TEvUndelivered::TPtr &ev) { ui32 nodeId = ev.Get()->Cookie; + BLOG_TRACE("Undelivered for node " << nodeId << " event " << ev->Get()->SourceType); if (ev->Get()->SourceType == NNodeWhiteboard::TEvWhiteboard::EvSystemStateRequest) { if (NodeSysInfo.emplace(nodeId, nullptr).second) { RequestDone(); @@ -273,6 +297,7 @@ public: void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) { ui32 nodeId = ev->Get()->NodeId; + BLOG_TRACE("NodeDisconnected for node " << nodeId); if (NodeSysInfo.emplace(nodeId, nullptr).second) { RequestDone(); } @@ -282,13 +307,14 @@ public: } void ReplyAndPassAway() { + BLOG_TRACE("ReplyAndPassAway() started"); TIntrusivePtr<TDomainsInfo> domains = AppData()->DomainsInfo; TIntrusivePtr<TDomainsInfo::TDomain> domain = domains->Domains.begin()->second; THolder<TEvWhiteboard::TEvTabletStateResponse> tabletInfo; THashMap<TTabletId, const NKikimrWhiteboard::TTabletStateInfo*> tabletInfoIndex; if (Tablets) { - tabletInfo = MergeWhiteboardResponses(NodeTabletInfo); - for (const auto& info : tabletInfo->Record.GetTabletStateInfo()) { + tabletInfo = TWhiteboardInfo<NNodeWhiteboard::TEvWhiteboard::TEvTabletStateResponse>::MergeResponses(NodeTabletInfo); + for (const auto& info : TWhiteboardInfo<NNodeWhiteboard::TEvWhiteboard::TEvTabletStateResponse>::GetElementsField(tabletInfo.Get())) { tabletInfoIndex[info.GetTabletId()] = &info; } } @@ -380,10 +406,12 @@ public: uint64 storageMinAvailableSize = std::numeric_limits<ui64>::max(); uint64 storageGroups = 0; for (const NKikimrHive::THiveStoragePoolStats& poolStat : record.GetPools()) { - for (const NKikimrHive::THiveStorageGroupStats& groupStat : poolStat.GetGroups()) { - storageAllocatedSize += groupStat.GetAllocatedSize(); - storageMinAvailableSize = std::min(storageMinAvailableSize, groupStat.GetAvailableSize()); - ++storageGroups; + if (poolStat.GetName().StartsWith(tenantBySubDomainKey.GetName())) { + for (const NKikimrHive::THiveStorageGroupStats& groupStat : poolStat.GetGroups()) { + storageAllocatedSize += groupStat.GetAllocatedSize(); + storageMinAvailableSize = std::min(storageMinAvailableSize, groupStat.GetAvailableSize()); + ++storageGroups; + } } } tenant.SetStorageAllocatedSize(storageAllocatedSize); @@ -504,6 +532,7 @@ public: } void HandleTimeout() { + BLOG_TRACE("Timeout occurred"); Result.AddErrors("Timeout occurred"); ReplyAndPassAway(); } diff --git a/ydb/core/viewer/json_vdiskinfo.h b/ydb/core/viewer/json_vdiskinfo.h index db681f044b5..5c015a5258c 100644 --- a/ydb/core/viewer/json_vdiskinfo.h +++ b/ydb/core/viewer/json_vdiskinfo.h @@ -54,8 +54,8 @@ struct TWhiteboardInfo<TEvWhiteboard::TEvVDiskStateResponse> { static constexpr bool StaticNodesOnly = true; - static ::google::protobuf::RepeatedPtrField<TElementType>* GetElementsField(TResponseType* response) { - return response->Record.MutableVDiskStateInfo(); + static ::google::protobuf::RepeatedPtrField<TElementType>& GetElementsField(TResponseType* response) { + return *response->Record.MutableVDiskStateInfo(); } static const NKikimrBlobStorage::TVDiskID& GetElementKey(const TElementType& type) { diff --git a/ydb/core/viewer/json_wb_req.h b/ydb/core/viewer/json_wb_req.h index a0f0d9cdda8..9773f951454 100644 --- a/ydb/core/viewer/json_wb_req.h +++ b/ydb/core/viewer/json_wb_req.h @@ -11,6 +11,7 @@ #include "wb_merge.h" #include "wb_group.h" #include "wb_filter.h" +#include "log.h" namespace NKikimr { namespace NViewer { @@ -60,12 +61,18 @@ protected: std::unordered_map<TNodeId, ui32> NodeRetries; bool StaticNodesOnly = TWhiteboardInfo<ResponseType>::StaticNodesOnly; TDuration RetryPeriod = TDuration::MilliSeconds(500); + TString LogPrefix; + TString Format; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::VIEWER_HANDLER; } + TString GetLogPrefix() { + return LogPrefix; + } + TJsonWhiteboardRequest(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) : Viewer(viewer) , Initiator(ev->Sender) @@ -75,6 +82,10 @@ public: THolder<RequestType> BuildRequest(TNodeId nodeId) { Y_UNUSED(nodeId); THolder<RequestType> request = MakeHolder<RequestType>(); + constexpr bool hasFormat = requires(const RequestType* r) {r->Record.GetFormat();}; + if constexpr (hasFormat) { + request->Record.SetFormat(Format); + } if (ChangedSince != 0) { request->Record.SetChangedSince(ChangedSince); } @@ -128,6 +139,7 @@ public: Retries = FromStringWithDefault<ui32>(params.Get("retries"), 0); RetryPeriod = TDuration::MilliSeconds(FromStringWithDefault<ui32>(params.Get("retry_period"), RetryPeriod.MilliSeconds())); StaticNodesOnly = FromStringWithDefault<bool>(params.Get("static"), StaticNodesOnly); + Format = params.Get("format"); if (FilterNodeIds.empty()) { if (AliveOnly) { static const TActorId whiteboardServiceId = MakeNodeWhiteboardServiceId(TBase::SelfId().NodeId()); @@ -264,14 +276,35 @@ public: } } - template <typename ResponseRecordType> - void UpdateDuration(ResponseRecordType& record) { + template<typename ResponseRecordType> + TString GetResponseDuration(ResponseRecordType& record) { + constexpr bool hasResponseDuration = requires(const ResponseRecordType& r) {r.GetResponseDuration();}; + if constexpr (hasResponseDuration) { + return TStringBuilder() << " ResponseDuration: " << record.GetResponseDuration() << "us"; + } else { + return {}; + } + } + + template<typename ResponseRecordType> + TString GetProcessDuration(ResponseRecordType& record) { + constexpr bool hasProcessDuration = requires(const ResponseRecordType& r) {r.GetProcessDuration();}; + if constexpr (hasProcessDuration) { + return TStringBuilder() << " ProcessDuration: " << record.GetProcessDuration() << "us"; + } else { + return {}; + } + } + + template<typename ResponseRecordType> + void OnRecordReceived(ResponseRecordType& record, TNodeId nodeId) { record.SetResponseDuration((AppData()->TimeProvider->Now() - NodesRequestedTime).MicroSeconds()); + BLOG_TRACE("Received " << typeid(ResponseType).name() << " from " << nodeId << GetResponseDuration(record) << GetProcessDuration(record)); } void HandleNodeInfo(typename ResponseType::TPtr& ev) { - UpdateDuration(ev->Get()->Record); ui64 nodeId = ev.Get()->Cookie; + OnRecordReceived(ev->Get()->Record, nodeId); PerNodeStateInfo[nodeId] = ev->Release(); NodeErrors.erase(nodeId); TBase::RequestDone(); diff --git a/ydb/core/viewer/log.h b/ydb/core/viewer/log.h new file mode 100644 index 00000000000..8dce2ce5729 --- /dev/null +++ b/ydb/core/viewer/log.h @@ -0,0 +1,23 @@ +#pragma once + +#include <ydb/core/protos/services.pb.h> +#include <library/cpp/actors/core/log.h> + +namespace NKikimr { +namespace NViewer { + +inline TString GetLogPrefix() { + return {}; +} + +} +} + +#define BLOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::VIEWER, GetLogPrefix() << stream) +#define BLOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::VIEWER, GetLogPrefix() << stream) +#define BLOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::VIEWER, GetLogPrefix() << stream) +#define BLOG_NOTICE(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::VIEWER, GetLogPrefix() << stream) +#define BLOG_ERROR(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::VIEWER, GetLogPrefix() << stream) +#define BLOG_CRIT(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::VIEWER, GetLogPrefix() << stream) +#define BLOG_TRACE(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::VIEWER, GetLogPrefix() << stream) +#define Y_ENSURE_LOG(cond, stream) if (!(cond)) { BLOG_ERROR("Failed condition \"" << #cond << "\" " << stream); } diff --git a/ydb/core/viewer/viewer_ut.cpp b/ydb/core/viewer/viewer_ut.cpp index e85cb3c542e..d114b467d9b 100644 --- a/ydb/core/viewer/viewer_ut.cpp +++ b/ydb/core/viewer/viewer_ut.cpp @@ -21,25 +21,77 @@ using namespace NKikimrWhiteboard; Y_UNIT_TEST_SUITE(Viewer) { Y_UNIT_TEST(TabletMerging) { - TMap<ui32, THolder<TEvWhiteboard::TEvTabletStateResponse>> nodesData; - for (ui32 nodeId = 1; nodeId <= 1000; ++nodeId) { - THolder<TEvWhiteboard::TEvTabletStateResponse>& nodeData = nodesData[nodeId] = MakeHolder<TEvWhiteboard::TEvTabletStateResponse>(); - nodeData->Record.MutableTabletStateInfo()->Reserve(10000); - for (ui32 tabletId = 1; tabletId <= 10000; ++tabletId) { - NKikimrWhiteboard::TTabletStateInfo* tabletData = nodeData->Record.AddTabletStateInfo(); - tabletData->SetTabletId(tabletId); - tabletData->SetLeader(true); - tabletData->SetGeneration(13); - tabletData->SetChangeTime(TInstant::Now().MilliSeconds()); + THPTimer timer; + { + TMap<ui32, TString> nodesBlob; + timer.Reset(); + for (ui32 nodeId = 1; nodeId <= 10000; ++nodeId) { + THolder<TEvWhiteboard::TEvTabletStateResponse> nodeData = MakeHolder<TEvWhiteboard::TEvTabletStateResponse>(); + nodeData->Record.MutableTabletStateInfo()->Reserve(10000); + for (ui32 tabletId = 1; tabletId <= 10000; ++tabletId) { + NKikimrWhiteboard::TTabletStateInfo* tabletData = nodeData->Record.AddTabletStateInfo(); + tabletData->SetTabletId(tabletId); + tabletData->SetLeader(true); + tabletData->SetGeneration(13); + tabletData->SetChangeTime(TInstant::Now().MilliSeconds()); + tabletData->MutableTenantId()->SetSchemeShard(8); + tabletData->MutableTenantId()->SetPathId(14); + tabletData->MutableChannelGroupIDs()->Add(9); + tabletData->MutableChannelGroupIDs()->Add(10); + tabletData->MutableChannelGroupIDs()->Add(11); + } + nodesBlob[nodeId] = nodeData->Record.SerializeAsString(); + } + Ctest << "Build = " << timer.Passed() << Endl; + timer.Reset(); + TMap<ui32, THolder<TEvWhiteboard::TEvTabletStateResponse>> nodesData; + for (const auto& [nodeId, nodeBlob] : nodesBlob) { + THolder<TEvWhiteboard::TEvTabletStateResponse> nodeData = MakeHolder<TEvWhiteboard::TEvTabletStateResponse>(); + bool res = nodeData->Record.ParseFromString(nodesBlob[nodeId]); + Y_UNUSED(res); + nodesData[nodeId] = std::move(nodeData); } + THolder<TEvWhiteboard::TEvTabletStateResponse> result = MergeWhiteboardResponses(nodesData); + Ctest << "Merge = " << timer.Passed() << Endl; + UNIT_ASSERT_LT(timer.Passed(), 30); + UNIT_ASSERT_VALUES_EQUAL(result->Record.TabletStateInfoSize(), 10000); + timer.Reset(); } - Ctest << "Data has built" << Endl; + Ctest << "Destroy = " << timer.Passed() << Endl; + } + + Y_UNIT_TEST(TabletMergingPacked) { THPTimer timer; - THolder<TEvWhiteboard::TEvTabletStateResponse> result = MergeWhiteboardResponses(nodesData); - Ctest << "Merge = " << timer.Passed() << Endl; - UNIT_ASSERT_LT(timer.Passed(), 10); - UNIT_ASSERT_VALUES_EQUAL(result->Record.TabletStateInfoSize(), 10000); - Ctest << "Data has merged" << Endl; + { + TMap<ui32, TString> nodesBlob; + timer.Reset(); + for (ui32 nodeId = 1; nodeId <= 10000; ++nodeId) { + THolder<TEvWhiteboard::TEvTabletStateResponse> nodeData = MakeHolder<TEvWhiteboard::TEvTabletStateResponse>(); + auto* tabletData = nodeData->AllocatePackedResponse(10000); + for (ui32 tabletId = 1; tabletId <= 10000; ++tabletId) { + tabletData->TabletId = tabletId; + tabletData->FollowerId = 0; + tabletData->Generation = 13; + //tabletData->SetChangeTime(TInstant::Now().MilliSeconds()); + ++tabletData; + } + nodesBlob[nodeId] = nodeData->Record.SerializeAsString(); + } + Ctest << "Build = " << timer.Passed() << Endl; + TMap<ui32, THolder<TEvWhiteboard::TEvTabletStateResponse>> nodesData; + for (const auto& [nodeId, nodeBlob] : nodesBlob) { + THolder<TEvWhiteboard::TEvTabletStateResponse> nodeData = MakeHolder<TEvWhiteboard::TEvTabletStateResponse>(); + bool res = nodeData->Record.ParseFromString(nodesBlob[nodeId]); + Y_UNUSED(res); + nodesData[nodeId] = std::move(nodeData); + } + THolder<TEvWhiteboard::TEvTabletStateResponse> result = MergeWhiteboardResponses(nodesData); + Ctest << "Merge = " << timer.Passed() << Endl; + UNIT_ASSERT_LT(timer.Passed(), 10); + UNIT_ASSERT_VALUES_EQUAL(result->Record.TabletStateInfoSize(), 10000); + timer.Reset(); + } + Ctest << "Destroy = " << timer.Passed() << Endl; } Y_UNIT_TEST(VDiskMerging) { diff --git a/ydb/core/viewer/wb_filter.h b/ydb/core/viewer/wb_filter.h index 99c279b4fa1..a09ec3571fd 100644 --- a/ydb/core/viewer/wb_filter.h +++ b/ydb/core/viewer/wb_filter.h @@ -230,10 +230,10 @@ public: static THolder<ResponseType> FilterResponse(THolder<TResponseType>& source, const TVector<THolder<IFieldProtoFilter>>& filters) { THolder<TResponseType> result = MakeHolder<TResponseType>(); - auto* field = TWhiteboardInfo<ResponseType>::GetElementsField(result.Get()); - auto* sourceField = TWhiteboardInfo<ResponseType>::GetElementsField(source.Get()); - field->Reserve(sourceField->size()); - for (TElementType& info : *sourceField) { + auto& field = TWhiteboardInfo<ResponseType>::GetElementsField(result.Get()); + auto& sourceField = TWhiteboardInfo<ResponseType>::GetElementsField(source.Get()); + field.Reserve(sourceField.size()); + for (TElementType& info : sourceField) { size_t cnt = 0; for (const THolder<IFieldProtoFilter>& filter : filters) { if (!filter->CheckFilter(info)) @@ -242,7 +242,7 @@ public: } if (cnt == filters.size()) { // TODO: swap already allocated element of repeatedptr field - auto* element = field->Add(); + auto* element = field.Add(); element->Swap(&info); } } diff --git a/ydb/core/viewer/wb_group.h b/ydb/core/viewer/wb_group.h index 9735d8f29ec..32357ed99f4 100644 --- a/ydb/core/viewer/wb_group.h +++ b/ydb/core/viewer/wb_group.h @@ -244,22 +244,22 @@ public: static THolder<ResponseType> GroupResponse(THolder<TResponseType>& source, const TVector<const FieldDescriptor*>& groupFields, bool allEnums = false) { THolder<TResponseType> result = MakeHolder<TResponseType>(); - TElementsFieldType* field = TWhiteboardInfo<ResponseType>::GetElementsField(result.Get()); + TElementsFieldType& field = TWhiteboardInfo<ResponseType>::GetElementsField(result.Get()); bool allKeys = allEnums && IsEnum(groupFields); TMap<TPartProtoKey, ui32> counters; TMap<TPartProtoKey, TElementType*> elements; if (allKeys) { TPartProtoKeyEnum keyEnum(groupFields); do { - auto* element = field->Add(); + auto* element = field.Add(); TPartProtoKey key(*element, groupFields); key = keyEnum; element->SetCount(0); elements.emplace(key, element); } while (++keyEnum); } - auto* sourceField = TWhiteboardInfo<ResponseType>::GetElementsField(source.Get()); - for (TElementType& info : *sourceField) { + auto& sourceField = TWhiteboardInfo<ResponseType>::GetElementsField(source.Get()); + for (TElementType& info : sourceField) { TPartProtoKey key(info, groupFields); if (key.Exists()) { counters[key]++; @@ -270,7 +270,7 @@ public: if (allKeys) { elements[pr.first]->SetCount(pr.second); } else { - auto* element = field->Add(); + auto* element = field.Add(); TPartProtoKey(*element, groupFields) = pr.first; element->SetCount(pr.second); } diff --git a/ydb/core/viewer/wb_merge.h b/ydb/core/viewer/wb_merge.h index ca8e269e7c8..40a5b247c70 100644 --- a/ydb/core/viewer/wb_merge.h +++ b/ydb/core/viewer/wb_merge.h @@ -74,6 +74,16 @@ public: static void ProtoMerge(::google::protobuf::Message& protoTo, const ::google::protobuf::Message& protoFrom); }; +template<typename ResponseType> +struct TStaticMergeKey { + using KeyType = typename TWhiteboardInfo<ResponseType>::TElementKeyType; + + template<typename ElementType> + KeyType GetKey(const ElementType& info) const { + return TWhiteboardInfo<ResponseType>::GetElementKey(info); + } +}; + template <typename ResponseType> class TWhiteboardMerger : public TWhiteboardMergerBase { public: @@ -138,16 +148,106 @@ public: } }; - template <typename ElementKeyType> - struct TStaticMergeKey { - using KeyType = ElementKeyType; + template<typename MergeKey> + static THolder<TResponseType> MergeResponsesBaseHybrid(TMap<ui32, THolder<TResponseType>>& responses, const MergeKey& mergeKey) { + using TElementType = typename TWhiteboardInfo<ResponseType>::TElementType; + using TElementTypePacked5 = typename TWhiteboardInfo<ResponseType>::TElementTypePacked5; + + std::unordered_map<typename MergeKey::KeyType, TElementType*> mergedData; + + struct TPackedDataCtx { + const TElementTypePacked5* Element; + ui32 NodeId; + }; + + std::unordered_map<typename MergeKey::KeyType, TPackedDataCtx> mergedDataPacked5; - ElementKeyType GetKey(TElementType& info) const { - return TWhiteboardInfo<ResponseType>::GetElementKey(info); + size_t projectedSize = 0; + for (auto it = responses.begin(); it != responses.end(); ++it) { + if (it->second != nullptr) { + projectedSize += TWhiteboardInfo<ResponseType>::GetElementsCount(it->second.Get()); + } } - }; + mergedData.reserve(projectedSize); + mergedDataPacked5.reserve(projectedSize); + + ui64 minResponseTime = 0; + ui64 maxResponseDuration = 0; + ui64 sumProcessDuration = 0; + + for (auto it = responses.begin(); it != responses.end(); ++it) { + if (it->second != nullptr) { + { + TWhiteboardMergerComparator<TElementType> comparator; + auto& stateInfo = TWhiteboardInfo<ResponseType>::GetElementsField(it->second.Get()); + for (TElementType& info : stateInfo) { + if (!info.HasNodeId()) { + info.SetNodeId(it->first); + } + auto key = mergeKey.GetKey(info); + auto inserted = mergedData.emplace(key, &info); + if (!inserted.second) { + if (comparator(*inserted.first->second, info)) { + inserted.first->second = &info; + } + } + } + } + { + TWhiteboardMergerComparator<TElementTypePacked5> comparator; + auto stateInfo = TWhiteboardInfo<ResponseType>::GetElementsFieldPacked5(it->second.Get()); + for (auto& info : stateInfo) { + auto key = mergeKey.GetKey(info); + auto inserted = mergedDataPacked5.emplace(key, TPackedDataCtx{ + .Element = &info, + .NodeId = it->first + }); + if (!inserted.second) { + if (comparator(*inserted.first->second.Element, info)) { + inserted.first->second = { + .Element = &info, + .NodeId = it->first + }; + } + } + } + } + if (minResponseTime == 0 || it->second->Record.GetResponseTime() < minResponseTime) { + minResponseTime = it->second->Record.GetResponseTime(); + } + if (maxResponseDuration == 0 || it->second->Record.GetResponseDuration() > maxResponseDuration) { + maxResponseDuration = it->second->Record.GetResponseDuration(); + } + sumProcessDuration += it->second->Record.GetProcessDuration(); + } + } + + THolder<TResponseType> result = MakeHolder<TResponseType>(); + auto& field = TWhiteboardInfo<ResponseType>::GetElementsField(result.Get()); + field.Reserve(mergedData.size() + mergedDataPacked5.size()); + for (auto it = mergedDataPacked5.begin(); it != mergedDataPacked5.end(); ++it) { + auto* element = field.Add(); + it->second.Element->Fill(*element); + element->SetNodeId(it->second.NodeId); + mergedData.erase(it->first); + } + for (auto it = mergedData.begin(); it != mergedData.end(); ++it) { + auto* element = field.Add(); + element->Swap(it->second); + } + if (minResponseTime) { + result->Record.SetResponseTime(minResponseTime); + } + if (maxResponseDuration) { + result->Record.SetResponseDuration(maxResponseDuration); + } + if (sumProcessDuration) { + result->Record.SetProcessDuration(sumProcessDuration); + } + return result; + } - template <typename MergeKey> + template<typename MergeKey> static THolder<TResponseType> MergeResponsesBase(TMap<ui32, THolder<TResponseType>>& responses, const MergeKey& mergeKey) { std::unordered_map<typename MergeKey::KeyType, TElementType*> mergedData; ui64 minResponseTime = 0; @@ -156,8 +256,8 @@ public: TWhiteboardMergerComparator<TElementType> comparator; for (auto it = responses.begin(); it != responses.end(); ++it) { if (it->second != nullptr) { - auto* stateInfo = TWhiteboardInfo<ResponseType>::GetElementsField(it->second.Get()); - for (TElementType& info : *stateInfo) { + auto& stateInfo = TWhiteboardInfo<ResponseType>::GetElementsField(it->second.Get()); + for (TElementType& info : stateInfo) { if (!info.HasNodeId()) { info.SetNodeId(it->first); } @@ -180,10 +280,10 @@ public: } THolder<TResponseType> result = MakeHolder<TResponseType>(); - auto* field = TWhiteboardInfo<ResponseType>::GetElementsField(result.Get()); - field->Reserve(mergedData.size()); + auto& field = TWhiteboardInfo<ResponseType>::GetElementsField(result.Get()); + field.Reserve(mergedData.size()); for (auto it = mergedData.begin(); it != mergedData.end(); ++it) { - auto* element = field->Add(); + auto* element = field.Add(); element->Swap(it->second); } if (minResponseTime) { @@ -199,7 +299,7 @@ public: } static THolder<TResponseType> MergeResponsesElementKey(TMap<ui32, THolder<TResponseType>>& responses) { - TStaticMergeKey<typename TWhiteboardInfo<ResponseType>::TElementKeyType> mergeKey; + TStaticMergeKey<ResponseType> mergeKey; return MergeResponsesBase(responses, mergeKey); } |