diff options
author | Aleksandr Dmitriev <alexd.65536@gmail.com> | 2022-07-05 21:00:40 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-07-05 21:00:40 +0300 |
commit | 014d4040c4b1be95e8260f823efcdae39cbbd439 (patch) | |
tree | 7d9d0629f782f1d3baabe5679bcdf495c7e6d0d1 | |
parent | 3337ada763695431c2af0c1f1105b608bbcd8c01 (diff) | |
download | ydb-014d4040c4b1be95e8260f823efcdae39cbbd439.tar.gz |
top partition tables KIKIMR-15061
REVIEW: 2671355
REVIEW: 2692125
x-ydb-stable-ref: be6405bab36a14ec899951f200cfd7f0e30ace03
35 files changed, 1361 insertions, 377 deletions
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h index ad781ba56c..3b336373c3 100644 --- a/ydb/core/base/appdata.h +++ b/ydb/core/base/appdata.h @@ -166,6 +166,7 @@ struct TAppData { bool EnableKqpSpilling = false; bool AllowShadowDataInSchemeShardForTests = false; bool EnableMvccSnapshotWithLegacyDomainRoot = false; + bool UsePartitionStatsCollectorForTests = false; TVector<TString> AdministrationAllowedSIDs; // users/groups which allowed to perform administrative tasks TVector<TString> DefaultUserSIDs; TString AllAuthenticatedUsers; diff --git a/ydb/core/protos/counters_sysview_processor.proto b/ydb/core/protos/counters_sysview_processor.proto index 63c03cb653..c2077a6d53 100644 --- a/ydb/core/protos/counters_sysview_processor.proto +++ b/ydb/core/protos/counters_sysview_processor.proto @@ -14,4 +14,5 @@ enum ETxTypes { TXTYPE_AGGREGATE = 4 [(TxTypeOpts) = {Name: "TxAggregate"}]; TXTYPE_INTERVAL_SUMMARY = 5 [(TxTypeOpts) = {Name: "TxIntervalSummary"}]; TXTYPE_INTERVAL_METRICS = 6 [(TxTypeOpts) = {Name: "TxIntervalMetrics"}]; + TXTYPE_TOP_PARTITIONS = 7 [(TxTypeOpts) = {Name: "TxTopPartitions"}]; } diff --git a/ydb/core/protos/sys_view.proto b/ydb/core/protos/sys_view.proto index 4d495d6b60..a3ff84b707 100644 --- a/ydb/core/protos/sys_view.proto +++ b/ydb/core/protos/sys_view.proto @@ -123,6 +123,8 @@ enum EStatsType { METRICS_ONE_HOUR = 8; TOP_REQUEST_UNITS_ONE_MINUTE = 9; TOP_REQUEST_UNITS_ONE_HOUR = 10; + TOP_PARTITIONS_ONE_MINUTE = 11; + TOP_PARTITIONS_ONE_HOUR = 12; } message TEvGetQueryStats { @@ -550,3 +552,51 @@ message TEvSendDbCountersResponse { optional uint64 Generation = 2; // confirmed generation } +// ---- Top partitions + +message TTopPartitionsKey { + optional uint64 IntervalEndUs = 1; + optional uint64 Rank = 2; +} + +message TTopPartitionsInfo { + optional fixed64 TabletId = 1; + optional string Path = 2; + optional uint64 PeakTimeUs = 3; + optional double CPUCores = 4; + optional uint32 NodeId = 5; + optional uint64 DataSize = 6; + optional uint64 RowCount = 7; + optional uint64 IndexSize = 8; + optional uint32 InFlightTxCount = 9; +} + +message TTopPartitionsEntry { + optional TTopPartitionsKey Key = 1; + optional TTopPartitionsInfo Info = 2; +} + +message TEvGetTopPartitionsRequest { + optional TTopPartitionsKey From = 1; + optional bool InclusiveFrom = 2; + + optional TTopPartitionsKey To = 3; + optional bool InclusiveTo = 4; + + optional EStatsType Type = 5; +} + +message TEvGetTopPartitionsResponse { + repeated TTopPartitionsEntry Entries = 1; + optional TTopPartitionsKey Next = 2; + optional bool LastBatch = 3; + optional bool Overloaded = 4; +} + +// partitions stats collector -> SVP +message TEvSendTopPartitions { + repeated TTopPartitionsInfo Partitions = 1; + optional uint64 TimeUs = 2; +} + + diff --git a/ydb/core/sys_view/common/common.h b/ydb/core/sys_view/common/common.h index c88107063a..65256b8b6d 100644 --- a/ydb/core/sys_view/common/common.h +++ b/ydb/core/sys_view/common/common.h @@ -28,5 +28,7 @@ enum class EProcessorMode { FAST // fast mode for tests }; +constexpr size_t TOP_PARTITIONS_COUNT = 10; + } // NSysView } // NKikimr diff --git a/ydb/core/sys_view/common/events.h b/ydb/core/sys_view/common/events.h index df73e745f5..314511d184 100644 --- a/ydb/core/sys_view/common/events.h +++ b/ydb/core/sys_view/common/events.h @@ -66,6 +66,10 @@ struct TEvSysView { EvGetStorageStatsRequest, EvGetStorageStatsResponse, + EvSendTopPartitions, + EvGetTopPartitionsRequest, + EvGetTopPartitionsResponse, + EvEnd, }; @@ -361,6 +365,24 @@ struct TEvSysView { struct TEvGetStorageStatsResponse : TEventPB<TEvGetStorageStatsResponse, NKikimrSysView::TEvGetStorageStatsResponse, EvGetStorageStatsResponse> {}; + + struct TEvSendTopPartitions : public TEventPB< + TEvSendTopPartitions, + NKikimrSysView::TEvSendTopPartitions, + EvSendTopPartitions> + {}; + + struct TEvGetTopPartitionsRequest : public TEventPB< + TEvGetTopPartitionsRequest, + NKikimrSysView::TEvGetTopPartitionsRequest, + EvGetTopPartitionsRequest> + {}; + + struct TEvGetTopPartitionsResponse : public TEventPB< + TEvGetTopPartitionsResponse, + NKikimrSysView::TEvGetTopPartitionsResponse, + EvGetTopPartitionsResponse> + {}; }; } // NSysView diff --git a/ydb/core/sys_view/common/processor_scan.h b/ydb/core/sys_view/common/processor_scan.h new file mode 100644 index 0000000000..56aeb70a0a --- /dev/null +++ b/ydb/core/sys_view/common/processor_scan.h @@ -0,0 +1,124 @@ +#include "common.h" +#include "events.h" +#include "keys.h" +#include "schema.h" +#include "scan_actor_base_impl.h" + +#include <ydb/core/base/tablet_pipecache.h> + +#include <library/cpp/actors/core/hfunc.h> + +namespace NKikimr::NSysView { + +using namespace NActors; + +template <typename TEntry, typename TRequest, typename TResponse, + typename TEvRequest, typename TEvResponse, typename TExtractorMap, typename... T> +class TProcessorScan : public TScanActorBase< + TProcessorScan<TEntry, TRequest, TResponse, TEvRequest, TEvResponse, TExtractorMap, T...>> +{ +public: + using TScan = TProcessorScan<TEntry, TRequest, TResponse, TEvRequest, TEvResponse, TExtractorMap, T...>; + using TBase = TScanActorBase<TScan>; + + static constexpr auto ActorActivityType() { + return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN; + } + + TProcessorScan( + const TActorId& ownerId, + ui32 scanId, + const TTableId& tableId, + const TTableRange& tableRange, + const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns, + NKikimrSysView::EStatsType type) + : TBase(ownerId, scanId, tableId, tableRange, columns) + { + ConvertKeyRange<TRequest, T...>(Request, this->TableRange); + Request.SetType(type); + } + + STATEFN(StateScan) { + switch (ev->GetTypeRewrite()) { + hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); + hFunc(TEvResponse, Handle); + hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + hFunc(NKqp::TEvKqp::TEvAbortExecution, this->HandleAbortExecution); + cFunc(TEvents::TEvWakeup::EventType, this->HandleTimeout); + cFunc(TEvents::TEvPoison::EventType, PassAway); + default: + SVLOG_CRIT("NSysView::TProcessorScan: unexpected event# " << ev->GetTypeRewrite()); + } + } + +private: + void ProceedToScan() override { + TBase::Become(&TScan::StateScan); + if (this->AckReceived) { + RequestBatch(); + } + } + + void RequestBatch() { + if (!this->SysViewProcessorId) { + SVLOG_W("NSysView::TProcessorScan: no sysview processor for database " << this->TenantName + << ", sending empty response"); + this->ReplyEmptyAndDie(); + return; + } + + if (this->BatchRequestInFlight) { + return; + } + + auto req = MakeHolder<TEvRequest>(); + req->Record.CopyFrom(Request); + + TBase::Send(MakePipePeNodeCacheID(false), + new TEvPipeCache::TEvForward(req.Release(), this->SysViewProcessorId, true), + IEventHandle::FlagTrackDelivery); + + this->BatchRequestInFlight = true; + } + + void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) { + RequestBatch(); + } + + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) { + this->ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, + "NSysView::TProcessorScan: delivery problem"); + } + + void Handle(typename TEvResponse::TPtr& ev) { + const auto& record = ev->Get()->Record; + if (record.HasOverloaded() && record.GetOverloaded()) { + this->ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, + "NSysView::TProcessorScan: SysViewProcessor is overloaded"); + return; + } + + this->template ReplyBatch<TEvResponse, TEntry, TExtractorMap, true>(ev); + + if (!record.GetLastBatch()) { + Y_VERIFY(record.HasNext()); + Request.MutableFrom()->CopyFrom(record.GetNext()); + Request.SetInclusiveFrom(true); + } + + this->BatchRequestInFlight = false; + } + + void PassAway() override { + TBase::Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); + TBase::PassAway(); + } + +private: + TRequest Request; +}; + +template <typename TEntry> +using TExtractorFunc = std::function<TCell(const TEntry&)>; + +} // NKikimr::NSysView diff --git a/ydb/core/sys_view/common/schema.cpp b/ydb/core/sys_view/common/schema.cpp index 1917ea35a4..d45af9c9d4 100644 --- a/ydb/core/sys_view/common/schema.cpp +++ b/ydb/core/sys_view/common/schema.cpp @@ -216,6 +216,9 @@ private: RegisterOlapStoreSystemView<Schema::PrimaryIndexStats>(StorePrimaryIndexStatsName); RegisterOlapTableSystemView<Schema::PrimaryIndexStats>(TablePrimaryIndexStatsName); + + RegisterSystemView<Schema::TopPartitions>(TopPartitions1MinuteName); + RegisterSystemView<Schema::TopPartitions>(TopPartitions1HourName); } private: diff --git a/ydb/core/sys_view/common/schema.h b/ydb/core/sys_view/common/schema.h index 29ba8b9b0f..cf7830073f 100644 --- a/ydb/core/sys_view/common/schema.h +++ b/ydb/core/sys_view/common/schema.h @@ -33,6 +33,9 @@ constexpr TStringBuf QueryMetricsName = "query_metrics_one_minute"; constexpr TStringBuf StorePrimaryIndexStatsName = "store_primary_index_stats"; constexpr TStringBuf TablePrimaryIndexStatsName = "primary_index_stats"; +constexpr TStringBuf TopPartitions1MinuteName = "top_partitions_one_minute"; +constexpr TStringBuf TopPartitions1HourName = "top_partitions_one_hour"; + struct Schema : NIceDb::Schema { struct PartitionStats : Table<1> { struct OwnerId : Column<1, NScheme::NTypeIds::Uint64> {}; @@ -420,6 +423,34 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<PDiskFilter, ErasureSpecies, CurrentGroupsCreated, CurrentAllocatedSize, CurrentAvailableSize, AvailableGroupsToCreate, AvailableSizeToCreate>; }; + + struct TopPartitions : Table<12> { + struct IntervalEnd : Column<1, NScheme::NTypeIds::Timestamp> {}; + struct Rank : Column<2, NScheme::NTypeIds::Uint32> {}; + struct TabletId : Column<3, NScheme::NTypeIds::Uint64> {}; + struct Path : Column<4, NScheme::NTypeIds::Utf8> {}; + struct PeakTime : Column<5, NScheme::NTypeIds::Timestamp> {}; + struct CPUCores : Column<6, NScheme::NTypeIds::Double> {}; + struct NodeId : Column<7, NScheme::NTypeIds::Uint32> {}; + struct DataSize : Column<8, NScheme::NTypeIds::Uint64> {}; + struct RowCount : Column<9, NScheme::NTypeIds::Uint64> {}; + struct IndexSize : Column<10, NScheme::NTypeIds::Uint64> {}; + struct InFlightTxCount : Column<11, NScheme::NTypeIds::Uint32> {}; + + using TKey = TableKey<IntervalEnd, Rank>; + using TColumns = TableColumns< + IntervalEnd, + Rank, + TabletId, + Path, + PeakTime, + CPUCores, + NodeId, + DataSize, + RowCount, + IndexSize, + InFlightTxCount>; + }; }; bool MaybeSystemViewPath(const TVector<TString>& path); diff --git a/ydb/core/sys_view/common/ya.make b/ydb/core/sys_view/common/ya.make index b4ec91b3d8..b94acc176d 100644 --- a/ydb/core/sys_view/common/ya.make +++ b/ydb/core/sys_view/common/ya.make @@ -14,6 +14,7 @@ SRCS( schema.h schema.cpp utils.h + processor_scan.h ) PEERDIR( diff --git a/ydb/core/sys_view/partition_stats/partition_stats.cpp b/ydb/core/sys_view/partition_stats/partition_stats.cpp index d6ea9ff73b..ae18049e97 100644 --- a/ydb/core/sys_view/partition_stats/partition_stats.cpp +++ b/ydb/core/sys_view/partition_stats/partition_stats.cpp @@ -1,5 +1,6 @@ #include "partition_stats.h" +#include <ydb/core/sys_view/common/common.h> #include <ydb/core/sys_view/common/events.h> #include <ydb/core/sys_view/common/schema.h> #include <ydb/core/sys_view/common/scan_actor_base_impl.h> @@ -14,19 +15,40 @@ namespace NSysView { using namespace NActors; -class TPartitionStatsCollector : public TActor<TPartitionStatsCollector> { +class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollector> { public: + using TBase = TActorBootstrapped<TPartitionStatsCollector>; + static constexpr auto ActorActivityType() { return NKikimrServices::TActivity::SYSTEM_VIEW_PART_STATS_COLLECTOR; } - explicit TPartitionStatsCollector(size_t batchSize, size_t pendingRequestsLimit) - : TActor(&TPartitionStatsCollector::StateWork) + explicit TPartitionStatsCollector(TPathId domainKey, ui64 sysViewProcessorId, + size_t batchSize, size_t pendingRequestsLimit) + : DomainKey(domainKey) + , SysViewProcessorId(sysViewProcessorId) , BatchSize(batchSize) , PendingRequestsLimit(pendingRequestsLimit) {} - STFUNC(StateWork) { + void Bootstrap() { + SVLOG_D("NSysView::TPartitionStatsCollector bootstrapped: " + << "domain key# " << DomainKey + << ", sysview processor id# " << SysViewProcessorId); + + if (AppData()->UsePartitionStatsCollectorForTests) { + OverloadedPartitionBound = 0.0; + ProcessOverloadedInterval = TDuration::Seconds(1); + } + + if (SysViewProcessorId) { + Schedule(ProcessOverloadedInterval, new TEvPrivate::TEvProcessOverloaded); + } + + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { hFunc(TEvSysView::TEvSetPartitioning, Handle); hFunc(TEvSysView::TEvRemoveTable, Handle); @@ -34,10 +56,11 @@ public: hFunc(TEvSysView::TEvUpdateTtlStats, Handle); hFunc(TEvSysView::TEvGetPartitionStats, Handle); hFunc(TEvPrivate::TEvProcess, Handle); + hFunc(TEvPrivate::TEvProcessOverloaded, Handle); + IgnoreFunc(TEvPipeCache::TEvDeliveryProblem); cFunc(TEvents::TEvPoison::EventType, PassAway); default: - LOG_CRIT(ctx, NKikimrServices::SYSTEM_VIEWS, - "NSysView::TPartitionStatsCollector: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + SVLOG_CRIT("NSysView::TPartitionStatsCollector: unexpected event " << ev->GetTypeRewrite()); } } @@ -45,11 +68,14 @@ private: struct TEvPrivate { enum EEv { EvProcess = EventSpaceBegin(TEvents::ES_PRIVATE), + EvProcessOverloaded, EvEnd }; struct TEvProcess : public TEventLocal<TEvProcess, EvProcess> {}; + + struct TEvProcessOverloaded : public TEventLocal<TEvProcessOverloaded, EvProcessOverloaded> {}; }; void Handle(TEvSysView::TEvSetPartitioning::TPtr& ev) { @@ -57,24 +83,36 @@ private: const auto& pathId = ev->Get()->PathId; auto& tables = DomainTables[domainKey]; - auto it = tables.find(pathId); - if (it != tables.end()) { - auto& oldPartitions = it->second.Partitions; - THashMap<TShardIdx, NKikimrSysView::TPartitionStats> newPartitions; + auto tableFound = tables.Stats.find(pathId); + if (tableFound != tables.Stats.end()) { + auto& table = tableFound->second; + + auto& oldPartitions = table.Partitions; + std::unordered_map<TShardIdx, NKikimrSysView::TPartitionStats> newPartitions; + std::unordered_set<TShardIdx> overloaded; for (auto shardIdx : ev->Get()->ShardIndices) { auto old = oldPartitions.find(shardIdx); if (old != oldPartitions.end()) { newPartitions[shardIdx] = old->second; + if (IsPartitionOverloaded(old->second)) { + overloaded.insert(shardIdx); + } } } + if (!overloaded.empty()) { + tables.Overloaded[pathId].swap(overloaded); + } else { + tables.Overloaded.erase(pathId); + } + oldPartitions.swap(newPartitions); - it->second.ShardIndices.swap(ev->Get()->ShardIndices); - it->second.Path = ev->Get()->Path; + table.ShardIndices.swap(ev->Get()->ShardIndices); + table.Path = ev->Get()->Path; } else { - auto& table = tables[pathId]; + auto& table = tables.Stats[pathId]; table.ShardIndices.swap(ev->Get()->ShardIndices); table.Path = ev->Get()->Path; } @@ -85,7 +123,8 @@ private: const auto& pathId = ev->Get()->PathId; auto& tables = DomainTables[domainKey]; - tables.erase(pathId); + tables.Stats.erase(pathId); + tables.Overloaded.erase(pathId); } void Handle(TEvSysView::TEvSendPartitionStats::TPtr& ev) { @@ -94,14 +133,27 @@ private: const auto& shardIdx = ev->Get()->ShardIdx; auto& tables = DomainTables[domainKey]; - auto it = tables.find(pathId); - if (it == tables.end()) { + auto tableFound = tables.Stats.find(pathId); + if (tableFound == tables.Stats.end()) { return; } - auto& oldStats = it->second.Partitions[shardIdx]; + auto& table = tableFound->second; + auto& oldStats = table.Partitions[shardIdx]; auto& newStats = ev->Get()->Stats; + if (IsPartitionOverloaded(newStats)) { + tables.Overloaded[pathId].insert(shardIdx); + } else { + auto overloadedFound = tables.Overloaded.find(pathId); + if (overloadedFound != tables.Overloaded.end()) { + overloadedFound->second.erase(shardIdx); + if (overloadedFound->second.empty()) { + tables.Overloaded.erase(pathId); + } + } + } + if (oldStats.HasTtlStats()) { newStats.MutableTtlStats()->Swap(oldStats.MutableTtlStats()); } @@ -115,12 +167,12 @@ private: const auto& shardIdx = ev->Get()->ShardIdx; auto& tables = DomainTables[domainKey]; - auto it = tables.find(pathId); - if (it == tables.end()) { + auto tableFound = tables.Stats.find(pathId); + if (tableFound == tables.Stats.end()) { return; } - it->second.Partitions[shardIdx].MutableTtlStats()->Swap(&ev->Get()->Stats); + tableFound->second.Partitions[shardIdx].MutableTtlStats()->Swap(&ev->Get()->Stats); } void Handle(TEvSysView::TEvGetPartitionStats::TPtr& ev) { @@ -170,7 +222,7 @@ private: Send(request->Sender, std::move(result)); return; } - auto& tables = itTables->second; + auto& tables = itTables->second.Stats; auto it = tables.begin(); auto itEnd = tables.end(); @@ -290,27 +342,115 @@ private: Send(request->Sender, std::move(result)); } + void Handle(TEvPrivate::TEvProcessOverloaded::TPtr&) { + if (!SysViewProcessorId) { + return; + } + + Schedule(ProcessOverloadedInterval, new TEvPrivate::TEvProcessOverloaded); + + auto domainFound = DomainTables.find(DomainKey); + if (domainFound == DomainTables.end()) { + SVLOG_D("NSysView::TPartitionStatsCollector: TEvProcessOverloaded: no tables"); + return; + } + auto& domainTables = domainFound->second; + + struct TPartition { + TPathId PathId; + TShardIdx ShardIdx; + double CPUCores; + }; + std::vector<TPartition> sorted; + + for (const auto& [pathId, shardIndices] : domainTables.Overloaded) { + for (const auto& shardIdx : shardIndices) { + auto& table = domainTables.Stats[pathId]; + auto& partition = table.Partitions[shardIdx]; + sorted.emplace_back(TPartition{pathId, shardIdx, partition.GetCPUCores()}); + } + } + + std::sort(sorted.begin(), sorted.end(), + [] (const auto& l, const auto& r) { return l.CPUCores > r.CPUCores; }); + + auto now = TActivationContext::Now(); + auto nowUs = now.MicroSeconds(); + + size_t count = 0; + auto sendEvent = MakeHolder<TEvSysView::TEvSendTopPartitions>(); + for (const auto& entry : sorted) { + auto& table = domainTables.Stats[entry.PathId]; + auto& partition = table.Partitions[entry.ShardIdx]; + + auto* result = sendEvent->Record.AddPartitions(); + result->SetTabletId(partition.GetTabletId()); + result->SetPath(table.Path); + result->SetPeakTimeUs(nowUs); + result->SetCPUCores(partition.GetCPUCores()); + result->SetNodeId(partition.GetNodeId()); + result->SetDataSize(partition.GetDataSize()); + result->SetRowCount(partition.GetRowCount()); + result->SetIndexSize(partition.GetIndexSize()); + result->SetInFlightTxCount(partition.GetInFlightTxCount()); + + if (++count == TOP_PARTITIONS_COUNT) { + break; + } + } + + sendEvent->Record.SetTimeUs(nowUs); + + SVLOG_D("NSysView::TPartitionStatsCollector: TEvProcessOverloaded " + << "top size# " << sorted.size() + << ", time# " << now); + + Send(MakePipePeNodeCacheID(false), + new TEvPipeCache::TEvForward(sendEvent.Release(), SysViewProcessorId, true)); + } + + void PassAway() override { + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); + TBase::PassAway(); + } + + bool IsPartitionOverloaded(NKikimrSysView::TPartitionStats& stats) { + return stats.GetCPUCores() >= OverloadedPartitionBound; + } + private: + const TPathId DomainKey; + const ui64 SysViewProcessorId; const size_t BatchSize; const size_t PendingRequestsLimit; + double OverloadedPartitionBound = 0.7; + TDuration ProcessOverloadedInterval = TDuration::Seconds(15); + struct TTableStats { - THashMap<TShardIdx, NKikimrSysView::TPartitionStats> Partitions; // shardIdx -> stats - TVector<TShardIdx> ShardIndices; + std::unordered_map<TShardIdx, NKikimrSysView::TPartitionStats> Partitions; // shardIdx -> stats + std::vector<TShardIdx> ShardIndices; TString Path; }; - using TDomainTables = TMap<TPathId, TTableStats>; - THashMap<TPathId, TDomainTables> DomainTables; + struct TDomainTables { + std::map<TPathId, TTableStats> Stats; + std::unordered_map<TPathId, std::unordered_set<TShardIdx>> Overloaded; + }; + std::unordered_map<TPathId, TDomainTables> DomainTables; TQueue<TEvSysView::TEvGetPartitionStats::TPtr> PendingRequests; bool ProcessInFly = false; }; -THolder<IActor> CreatePartitionStatsCollector(size_t batchSize, size_t pendingRequestsLimit) { - return MakeHolder<TPartitionStatsCollector>(batchSize, pendingRequestsLimit); +THolder<IActor> CreatePartitionStatsCollector( + TPathId domainKey, ui64 sysViewProcessorId, size_t batchSize, size_t pendingRequestsLimit) +{ + return MakeHolder<TPartitionStatsCollector>( + domainKey, sysViewProcessorId, batchSize, pendingRequestsLimit); } + class TPartitionStatsScan : public TScanActorBase<TPartitionStatsScan> { public: using TBase = TScanActorBase<TPartitionStatsScan>; diff --git a/ydb/core/sys_view/partition_stats/partition_stats.h b/ydb/core/sys_view/partition_stats/partition_stats.h index dedf62906f..f1f192b9db 100644 --- a/ydb/core/sys_view/partition_stats/partition_stats.h +++ b/ydb/core/sys_view/partition_stats/partition_stats.h @@ -9,6 +9,8 @@ constexpr size_t STATS_COLLECTOR_BATCH_SIZE = 5000; constexpr size_t STATS_COLLECTOR_QUEUE_SIZE_LIMIT = 10; THolder<IActor> CreatePartitionStatsCollector( + TPathId domainKey, + ui64 sysViewProcessorId, size_t batchSize = STATS_COLLECTOR_BATCH_SIZE, size_t pendingRequestsCount = STATS_COLLECTOR_QUEUE_SIZE_LIMIT); diff --git a/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp b/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp index c123012948..17a8e03aa6 100644 --- a/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp +++ b/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp @@ -16,12 +16,20 @@ Y_UNIT_TEST_SUITE(PartitionStats) { return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr }; } + void WaitForBootstrap(TTestActorRuntime &runtime) { + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); + UNIT_ASSERT(runtime.DispatchEvents(options)); + } + void TestCollector(size_t batchSize) { TTestActorRuntime runtime; runtime.Initialize(MakeEgg()); - auto collector = CreatePartitionStatsCollector(batchSize); + auto collector = CreatePartitionStatsCollector(TPathId(), 0, batchSize); auto collectorId = runtime.Register(collector.Release()); + WaitForBootstrap(runtime); + auto sender = runtime.AllocateEdgeActor(); auto domainKey = TPathId(1, 1); @@ -175,8 +183,10 @@ Y_UNIT_TEST_SUITE(PartitionStats) { TTestActorRuntime runtime; runtime.Initialize(MakeEgg()); - auto collector = CreatePartitionStatsCollector(1, 0); + auto collector = CreatePartitionStatsCollector(TPathId(), 0, 1, 0); auto collectorId = runtime.Register(collector.Release()); + WaitForBootstrap(runtime); + auto sender = runtime.AllocateEdgeActor(); auto domainKey = TPathId(1, 1); diff --git a/ydb/core/sys_view/partition_stats/top_partitions.cpp b/ydb/core/sys_view/partition_stats/top_partitions.cpp new file mode 100644 index 0000000000..57a719da46 --- /dev/null +++ b/ydb/core/sys_view/partition_stats/top_partitions.cpp @@ -0,0 +1,91 @@ +#include "top_partitions.h" + +#include <ydb/core/sys_view/common/events.h> +#include <ydb/core/sys_view/common/processor_scan.h> + +namespace NKikimr::NSysView { + +using namespace NActors; + +template <> +void SetField<0>(NKikimrSysView::TTopPartitionsKey& key, ui64 value) { + key.SetIntervalEndUs(value); +} + +template <> +void SetField<1>(NKikimrSysView::TTopPartitionsKey& key, ui32 value) { + key.SetRank(value); +} + +struct TTopPartitionsExtractorMap : + public std::unordered_map<NTable::TTag, TExtractorFunc<NKikimrSysView::TTopPartitionsEntry>> +{ + using S = Schema::TopPartitions; + using E = NKikimrSysView::TTopPartitionsEntry; + + TTopPartitionsExtractorMap() { + insert({S::IntervalEnd::ColumnId, [] (const E& entry) { + return TCell::Make<ui64>(entry.GetKey().GetIntervalEndUs()); + }}); + insert({S::Rank::ColumnId, [] (const E& entry) { + return TCell::Make<ui32>(entry.GetKey().GetRank()); + }}); + insert({S::TabletId::ColumnId, [] (const E& entry) { + return TCell::Make<ui64>(entry.GetInfo().GetTabletId()); + }}); + insert({S::Path::ColumnId, [] (const E& entry) { + const auto& text = entry.GetInfo().GetPath(); + return TCell(text.data(), text.size()); + }}); + insert({S::PeakTime::ColumnId, [] (const E& entry) { + return TCell::Make<ui64>(entry.GetInfo().GetPeakTimeUs()); + }}); + insert({S::CPUCores::ColumnId, [] (const E& entry) { + return TCell::Make<double>(entry.GetInfo().GetCPUCores()); + }}); + insert({S::NodeId::ColumnId, [] (const E& entry) { + return TCell::Make<ui32>(entry.GetInfo().GetNodeId()); + }}); + insert({S::DataSize::ColumnId, [] (const E& entry) { + return TCell::Make<ui64>(entry.GetInfo().GetDataSize()); + }}); + insert({S::RowCount::ColumnId, [] (const E& entry) { + return TCell::Make<ui64>(entry.GetInfo().GetRowCount()); + }}); + insert({S::IndexSize::ColumnId, [] (const E& entry) { + return TCell::Make<ui64>(entry.GetInfo().GetIndexSize()); + }}); + insert({S::InFlightTxCount::ColumnId, [] (const E& entry) { + return TCell::Make<ui32>(entry.GetInfo().GetInFlightTxCount()); + }}); + } +}; + +THolder<IActor> CreateTopPartitionsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, + const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns) +{ + using TTopPartitionsScan = TProcessorScan< + NKikimrSysView::TTopPartitionsEntry, + NKikimrSysView::TEvGetTopPartitionsRequest, + NKikimrSysView::TEvGetTopPartitionsResponse, + TEvSysView::TEvGetTopPartitionsRequest, + TEvSysView::TEvGetTopPartitionsResponse, + TTopPartitionsExtractorMap, + ui64, + ui32 + >; + + auto viewName = tableId.SysViewInfo; + + if (viewName == TopPartitions1MinuteName) { + return MakeHolder<TTopPartitionsScan>(ownerId, scanId, tableId, tableRange, columns, + NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE); + + } else if (viewName == TopPartitions1HourName) { + return MakeHolder<TTopPartitionsScan>(ownerId, scanId, tableId, tableRange, columns, + NKikimrSysView::TOP_PARTITIONS_ONE_HOUR); + } + return {}; +} + +} // NKikimr::NSysView diff --git a/ydb/core/sys_view/partition_stats/top_partitions.h b/ydb/core/sys_view/partition_stats/top_partitions.h new file mode 100644 index 0000000000..dcf49a89a5 --- /dev/null +++ b/ydb/core/sys_view/partition_stats/top_partitions.h @@ -0,0 +1,11 @@ +#pragma once + +#include <ydb/core/kqp/runtime/kqp_compute.h> + +namespace NKikimr::NSysView { + +THolder<IActor> CreateTopPartitionsScan(const TActorId& ownerId, ui32 scanId, + const TTableId& tableId, const TTableRange& tableRange, + const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns); + +} // NKikimr::NSysView diff --git a/ydb/core/sys_view/partition_stats/ya.make b/ydb/core/sys_view/partition_stats/ya.make index d24bf5c89d..4528f1f527 100644 --- a/ydb/core/sys_view/partition_stats/ya.make +++ b/ydb/core/sys_view/partition_stats/ya.make @@ -8,6 +8,8 @@ OWNER( SRCS( partition_stats.h partition_stats.cpp + top_partitions.h + top_partitions.cpp ) PEERDIR( diff --git a/ydb/core/sys_view/processor/processor_impl.cpp b/ydb/core/sys_view/processor/processor_impl.cpp index 1bdb84ffb0..f6566b07e3 100644 --- a/ydb/core/sys_view/processor/processor_impl.cpp +++ b/ydb/core/sys_view/processor/processor_impl.cpp @@ -11,8 +11,8 @@ namespace NSysView { TSysViewProcessor::TSysViewProcessor(const TActorId& tablet, TTabletStorageInfo* info, EProcessorMode processorMode) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) - , TotalInterval(TDuration::Seconds(processorMode == EProcessorMode::FAST ? 6 : 60)) - , CollectInterval(TDuration::Seconds(processorMode == EProcessorMode::FAST ? 3 : 30)) + , TotalInterval(TDuration::Seconds(processorMode == EProcessorMode::FAST ? 1 : 60)) + , CollectInterval(TotalInterval / 2) , ExternalGroup(new NMonitoring::TDynamicCounters) { InternalGroups["kqp_serverless"] = new NMonitoring::TDynamicCounters; @@ -70,8 +70,8 @@ void TSysViewProcessor::PersistIntervalEnd(NIceDb::TNiceDb& db) { } template <typename TSchema> -void TSysViewProcessor::PersistTopResults(NIceDb::TNiceDb& db, - TTop& top, TResultStatsMap& results, TInstant intervalEnd) +void TSysViewProcessor::PersistQueryTopResults(NIceDb::TNiceDb& db, + TQueryTop& top, TResultStatsMap& results, TInstant intervalEnd) { ui64 intervalEndUs = intervalEnd.MicroSeconds(); ui32 rank = 0; @@ -95,14 +95,14 @@ void TSysViewProcessor::PersistTopResults(NIceDb::TNiceDb& db, } } - SVLOG_D("[" << TabletID() << "] PersistTopResults: " + SVLOG_D("[" << TabletID() << "] PersistQueryTopResults: " << "table id# " << TSchema::TableId << ", interval end# " << intervalEnd << ", query count# " << top.size() << ", persisted# " << rank); } -void TSysViewProcessor::PersistResults(NIceDb::TNiceDb& db) { +void TSysViewProcessor::PersistQueryResults(NIceDb::TNiceDb& db) { std::vector<std::pair<ui64, TQueryHash>> sorted; sorted.reserve(QueryMetrics.size()); for (const auto& [queryHash, metrics] : QueryMetrics) { @@ -128,33 +128,69 @@ void TSysViewProcessor::PersistResults(NIceDb::TNiceDb& db) { NIceDb::TUpdate<Schema::MetricsOneMinute::Data>(serialized)); } - SVLOG_D("[" << TabletID() << "] PersistResults: " + SVLOG_D("[" << TabletID() << "] PersistQueryResults: " << "interval end# " << IntervalEnd << ", query count# " << sorted.size()); // TODO: metrics one hour? - PersistTopResults<Schema::TopByDurationOneMinute>( + PersistQueryTopResults<Schema::TopByDurationOneMinute>( db, ByDurationMinute, TopByDurationOneMinute, IntervalEnd); - PersistTopResults<Schema::TopByReadBytesOneMinute>( + PersistQueryTopResults<Schema::TopByReadBytesOneMinute>( db, ByReadBytesMinute, TopByReadBytesOneMinute, IntervalEnd); - PersistTopResults<Schema::TopByCpuTimeOneMinute>( + PersistQueryTopResults<Schema::TopByCpuTimeOneMinute>( db, ByCpuTimeMinute, TopByCpuTimeOneMinute, IntervalEnd); - PersistTopResults<Schema::TopByRequestUnitsOneMinute>( + PersistQueryTopResults<Schema::TopByRequestUnitsOneMinute>( db, ByRequestUnitsMinute, TopByRequestUnitsOneMinute, IntervalEnd); auto hourEnd = EndOfHourInterval(IntervalEnd); - PersistTopResults<Schema::TopByDurationOneHour>( + PersistQueryTopResults<Schema::TopByDurationOneHour>( db, ByDurationHour, TopByDurationOneHour, hourEnd); - PersistTopResults<Schema::TopByReadBytesOneHour>( + PersistQueryTopResults<Schema::TopByReadBytesOneHour>( db, ByReadBytesHour, TopByReadBytesOneHour, hourEnd); - PersistTopResults<Schema::TopByCpuTimeOneHour>( + PersistQueryTopResults<Schema::TopByCpuTimeOneHour>( db, ByCpuTimeHour, TopByCpuTimeOneHour, hourEnd); - PersistTopResults<Schema::TopByRequestUnitsOneHour>( + PersistQueryTopResults<Schema::TopByRequestUnitsOneHour>( db, ByRequestUnitsHour, TopByRequestUnitsOneHour, hourEnd); } +template <typename TSchema> +void TSysViewProcessor::PersistPartitionTopResults(NIceDb::TNiceDb& db, + TPartitionTop& top, TResultPartitionsMap& results, TInstant intervalEnd) +{ + ui64 intervalEndUs = intervalEnd.MicroSeconds(); + ui32 rank = 0; + + for (const auto& partition : top) { + auto key = std::make_pair(intervalEndUs, ++rank); + auto& info = results[key]; + info.CopyFrom(*partition); + + TString data; + Y_PROTOBUF_SUPPRESS_NODISCARD info.SerializeToString(&data); + db.Table<TSchema>().Key(key).Update( + NIceDb::TUpdate<typename TSchema::Data>(data)); + } + + SVLOG_D("[" << TabletID() << "] PersistPartitionTopResults: " + << "table id# " << TSchema::TableId + << ", partition interval end# " << intervalEnd + << ", partition count# " << top.size()); +} + +void TSysViewProcessor::PersistPartitionResults(NIceDb::TNiceDb& db) { + auto intervalEnd = IntervalEnd + TotalInterval; + + PersistPartitionTopResults<Schema::TopPartitionsOneMinute>( + db, PartitionTopMinute, TopPartitionsOneMinute, intervalEnd); + + auto hourEnd = EndOfHourInterval(intervalEnd); + + PersistPartitionTopResults<Schema::TopPartitionsOneHour>( + db, PartitionTopHour, TopPartitionsOneHour, hourEnd); +} + void TSysViewProcessor::ScheduleAggregate() { auto rangeUs = RandomNumber<ui64>(TotalInterval.MicroSeconds() / 12); auto deadline = IntervalEnd + CollectInterval + TDuration::MicroSeconds(rangeUs); @@ -182,18 +218,18 @@ void TSysViewProcessor::ScheduleSendNavigate() { Schedule(SendNavigateInterval, new TEvPrivate::TEvSendNavigate); } -template <typename TSchema, typename TEntry> -void TSysViewProcessor::CutHistory(NIceDb::TNiceDb& db, - TResultMap<TEntry>& metricsMap, TDuration historySize) -{ +template <typename TSchema, typename TMap> +void TSysViewProcessor::CutHistory(NIceDb::TNiceDb& db, TMap& results, TDuration historySize) { auto past = IntervalEnd - historySize; - auto key = std::make_pair(past.MicroSeconds(), 0); + typename TMap::key_type key; + key.first = past.MicroSeconds(); + key.second = 0; - auto bound = metricsMap.lower_bound(key); - for (auto it = metricsMap.begin(); it != bound; ++it) { + auto bound = results.lower_bound(key); + for (auto it = results.begin(); it != bound; ++it) { db.Table<TSchema>().Key(it->first).Delete(); } - metricsMap.erase(metricsMap.begin(), bound); + results.erase(results.begin(), bound); } TInstant TSysViewProcessor::EndOfHourInterval(TInstant intervalEnd) { @@ -218,8 +254,6 @@ void TSysViewProcessor::ClearIntervalSummaries(NIceDb::TNiceDb& db) { } void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) { - // TODO: efficient delete? - ClearIntervalSummaries(db); for (const auto& [queryHash, _] : QueryMetrics) { @@ -233,22 +267,32 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) { NodesToRequest.clear(); NodesInFlight.clear(); - auto clearTop = [&] (NKikimrSysView::EStatsType type, TTop& top) { + auto clearQueryTop = [&] (NKikimrSysView::EStatsType type, TQueryTop& top) { for (const auto& query : top) { db.Table<Schema::IntervalTops>().Key((ui32)type, query.Hash).Delete(); } top.clear(); }; - clearTop(NKikimrSysView::TOP_DURATION_ONE_MINUTE, ByDurationMinute); - clearTop(NKikimrSysView::TOP_READ_BYTES_ONE_MINUTE, ByReadBytesMinute); - clearTop(NKikimrSysView::TOP_CPU_TIME_ONE_MINUTE, ByCpuTimeMinute); - clearTop(NKikimrSysView::TOP_REQUEST_UNITS_ONE_MINUTE, ByRequestUnitsMinute); + auto clearPartitionTop = [&] (NKikimrSysView::EStatsType type, TPartitionTop& top) { + for (const auto& partition : top) { + db.Table<Schema::IntervalPartitionTops>().Key((ui32)type, partition->GetTabletId()).Delete(); + } + top.clear(); + }; + + clearQueryTop(NKikimrSysView::TOP_DURATION_ONE_MINUTE, ByDurationMinute); + clearQueryTop(NKikimrSysView::TOP_READ_BYTES_ONE_MINUTE, ByReadBytesMinute); + clearQueryTop(NKikimrSysView::TOP_CPU_TIME_ONE_MINUTE, ByCpuTimeMinute); + clearQueryTop(NKikimrSysView::TOP_REQUEST_UNITS_ONE_MINUTE, ByRequestUnitsMinute); + + clearPartitionTop(NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE, PartitionTopMinute); CurrentStage = COLLECT; PersistStage(db); auto oldHourEnd = EndOfHourInterval(IntervalEnd); + auto partitionOldHourEnd = EndOfHourInterval(IntervalEnd + TotalInterval); auto now = ctx.Now(); auto intervalSize = TotalInterval.MicroSeconds(); @@ -257,12 +301,17 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) { PersistIntervalEnd(db); auto newHourEnd = EndOfHourInterval(IntervalEnd); + auto partitionNewHourEnd = EndOfHourInterval(IntervalEnd + TotalInterval); if (oldHourEnd != newHourEnd) { - clearTop(NKikimrSysView::TOP_DURATION_ONE_HOUR, ByDurationHour); - clearTop(NKikimrSysView::TOP_READ_BYTES_ONE_HOUR, ByReadBytesHour); - clearTop(NKikimrSysView::TOP_CPU_TIME_ONE_HOUR, ByCpuTimeHour); - clearTop(NKikimrSysView::TOP_REQUEST_UNITS_ONE_HOUR, ByRequestUnitsHour); + clearQueryTop(NKikimrSysView::TOP_DURATION_ONE_HOUR, ByDurationHour); + clearQueryTop(NKikimrSysView::TOP_READ_BYTES_ONE_HOUR, ByReadBytesHour); + clearQueryTop(NKikimrSysView::TOP_CPU_TIME_ONE_HOUR, ByCpuTimeHour); + clearQueryTop(NKikimrSysView::TOP_REQUEST_UNITS_ONE_HOUR, ByRequestUnitsHour); + } + + if (partitionOldHourEnd != partitionNewHourEnd) { + clearPartitionTop(NKikimrSysView::TOP_PARTITIONS_ONE_HOUR, PartitionTopHour); } SVLOG_D("[" << TabletID() << "] Reset: interval end# " << IntervalEnd); @@ -270,18 +319,20 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) { const auto minuteHistorySize = TotalInterval * ONE_MINUTE_BUCKET_COUNT; const auto hourHistorySize = ONE_HOUR_BUCKET_SIZE * ONE_HOUR_BUCKET_COUNT; - CutHistory<Schema::MetricsOneMinute, TQueryToMetrics>(db, MetricsOneMinute, minuteHistorySize); - CutHistory<Schema::MetricsOneHour, TQueryToMetrics>(db, MetricsOneHour, hourHistorySize); + CutHistory<Schema::MetricsOneMinute>(db, MetricsOneMinute, minuteHistorySize); + CutHistory<Schema::MetricsOneHour>(db, MetricsOneHour, hourHistorySize); - using TStats = NKikimrSysView::TQueryStats; - CutHistory<Schema::TopByDurationOneMinute, TStats>(db, TopByDurationOneMinute, minuteHistorySize); - CutHistory<Schema::TopByDurationOneHour, TStats>(db, TopByDurationOneHour, hourHistorySize); - CutHistory<Schema::TopByReadBytesOneMinute, TStats>(db, TopByReadBytesOneMinute, minuteHistorySize); - CutHistory<Schema::TopByReadBytesOneHour, TStats>(db, TopByReadBytesOneHour, hourHistorySize); - CutHistory<Schema::TopByCpuTimeOneMinute, TStats>(db, TopByCpuTimeOneMinute, minuteHistorySize); - CutHistory<Schema::TopByCpuTimeOneHour, TStats>(db, TopByCpuTimeOneHour, hourHistorySize); - CutHistory<Schema::TopByRequestUnitsOneMinute, TStats>(db, TopByRequestUnitsOneMinute, minuteHistorySize); - CutHistory<Schema::TopByRequestUnitsOneHour, TStats>(db, TopByRequestUnitsOneHour, hourHistorySize); + CutHistory<Schema::TopByDurationOneMinute>(db, TopByDurationOneMinute, minuteHistorySize); + CutHistory<Schema::TopByDurationOneHour>(db, TopByDurationOneHour, hourHistorySize); + CutHistory<Schema::TopByReadBytesOneMinute>(db, TopByReadBytesOneMinute, minuteHistorySize); + CutHistory<Schema::TopByReadBytesOneHour>(db, TopByReadBytesOneHour, hourHistorySize); + CutHistory<Schema::TopByCpuTimeOneMinute>(db, TopByCpuTimeOneMinute, minuteHistorySize); + CutHistory<Schema::TopByCpuTimeOneHour>(db, TopByCpuTimeOneHour, hourHistorySize); + CutHistory<Schema::TopByRequestUnitsOneMinute>(db, TopByRequestUnitsOneMinute, minuteHistorySize); + CutHistory<Schema::TopByRequestUnitsOneHour>(db, TopByRequestUnitsOneHour, hourHistorySize); + + CutHistory<Schema::TopPartitionsOneMinute>(db, TopPartitionsOneMinute, minuteHistorySize); + CutHistory<Schema::TopPartitionsOneHour>(db, TopPartitionsOneHour, hourHistorySize); } void TSysViewProcessor::SendRequests() { @@ -349,13 +400,13 @@ void TSysViewProcessor::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { void TSysViewProcessor::Handle(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) { const auto& record = ev->Get()->Record; - auto type = record.GetType(); if (PendingRequests.size() >= PendingRequestsLimit) { + auto type = record.GetType(); if (type == NKikimrSysView::METRICS_ONE_MINUTE || type == NKikimrSysView::METRICS_ONE_HOUR) { - ReplyOverloaded<TEvSysView::TEvGetQueryMetricsResponse>(ev); + ReplyOverloaded<TEvSysView::TEvGetQueryMetricsResponse>(ev->Sender); } else { - ReplyOverloaded<TEvSysView::TEvGetQueryStatsResponse>(ev); + ReplyOverloaded<TEvSysView::TEvGetQueryStatsResponse>(ev->Sender); } return; } @@ -368,6 +419,20 @@ void TSysViewProcessor::Handle(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) } } +void TSysViewProcessor::Handle(TEvSysView::TEvGetTopPartitionsRequest::TPtr& ev) { + if (PendingRequests.size() >= PendingRequestsLimit) { + ReplyOverloaded<TEvSysView::TEvGetTopPartitionsResponse>(ev->Sender); + return; + } + + PendingRequests.push(std::move(ev)); + + if (!ProcessInFly) { + Send(SelfId(), new TEvPrivate::TEvProcess()); + ProcessInFly = true; + } +} + void TSysViewProcessor::Handle(TEvPrivate::TEvProcess::TPtr&) { ProcessInFly = false; @@ -375,7 +440,7 @@ void TSysViewProcessor::Handle(TEvPrivate::TEvProcess::TPtr&) { return; } - TEvSysView::TEvGetQueryMetricsRequest::TPtr request = std::move(PendingRequests.front()); + TVariantRequestPtr request = std::move(PendingRequests.front()); PendingRequests.pop(); if (!PendingRequests.empty()) { @@ -383,31 +448,71 @@ void TSysViewProcessor::Handle(TEvPrivate::TEvProcess::TPtr&) { ProcessInFly = true; } - const auto& record = request->Get()->Record; - auto type = record.GetType(); + if (auto* req = std::get_if<TEvSysView::TEvGetTopPartitionsRequest::TPtr>(&request)) { + Reply<TResultPartitionsMap, + TEvSysView::TEvGetTopPartitionsRequest, + TEvSysView::TEvGetTopPartitionsResponse>(*req); + + } else if (auto* req = std::get_if<TEvSysView::TEvGetQueryMetricsRequest::TPtr>(&request)) { + const auto& record = (*req)->Get()->Record; + auto type = record.GetType(); - if (type == NKikimrSysView::METRICS_ONE_MINUTE || type == NKikimrSysView::METRICS_ONE_HOUR) { - Reply<TQueryToMetrics, TEvSysView::TEvGetQueryMetricsResponse>(request); + if (type == NKikimrSysView::METRICS_ONE_MINUTE || type == NKikimrSysView::METRICS_ONE_HOUR) { + Reply<TResultMetricsMap, + TEvSysView::TEvGetQueryMetricsRequest, + TEvSysView::TEvGetQueryMetricsResponse>(*req); + } else { + Reply<TResultStatsMap, + TEvSysView::TEvGetQueryMetricsRequest, + TEvSysView::TEvGetQueryStatsResponse>(*req); + } } else { - Reply<NKikimrSysView::TQueryStats, TEvSysView::TEvGetQueryStatsResponse>(request); + Y_FAIL("unknown SVP request"); } } +void TSysViewProcessor::EntryToProto(NKikimrSysView::TQueryMetricsEntry& dst, const TQueryToMetrics& src) { + dst.MutableMetrics()->CopyFrom(src.Metrics); + dst.SetQueryText(src.Text); +} + +void TSysViewProcessor::EntryToProto(NKikimrSysView::TQueryStatsEntry& dst, const NKikimrSysView::TQueryStats& src) { + dst.MutableStats()->CopyFrom(src); +} + +void TSysViewProcessor::EntryToProto(NKikimrSysView::TTopPartitionsEntry& dst, const NKikimrSysView::TTopPartitionsInfo& src) { + dst.MutableInfo()->CopyFrom(src); +} + template <typename TResponse> -void TSysViewProcessor::ReplyOverloaded(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) { +void TSysViewProcessor::ReplyOverloaded(const TActorId& sender) { auto response = MakeHolder<TResponse>(); response->Record.SetOverloaded(true); - Send(ev->Sender, std::move(response)); + Send(sender, std::move(response)); } -template <typename TEntry, typename TResponse> -void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) { +template <typename TMap, typename TRequest, typename TResponse> +void TSysViewProcessor::Reply(typename TRequest::TPtr& ev) { const auto& record = ev->Get()->Record; auto response = MakeHolder<TResponse>(); response->Record.SetLastBatch(true); - TResultMap<TEntry>* entries = nullptr; - if constexpr (std::is_same<TEntry, TQueryToMetrics>::value) { + using TEntry = typename TMap::mapped_type; + TMap* entries = nullptr; + if constexpr (std::is_same<TEntry, NKikimrSysView::TTopPartitionsInfo>::value) { + switch (record.GetType()) { + case NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE: + entries = &TopPartitionsOneMinute; + break; + case NKikimrSysView::TOP_PARTITIONS_ONE_HOUR: + entries = &TopPartitionsOneHour; + break; + default: + SVLOG_CRIT("[" << TabletID() << "] unexpected stats type: " << (size_t)record.GetType()); + Send(ev->Sender, std::move(response)); + return; + } + } else if constexpr (std::is_same<TEntry, TQueryToMetrics>::value) { switch (record.GetType()) { case NKikimrSysView::METRICS_ONE_MINUTE: entries = &MetricsOneMinute; @@ -458,7 +563,6 @@ void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) { auto from = entries->begin(); auto to = entries->end(); - TString fromStr("[]"); if (record.HasFrom()) { auto key = std::make_pair(record.GetFrom().GetIntervalEndUs(), record.GetFrom().GetRank()); if (!record.HasInclusiveFrom() || record.GetInclusiveFrom()) { @@ -466,14 +570,8 @@ void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) { } else { from = entries->upper_bound(key); } - TStringBuilder str; - str << "[" << record.GetFrom().GetIntervalEndUs() - << ", " << record.GetFrom().GetRank() - << ", " << (record.GetInclusiveFrom() ? "inc]" : "exc]"); - fromStr = str; } - TString toStr("[]"); if (record.HasTo()) { auto key = std::make_pair(record.GetTo().GetIntervalEndUs(), record.GetTo().GetRank()); if (!record.HasInclusiveTo() || !record.GetInclusiveTo()) { @@ -481,33 +579,19 @@ void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) { } else { to = entries->upper_bound(key); } - TStringBuilder str; - str << "[" << record.GetTo().GetIntervalEndUs() - << ", " << record.GetTo().GetRank() - << ", " << (record.GetInclusiveTo() ? "inc]" : "exc]"); - toStr = str; } - TString nextStr("[]"); size_t size = 0; size_t count = 0; for (auto it = from; it != to; ++it) { const auto& key = it->first; auto& entry = *response->Record.AddEntries(); - auto& entryKey = *entry.MutableKey(); entryKey.SetIntervalEndUs(key.first); entryKey.SetRank(key.second); - if constexpr (std::is_same<TEntry, TQueryToMetrics>::value) { - const auto& metrics = it->second; - entry.MutableMetrics()->CopyFrom(metrics.Metrics); - entry.SetQueryText(metrics.Text); - } else { - const auto& stats = it->second; - entry.MutableStats()->CopyFrom(stats); - } + EntryToProto(entry, it->second); size += entry.ByteSizeLong(); ++count; @@ -517,19 +601,22 @@ void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) { next->SetIntervalEndUs(key.first); next->SetRank(key.second + 1); response->Record.SetLastBatch(false); - - TStringBuilder str; - str << "[" << next->GetIntervalEndUs() - << ", " << next->GetRank() - << "]"; - nextStr = str; break; } } - SVLOG_D("[" << TabletID() << "] Reply to TEvGetQueryMetricsRequest: " - << "from# " << fromStr - << ", to# " << toStr + TString rangeStr, nextStr; + google::protobuf::TextFormat::Printer range; + range.SetSingleLineMode(true); + range.PrintToString(record, &rangeStr); + if (response->Record.HasNext()) { + google::protobuf::TextFormat::Printer next; + next.SetSingleLineMode(true); + next.PrintToString(response->Record.GetNext(), &nextStr); + } + + SVLOG_D("[" << TabletID() << "] Reply batch: " + << "range# " << rangeStr << ", rows# " << count << ", bytes# " << size << ", next# " << nextStr); @@ -622,7 +709,7 @@ bool TSysViewProcessor::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, str << Endl; } { - auto printTop = [&str] (const TTop& top) { + auto printTop = [&str] (const TQueryTop& top) { for (const auto& query : top) { str << " Hash: " << query.Hash << ", Value: " << query.Value @@ -675,6 +762,10 @@ bool TSysViewProcessor::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, << " Count: " << TopByRequestUnitsOneMinute.size() << Endl << Endl; str << "TopByRequestUnitsOneHour" << Endl << " Count: " << TopByRequestUnitsOneHour.size() << Endl << Endl; + str << "TopPartitionsOneMinute" << Endl + << " Count: " << TopPartitionsOneMinute.size() << Endl << Endl; + str << "TopPartitionsOneHour" << Endl + << " Count: " << TopPartitionsOneHour.size() << Endl << Endl; } } } diff --git a/ydb/core/sys_view/processor/processor_impl.h b/ydb/core/sys_view/processor/processor_impl.h index 6db54e815c..1211671c30 100644 --- a/ydb/core/sys_view/processor/processor_impl.h +++ b/ydb/core/sys_view/processor/processor_impl.h @@ -41,6 +41,7 @@ private: struct TTxAggregate; struct TTxIntervalSummary; struct TTxIntervalMetrics; + struct TTxTopPartitions; struct TEvPrivate { enum EEv { @@ -72,15 +73,25 @@ private: TNodeId NodeId; THolder<NKikimrSysView::TQueryStats> Stats; }; - using TTop = std::vector<TTopQuery>; + using TQueryTop = std::vector<TTopQuery>; - using TResultKey = std::pair<ui64, ui32>; + using TPartitionTop = std::vector<THolder<NKikimrSysView::TTopPartitionsInfo>>; + + using THistoryKey = std::pair<ui64, ui32>; template <typename TEntry> - using TResultMap = std::map<TResultKey, TEntry, std::less<TResultKey>, - NActors::NMemory::TAlloc<std::pair<const TResultKey, TEntry>, MemoryLabelResults>>; + using TResultMap = std::map<THistoryKey, TEntry, std::less<THistoryKey>, + NActors::NMemory::TAlloc<std::pair<const THistoryKey, TEntry>, MemoryLabelResults>>; + using TResultStatsMap = TResultMap<NKikimrSysView::TQueryStats>; + using TResultPartitionsMap = TResultMap<NKikimrSysView::TTopPartitionsInfo>; + + struct TQueryToMetrics { + NKikimrSysView::TQueryMetrics Metrics; + TString Text; + }; + private: static bool TopQueryCompare(const TTopQuery& l, const TTopQuery& r) { return l.Value == r.Value ? l.Hash > r.Hash : l.Value > r.Value; @@ -102,7 +113,10 @@ private: void Handle(TEvPrivate::TEvProcess::TPtr& ev); void Handle(TEvSysView::TEvIntervalQuerySummary::TPtr& ev); void Handle(TEvSysView::TEvGetIntervalMetricsResponse::TPtr& ev); + void Handle(TEvSysView::TEvSendTopPartitions::TPtr& ev); + void Handle(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev); + void Handle(TEvSysView::TEvGetTopPartitionsRequest::TPtr& ev); void Handle(TEvSysView::TEvSendDbCountersRequest::TPtr& ev); void Handle(TEvPrivate::TEvApplyCounters::TPtr& ev); @@ -121,10 +135,14 @@ private: void PersistIntervalEnd(NIceDb::TNiceDb& db); template <typename TSchema> - void PersistTopResults(NIceDb::TNiceDb& db, - TTop& top, TResultStatsMap& results, TInstant intervalEnd); + void PersistQueryTopResults(NIceDb::TNiceDb& db, + TQueryTop& top, TResultStatsMap& results, TInstant intervalEnd); + void PersistQueryResults(NIceDb::TNiceDb& db); - void PersistResults(NIceDb::TNiceDb& db); + template <typename TSchema> + void PersistPartitionTopResults(NIceDb::TNiceDb& db, + TPartitionTop& top, TResultPartitionsMap& results, TInstant intervalEnd); + void PersistPartitionResults(NIceDb::TNiceDb& db); void ScheduleAggregate(); void ScheduleCollect(); @@ -132,9 +150,8 @@ private: void ScheduleApplyCounters(); void ScheduleSendNavigate(); - template <typename TSchema, typename TEntry> - void CutHistory(NIceDb::TNiceDb& db, TResultMap<TEntry>& results, - TDuration historySize); + template <typename TSchema, typename TMap> + void CutHistory(NIceDb::TNiceDb& db, TMap& results, TDuration historySize); static TInstant EndOfHourInterval(TInstant intervalEnd); @@ -145,11 +162,16 @@ private: void SendRequests(); void IgnoreFailure(TNodeId nodeId); + static void EntryToProto(NKikimrSysView::TQueryMetricsEntry& dst, const TQueryToMetrics& src); + static void EntryToProto(NKikimrSysView::TQueryStatsEntry& dst, const NKikimrSysView::TQueryStats& src); + static void EntryToProto(NKikimrSysView::TTopPartitionsEntry& dst, const NKikimrSysView::TTopPartitionsInfo& src); + template <typename TResponse> - void ReplyOverloaded(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev); + void ReplyOverloaded(const TActorId& sender); + + template <typename TMap, typename TRequest, typename TResponse> + void Reply(typename TRequest::TPtr& ev); - template <typename TEntry, typename TResponse> - void Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev); TIntrusivePtr<IDbCounters> CreateCountersForService(NKikimrSysView::EDbCountersService service); void AttachExternalCounters(); @@ -165,6 +187,8 @@ private: IgnoreFunc(TEvSysView::TEvIntervalQuerySummary); IgnoreFunc(TEvSysView::TEvGetIntervalMetricsResponse); IgnoreFunc(TEvSysView::TEvGetQueryMetricsRequest); + IgnoreFunc(TEvSysView::TEvSendTopPartitions); + IgnoreFunc(TEvSysView::TEvGetTopPartitionsRequest); IgnoreFunc(TEvSysView::TEvSendDbCountersRequest); default: if (!HandleDefaultEvents(ev, ctx)) { @@ -181,6 +205,8 @@ private: IgnoreFunc(TEvSysView::TEvIntervalQuerySummary); IgnoreFunc(TEvSysView::TEvGetIntervalMetricsResponse); IgnoreFunc(TEvSysView::TEvGetQueryMetricsRequest); + IgnoreFunc(TEvSysView::TEvSendTopPartitions); + IgnoreFunc(TEvSysView::TEvGetTopPartitionsRequest); IgnoreFunc(TEvSysView::TEvSendDbCountersRequest); default: if (!HandleDefaultEvents(ev, ctx)) { @@ -200,6 +226,8 @@ private: hFunc(TEvSysView::TEvIntervalQuerySummary, Handle); hFunc(TEvSysView::TEvGetIntervalMetricsResponse, Handle); hFunc(TEvSysView::TEvGetQueryMetricsRequest, Handle); + hFunc(TEvSysView::TEvSendTopPartitions, Handle); + hFunc(TEvSysView::TEvGetTopPartitionsRequest, Handle); hFunc(TEvSysView::TEvSendDbCountersRequest, Handle); hFunc(TEvPrivate::TEvApplyCounters, Handle); hFunc(TEvPrivate::TEvSendNavigate, Handle); @@ -261,10 +289,6 @@ private: std::unordered_set<TNodeId> SummaryNodes; // IntervalMetrics - struct TQueryToMetrics { - NKikimrSysView::TQueryMetrics Metrics; - TString Text; - }; std::unordered_map<TQueryHash, TQueryToMetrics> QueryMetrics; // NodesToRequest @@ -282,14 +306,14 @@ private: std::unordered_map<TNodeId, TNodeToQueries> NodesInFlight; // IntervalTops - TTop ByDurationMinute; - TTop ByReadBytesMinute; - TTop ByCpuTimeMinute; - TTop ByRequestUnitsMinute; - TTop ByDurationHour; - TTop ByReadBytesHour; - TTop ByCpuTimeHour; - TTop ByRequestUnitsHour; + TQueryTop ByDurationMinute; + TQueryTop ByReadBytesMinute; + TQueryTop ByCpuTimeMinute; + TQueryTop ByRequestUnitsMinute; + TQueryTop ByDurationHour; + TQueryTop ByReadBytesHour; + TQueryTop ByCpuTimeHour; + TQueryTop ByRequestUnitsHour; // Metrics... using TResultMetricsMap = TResultMap<TQueryToMetrics>; @@ -307,9 +331,22 @@ private: TResultStatsMap TopByRequestUnitsOneMinute; TResultStatsMap TopByRequestUnitsOneHour; + // IntervalPartitionTops + TPartitionTop PartitionTopMinute; + TPartitionTop PartitionTopHour; + + // TopPartitions... + TResultPartitionsMap TopPartitionsOneMinute; + TResultPartitionsMap TopPartitionsOneHour; + // limited queue of user requests static constexpr size_t PendingRequestsLimit = 5; - std::queue<TEvSysView::TEvGetQueryMetricsRequest::TPtr> PendingRequests; + + using TVariantRequestPtr = std::variant< + TEvSysView::TEvGetQueryMetricsRequest::TPtr, + TEvSysView::TEvGetTopPartitionsRequest::TPtr>; + + std::queue<TVariantRequestPtr> PendingRequests; bool ProcessInFly = false; // db counters diff --git a/ydb/core/sys_view/processor/schema.h b/ydb/core/sys_view/processor/schema.h index 159ad84f58..e1684b3aaa 100644 --- a/ydb/core/sys_view/processor/schema.h +++ b/ydb/core/sys_view/processor/schema.h @@ -60,7 +60,7 @@ struct TProcessorSchema : NIceDb::Schema { ByDuration, ByReadBytes, ByCpuTime, ByRequestUnits>; }; -#define RESULT_TABLE(TableName, TableID) \ +#define RESULT_QUERY_TABLE(TableName, TableID) \ struct TableName : Table<TableID> { \ struct IntervalEnd : Column<1, NScheme::NTypeIds::Timestamp> {}; \ struct Rank : Column<2, NScheme::NTypeIds::Uint32> {}; \ @@ -71,18 +71,44 @@ struct TProcessorSchema : NIceDb::Schema { using TColumns = TableColumns<IntervalEnd, Rank, Text, Data>; \ }; - RESULT_TABLE(MetricsOneMinute, 6) - RESULT_TABLE(MetricsOneHour, 7) - RESULT_TABLE(TopByDurationOneMinute, 8) - RESULT_TABLE(TopByDurationOneHour, 9) - RESULT_TABLE(TopByReadBytesOneMinute, 10) - RESULT_TABLE(TopByReadBytesOneHour, 11) - RESULT_TABLE(TopByCpuTimeOneMinute, 12) - RESULT_TABLE(TopByCpuTimeOneHour, 13) - RESULT_TABLE(TopByRequestUnitsOneMinute, 14) - RESULT_TABLE(TopByRequestUnitsOneHour, 15) + RESULT_QUERY_TABLE(MetricsOneMinute, 6) + RESULT_QUERY_TABLE(MetricsOneHour, 7) + RESULT_QUERY_TABLE(TopByDurationOneMinute, 8) + RESULT_QUERY_TABLE(TopByDurationOneHour, 9) + RESULT_QUERY_TABLE(TopByReadBytesOneMinute, 10) + RESULT_QUERY_TABLE(TopByReadBytesOneHour, 11) + RESULT_QUERY_TABLE(TopByCpuTimeOneMinute, 12) + RESULT_QUERY_TABLE(TopByCpuTimeOneHour, 13) + RESULT_QUERY_TABLE(TopByRequestUnitsOneMinute, 14) + RESULT_QUERY_TABLE(TopByRequestUnitsOneHour, 15) + +#undef RESULT_QUERY_TABLE + + struct IntervalPartitionTops : Table<16> { + struct TypeCol : Column<1, NScheme::NTypeIds::Uint32> { + static TString GetColumnName(const TString&) { return "Type"; } + }; + struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {}; + struct Data : Column<3, NScheme::NTypeIds::String> {}; + + using TKey = TableKey<TypeCol, TabletId>; + using TColumns = TableColumns<TypeCol, TabletId, Data>; + }; + +#define RESULT_PARTITION_TABLE(TableName, TableID) \ + struct TableName : Table<TableID> { \ + struct IntervalEnd : Column<1, NScheme::NTypeIds::Timestamp> {}; \ + struct Rank : Column<2, NScheme::NTypeIds::Uint32> {}; \ + struct Data : Column<3, NScheme::NTypeIds::String> {}; \ + \ + using TKey = TableKey<IntervalEnd, Rank>; \ + using TColumns = TableColumns<IntervalEnd, Rank, Data>; \ + }; + + RESULT_PARTITION_TABLE(TopPartitionsOneMinute, 17) + RESULT_PARTITION_TABLE(TopPartitionsOneHour, 18) -#undef RESULT_TABLE +#undef RESULT_PARTITION_TABLE using TTables = SchemaTables< SysParams, @@ -99,7 +125,10 @@ struct TProcessorSchema : NIceDb::Schema { TopByCpuTimeOneMinute, TopByCpuTimeOneHour, TopByRequestUnitsOneMinute, - TopByRequestUnitsOneHour + TopByRequestUnitsOneHour, + IntervalPartitionTops, + TopPartitionsOneMinute, + TopPartitionsOneHour >; using TSettings = SchemaSettings< diff --git a/ydb/core/sys_view/processor/tx_aggregate.cpp b/ydb/core/sys_view/processor/tx_aggregate.cpp index 654b47a689..ef00be464c 100644 --- a/ydb/core/sys_view/processor/tx_aggregate.cpp +++ b/ydb/core/sys_view/processor/tx_aggregate.cpp @@ -97,7 +97,7 @@ struct TSysViewProcessor::TTxAggregate : public TTxBase { Self->ClearIntervalSummaries(db); if (Self->NodesToRequest.empty()) { - Self->PersistResults(db); + Self->PersistQueryResults(db); } Self->CurrentStage = AGGREGATE; diff --git a/ydb/core/sys_view/processor/tx_collect.cpp b/ydb/core/sys_view/processor/tx_collect.cpp index 67a197c366..d04cc5edb0 100644 --- a/ydb/core/sys_view/processor/tx_collect.cpp +++ b/ydb/core/sys_view/processor/tx_collect.cpp @@ -16,8 +16,9 @@ struct TSysViewProcessor::TTxCollect : public TTxBase { NIceDb::TNiceDb db(txc.DB); if (!Self->NodesInFlight.empty() || !Self->NodesToRequest.empty()) { - Self->PersistResults(db); + Self->PersistQueryResults(db); } + Self->PersistPartitionResults(db); Self->Reset(db, ctx); diff --git a/ydb/core/sys_view/processor/tx_init.cpp b/ydb/core/sys_view/processor/tx_init.cpp index 89c7fd637f..71a5a13581 100644 --- a/ydb/core/sys_view/processor/tx_init.cpp +++ b/ydb/core/sys_view/processor/tx_init.cpp @@ -10,8 +10,8 @@ struct TSysViewProcessor::TTxInit : public TTxBase { TTxType GetTxType() const override { return TXTYPE_INIT; } - template <typename TSchema, typename TEntry> - bool LoadResults(NIceDb::TNiceDb& db, TResultMap<TEntry>& results) { + template <typename TSchema, typename TMap> + bool LoadQueryResults(NIceDb::TNiceDb& db, TMap& results) { results.clear(); auto rowset = db.Table<TSchema>().Range().Select(); @@ -27,7 +27,8 @@ struct TSysViewProcessor::TTxInit : public TTxBase { auto key = std::make_pair(intervalEnd, rank); auto& result = results[key]; - if constexpr (std::is_same<TEntry, TQueryToMetrics>::value) { + + if constexpr (std::is_same<typename TMap::mapped_type, TQueryToMetrics>::value) { result.Text = std::move(text); if (data) { Y_PROTOBUF_SUPPRESS_NODISCARD result.Metrics.ParseFromString(data); @@ -46,11 +47,43 @@ struct TSysViewProcessor::TTxInit : public TTxBase { SVLOG_D("[" << Self->TabletID() << "] Loading results: " << "table# " << TSchema::TableId - << ", results count# " << results.size()); + << ", result count# " << results.size()); + + return true; + }; + + template <typename S> + bool LoadPartitionResults(NIceDb::TNiceDb& db, TSelf::TResultPartitionsMap& results) { + results.clear(); + auto rowset = db.Table<S>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + ui64 intervalEnd = rowset.template GetValue<typename S::IntervalEnd>(); + ui32 rank = rowset.template GetValue<typename S::Rank>(); + TString data = rowset.template GetValue<typename S::Data>(); + + auto key = std::make_pair(intervalEnd, rank); + auto& result = results[key]; + if (data) { + Y_PROTOBUF_SUPPRESS_NODISCARD result.ParseFromString(data); + } + + if (!rowset.Next()) { + return false; + } + } + + SVLOG_D("[" << Self->TabletID() << "] Loading results: " + << "table# " << S::TableId + << ", result count# " << results.size()); return true; }; + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { SVLOG_D("[" << Self->TabletID() << "] TTxInit::Execute"); @@ -72,6 +105,9 @@ struct TSysViewProcessor::TTxInit : public TTxBase { auto cpuTimeOneHourRowset = db.Table<Schema::TopByCpuTimeOneHour>().Range().Select(); auto reqUnitsOneMinuteRowset = db.Table<Schema::TopByRequestUnitsOneMinute>().Range().Select(); auto reqUnitsOneHourRowset = db.Table<Schema::TopByRequestUnitsOneHour>().Range().Select(); + auto intervalPartitionTopsRowset = db.Table<Schema::IntervalPartitionTops>().Range().Select(); + auto topPartitionsOneMinuteRowset = db.Table<Schema::TopPartitionsOneMinute>().Range().Select(); + auto topPartitionsOneHourRowset = db.Table<Schema::TopPartitionsOneHour>().Range().Select(); if (!sysParamsRowset.IsReady() || !intervalSummariesRowset.IsReady() || @@ -87,7 +123,10 @@ struct TSysViewProcessor::TTxInit : public TTxBase { !cpuTimeOneMinuteRowset.IsReady() || !cpuTimeOneHourRowset.IsReady() || !reqUnitsOneMinuteRowset.IsReady() || - !reqUnitsOneHourRowset.IsReady()) + !reqUnitsOneHourRowset.IsReady() || + !intervalPartitionTopsRowset.IsReady() || + !topPartitionsOneMinuteRowset.IsReady() || + !topPartitionsOneHourRowset.IsReady()) { return false; } @@ -251,7 +290,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase { Self->ByRequestUnitsHour.emplace_back(std::move(query)); break; default: - SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected stats type: " << type); + SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected query stats type: " << type); } ++queryCount; @@ -269,7 +308,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase { std::sort(Self->ByRequestUnitsMinute.begin(), Self->ByRequestUnitsMinute.end(), TopQueryCompare); std::sort(Self->ByRequestUnitsHour.begin(), Self->ByRequestUnitsHour.end(), TopQueryCompare); - SVLOG_D("[" << Self->TabletID() << "] Loading interval tops: " + SVLOG_D("[" << Self->TabletID() << "] Loading interval query tops: " << "total query count# " << queryCount); } @@ -320,28 +359,84 @@ struct TSysViewProcessor::TTxInit : public TTxBase { } // Metrics... - if (!LoadResults<Schema::MetricsOneMinute, TQueryToMetrics>(db, Self->MetricsOneMinute)) + if (!LoadQueryResults<Schema::MetricsOneMinute>(db, Self->MetricsOneMinute)) return false; - if (!LoadResults<Schema::MetricsOneHour, TQueryToMetrics>(db, Self->MetricsOneHour)) + if (!LoadQueryResults<Schema::MetricsOneHour>(db, Self->MetricsOneHour)) return false; // TopBy... - using TStats = NKikimrSysView::TQueryStats; - if (!LoadResults<Schema::TopByDurationOneMinute, TStats>(db, Self->TopByDurationOneMinute)) + if (!LoadQueryResults<Schema::TopByDurationOneMinute>(db, Self->TopByDurationOneMinute)) + return false; + if (!LoadQueryResults<Schema::TopByDurationOneHour>(db, Self->TopByDurationOneHour)) return false; - if (!LoadResults<Schema::TopByDurationOneHour, TStats>(db, Self->TopByDurationOneHour)) + if (!LoadQueryResults<Schema::TopByReadBytesOneMinute>(db, Self->TopByReadBytesOneMinute)) return false; - if (!LoadResults<Schema::TopByReadBytesOneMinute, TStats>(db, Self->TopByReadBytesOneMinute)) + if (!LoadQueryResults<Schema::TopByReadBytesOneHour>(db, Self->TopByReadBytesOneHour)) return false; - if (!LoadResults<Schema::TopByReadBytesOneHour, TStats>(db, Self->TopByReadBytesOneHour)) + if (!LoadQueryResults<Schema::TopByCpuTimeOneMinute>(db, Self->TopByCpuTimeOneMinute)) return false; - if (!LoadResults<Schema::TopByCpuTimeOneMinute, TStats>(db, Self->TopByCpuTimeOneMinute)) + if (!LoadQueryResults<Schema::TopByCpuTimeOneHour>(db, Self->TopByCpuTimeOneHour)) return false; - if (!LoadResults<Schema::TopByCpuTimeOneHour, TStats>(db, Self->TopByCpuTimeOneHour)) + if (!LoadQueryResults<Schema::TopByRequestUnitsOneMinute>(db, Self->TopByRequestUnitsOneMinute)) return false; - if (!LoadResults<Schema::TopByRequestUnitsOneMinute, TStats>(db, Self->TopByRequestUnitsOneMinute)) + if (!LoadQueryResults<Schema::TopByRequestUnitsOneHour>(db, Self->TopByRequestUnitsOneHour)) + return false; + + // IntervalPartitionTops + { + Self->PartitionTopMinute.clear(); + Self->PartitionTopMinute.reserve(TOP_PARTITIONS_COUNT); + Self->PartitionTopHour.clear(); + Self->PartitionTopHour.reserve(TOP_PARTITIONS_COUNT); + + auto rowset = db.Table<Schema::IntervalPartitionTops>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + size_t partCount = 0; + while (!rowset.EndOfSet()) { + ui32 type = rowset.GetValue<Schema::IntervalPartitionTops::TypeCol>(); + TString data = rowset.GetValue<Schema::IntervalPartitionTops::Data>(); + + if (data) { + auto partition = MakeHolder<NKikimrSysView::TTopPartitionsInfo>(); + Y_PROTOBUF_SUPPRESS_NODISCARD partition->ParseFromString(data); + + switch ((NKikimrSysView::EStatsType)type) { + case NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE: + Self->PartitionTopMinute.emplace_back(std::move(partition)); + break; + case NKikimrSysView::TOP_PARTITIONS_ONE_HOUR: + Self->PartitionTopHour.emplace_back(std::move(partition)); + break; + default: + SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected partition stats type: " << type); + } + ++partCount; + } + + if (!rowset.Next()) { + return false; + } + } + + auto compare = [] (const auto& l, const auto& r) { + return l->GetCPUCores() == r->GetCPUCores() ? + l->GetTabletId() < r->GetTabletId() : l->GetCPUCores() > r->GetCPUCores(); + }; + + std::sort(Self->PartitionTopMinute.begin(), Self->PartitionTopMinute.end(), compare); + std::sort(Self->PartitionTopHour.begin(), Self->PartitionTopHour.end(), compare); + + SVLOG_D("[" << Self->TabletID() << "] Loading interval partition tops: " + << "partition count# " << partCount); + } + + // TopPartitions... + if (!LoadPartitionResults<Schema::TopPartitionsOneMinute>(db, Self->TopPartitionsOneMinute)) return false; - if (!LoadResults<Schema::TopByRequestUnitsOneHour, TStats>(db, Self->TopByRequestUnitsOneHour)) + if (!LoadPartitionResults<Schema::TopPartitionsOneHour>(db, Self->TopPartitionsOneHour)) return false; auto deadline = Self->IntervalEnd + Self->TotalInterval; diff --git a/ydb/core/sys_view/processor/tx_init_schema.cpp b/ydb/core/sys_view/processor/tx_init_schema.cpp index e0485d741f..8a1df28d68 100644 --- a/ydb/core/sys_view/processor/tx_init_schema.cpp +++ b/ydb/core/sys_view/processor/tx_init_schema.cpp @@ -25,7 +25,9 @@ struct TSysViewProcessor::TTxInitSchema : public TTxBase { Schema::TopByCpuTimeOneMinute::TableId, Schema::TopByCpuTimeOneHour::TableId, Schema::TopByRequestUnitsOneMinute::TableId, - Schema::TopByRequestUnitsOneHour::TableId + Schema::TopByRequestUnitsOneHour::TableId, + Schema::TopPartitionsOneMinute::TableId, + Schema::TopPartitionsOneHour::TableId }; for (auto id : resultTableIds) { diff --git a/ydb/core/sys_view/processor/tx_interval_metrics.cpp b/ydb/core/sys_view/processor/tx_interval_metrics.cpp index 4c6d992487..41cd7eeb5a 100644 --- a/ydb/core/sys_view/processor/tx_interval_metrics.cpp +++ b/ydb/core/sys_view/processor/tx_interval_metrics.cpp @@ -52,7 +52,7 @@ struct TSysViewProcessor::TTxIntervalMetrics : public TTxBase { NIceDb::TUpdate<Schema::IntervalMetrics::Metrics>(serialized)); } - auto fillTops = [&] (TTop& minuteTop, TTop& hourTop, + auto fillTops = [&] (TQueryTop& minuteTop, TQueryTop& hourTop, NKikimrSysView::EStatsType minuteType, NKikimrSysView::EStatsType hourType, const NProtoBuf::RepeatedPtrField<NKikimrSysView::TQueryStats>& queryStats) { @@ -102,14 +102,16 @@ struct TSysViewProcessor::TTxIntervalMetrics : public TTxBase { db.Table<Schema::NodesToRequest>().Key(NodeId).Delete(); if (Self->NodesInFlight.empty() && Self->NodesToRequest.empty()) { - Self->PersistResults(db); - } else { - Self->SendRequests(); + Self->PersistQueryResults(db); } return true; } void Complete(const TActorContext&) override { + if (!Self->NodesToRequest.empty()) { + Self->SendRequests(); + } + SVLOG_D("[" << Self->TabletID() << "] TTxIntervalMetrics::Complete"); } }; diff --git a/ydb/core/sys_view/processor/tx_interval_summary.cpp b/ydb/core/sys_view/processor/tx_interval_summary.cpp index 35fe2789e1..6a89374eb8 100644 --- a/ydb/core/sys_view/processor/tx_interval_summary.cpp +++ b/ydb/core/sys_view/processor/tx_interval_summary.cpp @@ -68,9 +68,9 @@ struct TSysViewProcessor::TTxIntervalSummary : public TTxBase { TNodeId nodeId, NKikimrSysView::EStatsType statsType, const NKikimrSysView::TEvIntervalQuerySummary::TQuerySet& queries, - TTop& top) + TQueryTop& top) { - TTop result; + TQueryTop result; std::unordered_set<TQueryHash> seenHashes; size_t queryIndex = 0; auto topIt = top.begin(); diff --git a/ydb/core/sys_view/processor/tx_top_partitions.cpp b/ydb/core/sys_view/processor/tx_top_partitions.cpp new file mode 100644 index 0000000000..2048e40e83 --- /dev/null +++ b/ydb/core/sys_view/processor/tx_top_partitions.cpp @@ -0,0 +1,121 @@ +#include "processor_impl.h" + +namespace NKikimr::NSysView { + +struct TSysViewProcessor::TTxTopPartitions : public TTxBase { + NKikimrSysView::TEvSendTopPartitions Record; + + TTxTopPartitions(TSelf* self, NKikimrSysView::TEvSendTopPartitions&& record) + : TTxBase(self) + , Record(std::move(record)) + {} + + TTxType GetTxType() const override { return TXTYPE_TOP_PARTITIONS; } + + void ProcessTop(NIceDb::TNiceDb& db, NKikimrSysView::EStatsType statsType, + TPartitionTop& top) + { + TPartitionTop result; + result.reserve(TOP_PARTITIONS_COUNT); + std::unordered_set<ui64> seen; + size_t index = 0; + auto topIt = top.begin(); + + auto copyNewPartition = [&] () { + const auto& newPartition = Record.GetPartitions(index); + auto tabletId = newPartition.GetTabletId(); + + TString data; + Y_PROTOBUF_SUPPRESS_NODISCARD newPartition.SerializeToString(&data); + + auto partition = MakeHolder<NKikimrSysView::TTopPartitionsInfo>(); + partition->CopyFrom(newPartition); + result.emplace_back(std::move(partition)); + + db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, tabletId).Update( + NIceDb::TUpdate<Schema::IntervalPartitionTops::Data>(data)); + + seen.insert(tabletId); + ++index; + }; + + while (result.size() < TOP_PARTITIONS_COUNT) { + if (topIt == top.end()) { + if (index == Record.PartitionsSize()) { + break; + } + auto tabletId = Record.GetPartitions(index).GetTabletId(); + if (seen.find(tabletId) != seen.end()) { + ++index; + continue; + } + copyNewPartition(); + } else { + auto topTabletId = (*topIt)->GetTabletId(); + if (seen.find(topTabletId) != seen.end()) { + ++topIt; + continue; + } + if (index == Record.PartitionsSize()) { + result.emplace_back(std::move(*topIt++)); + seen.insert(topTabletId); + continue; + } + auto& newPartition = Record.GetPartitions(index); + auto tabletId = newPartition.GetTabletId(); + if (seen.find(tabletId) != seen.end()) { + ++index; + continue; + } + if ((*topIt)->GetCPUCores() >= newPartition.GetCPUCores()) { + result.emplace_back(std::move(*topIt++)); + seen.insert(topTabletId); + } else { + copyNewPartition(); + } + } + } + + for (; topIt != top.end(); ++topIt) { + auto topTabletId = (*topIt)->GetTabletId(); + if (seen.find(topTabletId) != seen.end()) { + continue; + } + db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, topTabletId).Delete(); + } + + top.swap(result); + } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + SVLOG_D("[" << Self->TabletID() << "] TTxTopPartitions::Execute: " + << "partition count# " << Record.PartitionsSize()); + + NIceDb::TNiceDb db(txc.DB); + ProcessTop(db, NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE, Self->PartitionTopMinute); + ProcessTop(db, NKikimrSysView::TOP_PARTITIONS_ONE_HOUR, Self->PartitionTopHour); + + return true; + } + + void Complete(const TActorContext&) override { + SVLOG_D("[" << Self->TabletID() << "] TTxTopPartitions::Complete"); + } +}; + +void TSysViewProcessor::Handle(TEvSysView::TEvSendTopPartitions::TPtr& ev) { + auto& record = ev->Get()->Record; + auto timeUs = record.GetTimeUs(); + auto partitionIntervalEnd = IntervalEnd + TotalInterval; + + if (timeUs < IntervalEnd.MicroSeconds() || timeUs >= partitionIntervalEnd.MicroSeconds()) { + SVLOG_W("[" << TabletID() << "] TEvSendTopPartitions, time mismath: " + << ", partition interval end# " << partitionIntervalEnd + << ", event time# " << TInstant::MicroSeconds(timeUs)); + return; + } + + Execute(new TTxTopPartitions(this, std::move(record)), TActivationContext::AsActorContext()); +} + +} // NKikimr::NSysView diff --git a/ydb/core/sys_view/processor/ya.make b/ydb/core/sys_view/processor/ya.make index ddf4664cfa..d98787c9b2 100644 --- a/ydb/core/sys_view/processor/ya.make +++ b/ydb/core/sys_view/processor/ya.make @@ -20,6 +20,7 @@ SRCS( tx_aggregate.cpp tx_interval_summary.cpp tx_interval_metrics.cpp + tx_top_partitions.cpp ) PEERDIR( diff --git a/ydb/core/sys_view/query_stats/query_metrics.cpp b/ydb/core/sys_view/query_stats/query_metrics.cpp index 5f2acb5c23..d303ea3978 100644 --- a/ydb/core/sys_view/query_stats/query_metrics.cpp +++ b/ydb/core/sys_view/query_stats/query_metrics.cpp @@ -1,17 +1,9 @@ #include "query_metrics.h" -#include <ydb/core/sys_view/common/common.h> #include <ydb/core/sys_view/common/events.h> -#include <ydb/core/sys_view/common/keys.h> -#include <ydb/core/sys_view/common/schema.h> -#include <ydb/core/sys_view/common/scan_actor_base_impl.h> +#include <ydb/core/sys_view/common/processor_scan.h> -#include <ydb/core/base/tablet_pipecache.h> - -#include <library/cpp/actors/core/hfunc.h> - -namespace NKikimr { -namespace NSysView { +namespace NKikimr::NSysView { using namespace NActors; @@ -25,149 +17,66 @@ void SetField<1>(NKikimrSysView::TQueryMetricsKey& key, ui32 value) { key.SetRank(value); } -class TQueryMetricsScan : public TScanActorBase<TQueryMetricsScan> { -public: - using TBase = TScanActorBase<TQueryMetricsScan>; - - static constexpr auto ActorActivityType() { - return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN; - } - - TQueryMetricsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, - const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns) - : TBase(ownerId, scanId, tableId, tableRange, columns) - { - ConvertKeyRange<NKikimrSysView::TEvGetQueryMetricsRequest, ui64, ui32>(Request, TableRange); - Request.SetType(NKikimrSysView::METRICS_ONE_MINUTE); - } - - STFUNC(StateScan) { - switch (ev->GetTypeRewrite()) { - hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); - hFunc(TEvSysView::TEvGetQueryMetricsResponse, Handle); - hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); - hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution); - cFunc(TEvents::TEvWakeup::EventType, HandleTimeout); - cFunc(TEvents::TEvPoison::EventType, PassAway); - default: - LOG_CRIT(ctx, NKikimrServices::SYSTEM_VIEWS, - "NSysView::TQueryMetricsScan: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); - } - } - -private: - void ProceedToScan() override { - Become(&TThis::StateScan); - if (AckReceived) { - RequestBatch(); - } - } - - void RequestBatch() { - if (!SysViewProcessorId) { - SVLOG_W("No sysview processor for database " << TenantName - << ", sending empty response"); - ReplyEmptyAndDie(); - return; - } - - if (BatchRequestInFlight) { - return; - } - - auto request = MakeHolder<TEvSysView::TEvGetQueryMetricsRequest>(); - request->Record.CopyFrom(Request); - - Send(MakePipePeNodeCacheID(false), - new TEvPipeCache::TEvForward(request.Release(), SysViewProcessorId, true), - IEventHandle::FlagTrackDelivery); - - BatchRequestInFlight = true; - } - - void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) { - RequestBatch(); - } - - void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) { - ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, "Delivery problem in query metrics scan"); - } - - void Handle(TEvSysView::TEvGetQueryMetricsResponse::TPtr& ev) { - using TEntry = NKikimrSysView::TQueryMetricsEntry; - using TExtractor = std::function<TCell(const TEntry&)>; - using TSchema = Schema::QueryMetrics; - - struct TExtractorsMap : public THashMap<NTable::TTag, TExtractor> { - TExtractorsMap() { - insert({TSchema::IntervalEnd::ColumnId, [] (const TEntry& entry) { - return TCell::Make<ui64>(entry.GetKey().GetIntervalEndUs()); - }}); - insert({TSchema::Rank::ColumnId, [] (const TEntry& entry) { - return TCell::Make<ui32>(entry.GetKey().GetRank()); - }}); - insert({TSchema::QueryText::ColumnId, [] (const TEntry& entry) { - const auto& text = entry.GetQueryText(); - return TCell(text.data(), text.size()); - }}); - insert({TSchema::Count::ColumnId, [] (const TEntry& entry) { - return TCell::Make<ui64>(entry.GetMetrics().GetCount()); - }}); - -#define ADD_METRICS(FieldName, MetricsName) \ - insert({TSchema::Sum ## FieldName::ColumnId, [] (const TEntry& entry) { \ - return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetSum()); \ - }}); \ - insert({TSchema::Min ## FieldName::ColumnId, [] (const TEntry& entry) { \ - return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetMin()); \ - }}); \ - insert({TSchema::Max ## FieldName::ColumnId, [] (const TEntry& entry) { \ - return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetMax()); \ - }}); - - ADD_METRICS(CPUTime, CpuTimeUs); - ADD_METRICS(Duration, DurationUs); - ADD_METRICS(ReadRows, ReadRows); - ADD_METRICS(ReadBytes, ReadBytes); - ADD_METRICS(UpdateRows, UpdateRows); - ADD_METRICS(UpdateBytes, UpdateBytes); - ADD_METRICS(DeleteRows, DeleteRows); - ADD_METRICS(RequestUnits, RequestUnits); +struct TQueryMetricsExtractorsMap : + public std::unordered_map<NTable::TTag, TExtractorFunc<NKikimrSysView::TQueryMetricsEntry>> +{ + using S = Schema::QueryMetrics; + using E = NKikimrSysView::TQueryMetricsEntry; + + TQueryMetricsExtractorsMap() { + insert({S::IntervalEnd::ColumnId, [] (const E& entry) { + return TCell::Make<ui64>(entry.GetKey().GetIntervalEndUs()); + }}); + insert({S::Rank::ColumnId, [] (const E& entry) { + return TCell::Make<ui32>(entry.GetKey().GetRank()); + }}); + insert({S::QueryText::ColumnId, [] (const E& entry) { + const auto& text = entry.GetQueryText(); + return TCell(text.data(), text.size()); + }}); + insert({S::Count::ColumnId, [] (const E& entry) { + return TCell::Make<ui64>(entry.GetMetrics().GetCount()); + }}); + +#define ADD_METRICS(FieldName, MetricsName) \ + insert({S::Sum ## FieldName::ColumnId, [] (const E& entry) { \ + return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetSum()); \ + }}); \ + insert({S::Min ## FieldName::ColumnId, [] (const E& entry) { \ + return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetMin()); \ + }}); \ + insert({S::Max ## FieldName::ColumnId, [] (const E& entry) { \ + return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetMax()); \ + }}); + + ADD_METRICS(CPUTime, CpuTimeUs); + ADD_METRICS(Duration, DurationUs); + ADD_METRICS(ReadRows, ReadRows); + ADD_METRICS(ReadBytes, ReadBytes); + ADD_METRICS(UpdateRows, UpdateRows); + ADD_METRICS(UpdateBytes, UpdateBytes); + ADD_METRICS(DeleteRows, DeleteRows); + ADD_METRICS(RequestUnits, RequestUnits); #undef ADD_METRICS - } - }; - - const auto& record = ev->Get()->Record; - if (record.HasOverloaded() && record.GetOverloaded()) { - ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, "SysViewProcessor is overloaded"); - return; - } - - ReplyBatch<TEvSysView::TEvGetQueryMetricsResponse, TEntry, TExtractorsMap, true>(ev); - - if (!record.GetLastBatch()) { - Y_VERIFY(record.HasNext()); - Request.MutableFrom()->CopyFrom(record.GetNext()); - Request.SetInclusiveFrom(true); - } - - BatchRequestInFlight = false; - } - - void PassAway() override { - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); - TBase::PassAway(); } - -private: - NKikimrSysView::TEvGetQueryMetricsRequest Request; }; THolder<IActor> CreateQueryMetricsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns) { - return MakeHolder<TQueryMetricsScan>(ownerId, scanId, tableId, tableRange, columns); + using TQueryMetricsScan = TProcessorScan< + NKikimrSysView::TQueryMetricsEntry, + NKikimrSysView::TEvGetQueryMetricsRequest, + NKikimrSysView::TEvGetQueryMetricsResponse, + TEvSysView::TEvGetQueryMetricsRequest, + TEvSysView::TEvGetQueryMetricsResponse, + TQueryMetricsExtractorsMap, + ui64, + ui32 + >; + + return MakeHolder<TQueryMetricsScan>(ownerId, scanId, tableId, tableRange, columns, + NKikimrSysView::METRICS_ONE_MINUTE); } -} // NSysView -} // NKikimr +} // NKikimr::NSysView diff --git a/ydb/core/sys_view/query_stats/query_stats.cpp b/ydb/core/sys_view/query_stats/query_stats.cpp index 918c9b781e..99134412fd 100644 --- a/ydb/core/sys_view/query_stats/query_stats.cpp +++ b/ydb/core/sys_view/query_stats/query_stats.cpp @@ -506,9 +506,11 @@ private: NKikimrSysView::TEvGetQueryMetricsRequest Request; }; -THolder<IActor> CreateQueryStatsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, const TStringBuf viewName, +THolder<IActor> CreateQueryStatsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns) { + auto viewName = tableId.SysViewInfo; + if (viewName == TopQueriesByDuration1MinuteName) { return MakeHolder<TQueryStatsScan<TDurationGreater>>(ownerId, scanId, tableId, tableRange, columns, NKikimrSysView::TOP_DURATION_ONE_MINUTE, diff --git a/ydb/core/sys_view/query_stats/query_stats.h b/ydb/core/sys_view/query_stats/query_stats.h index b9fe4b7ecc..a7a76b9b57 100644 --- a/ydb/core/sys_view/query_stats/query_stats.h +++ b/ydb/core/sys_view/query_stats/query_stats.h @@ -17,8 +17,7 @@ struct TQueryStatsBucketRange { explicit TQueryStatsBucketRange(const TSerializedTableRange& range, const TDuration& bucketSize); }; -THolder<IActor> CreateQueryStatsScan(const TActorId& ownerId, ui32 scanId, - const TTableId& tableId, const TStringBuf viewName, +THolder<IActor> CreateQueryStatsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns); } // NSysView diff --git a/ydb/core/sys_view/scan.cpp b/ydb/core/sys_view/scan.cpp index f337cb6ba7..9a2489607e 100644 --- a/ydb/core/sys_view/scan.cpp +++ b/ydb/core/sys_view/scan.cpp @@ -11,6 +11,7 @@ #include <ydb/core/sys_view/storage/storage_pools.h> #include <ydb/core/sys_view/storage/storage_stats.h> #include <ydb/core/sys_view/tablets/tablets.h> +#include <ydb/core/sys_view/partition_stats/top_partitions.h> namespace NKikimr { namespace NSysView { @@ -35,7 +36,7 @@ THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const tableId.SysViewInfo == TopQueriesByRequestUnits1MinuteName || tableId.SysViewInfo == TopQueriesByRequestUnits1HourName) { - return CreateQueryStatsScan(ownerId, scanId, tableId, tableId.SysViewInfo, tableRange, columns); + return CreateQueryStatsScan(ownerId, scanId, tableId, tableRange, columns); } if (tableId.SysViewInfo == PDisksName) { @@ -66,7 +67,13 @@ THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const return CreateQueryMetricsScan(ownerId, scanId, tableId, tableRange, columns); } - return nullptr; + if (tableId.SysViewInfo == TopPartitions1MinuteName || + tableId.SysViewInfo == TopPartitions1HourName) + { + return CreateTopPartitionsScan(ownerId, scanId, tableId, tableRange, columns); + } + + return {}; } } // NSysView diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp index 525bc5787a..8f67963059 100644 --- a/ydb/core/sys_view/ut_common.cpp +++ b/ydb/core/sys_view/ut_common.cpp @@ -59,6 +59,11 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool Server = new Tests::TServer(*Settings); Server->EnableGRpc(grpcPort); + auto* runtime = Server->GetRuntime(); + for (ui32 i = 0; i < runtime->GetNodeCount(); ++i) { + runtime->GetAppData(i).UsePartitionStatsCollectorForTests = true; + } + Client = MakeHolder<Tests::TClient>(*Settings); Tenants = MakeHolder<Tests::TTenants>(Server); diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp index 56eb5af269..7a44050218 100644 --- a/ydb/core/sys_view/ut_kqp.cpp +++ b/ydb/core/sys_view/ut_kqp.cpp @@ -49,31 +49,30 @@ void CreateTenants(TTestEnv& env, bool extSchemeShard = true) { CreateTenant(env, "Tenant2", extSchemeShard); } -void CreateTables(TTestEnv& env) { +void CreateTable(auto& session, const TString& name, ui64 partitionCount = 1) { + auto desc = TTableBuilder() + .AddNullableColumn("Key", EPrimitiveType::Uint64) + .AddNullableColumn("Value", EPrimitiveType::String) + .SetPrimaryKeyColumns({"Key"}) + .Build(); + + auto settings = TCreateTableSettings(); + settings.PartitioningPolicy(TPartitioningPolicy().UniformPartitions(partitionCount)); + + session.CreateTable(name, std::move(desc), std::move(settings)).GetValueSync(); +} + +void CreateTables(TTestEnv& env, ui64 partitionCount = 1) { TTableClient client(env.GetDriver()); auto session = client.CreateSession().GetValueSync().GetSession(); - NKqp::AssertSuccessResult(session.ExecuteSchemeQuery(R"( - CREATE TABLE `Root/Table0` ( - Key Uint64, - Value String, - PRIMARY KEY (Key) - ); - )").GetValueSync()); - + CreateTable(session, "Root/Table0", partitionCount); NKqp::AssertSuccessResult(session.ExecuteDataQuery(R"( REPLACE INTO `Root/Table0` (Key, Value) VALUES (0u, "Z"); )", TTxControl::BeginTx().CommitTx()).GetValueSync()); - NKqp::AssertSuccessResult(session.ExecuteSchemeQuery(R"( - CREATE TABLE `Root/Tenant1/Table1` ( - Key Uint64, - Value String, - PRIMARY KEY (Key) - ); - )").GetValueSync()); - + CreateTable(session, "Root/Tenant1/Table1", partitionCount); NKqp::AssertSuccessResult(session.ExecuteDataQuery(R"( REPLACE INTO `Root/Tenant1/Table1` (Key, Value) VALUES (1u, "A"), @@ -81,14 +80,7 @@ void CreateTables(TTestEnv& env) { (3u, "C"); )", TTxControl::BeginTx().CommitTx()).GetValueSync()); - NKqp::AssertSuccessResult(session.ExecuteSchemeQuery(R"( - CREATE TABLE `Root/Tenant2/Table2` ( - Key Uint64, - Value String, - PRIMARY KEY (Key) - ); - )").GetValueSync()); - + CreateTable(session, "Root/Tenant2/Table2", partitionCount); NKqp::AssertSuccessResult(session.ExecuteDataQuery(R"( REPLACE INTO `Root/Tenant2/Table2` (Key, Value) VALUES (4u, "D"), @@ -96,9 +88,9 @@ void CreateTables(TTestEnv& env) { )", TTxControl::BeginTx().CommitTx()).GetValueSync()); } -void CreateTenantsAndTables(TTestEnv& env, bool extSchemeShard = true) { +void CreateTenantsAndTables(TTestEnv& env, bool extSchemeShard = true, ui64 partitionCount = 1) { CreateTenants(env, extSchemeShard); - CreateTables(env); + CreateTables(env, partitionCount); } void CreateRootTable(TTestEnv& env, ui64 partitionCount = 1) { @@ -1151,6 +1143,198 @@ Y_UNIT_TEST_SUITE(SystemView) { } } + size_t GetRowCount(TTableClient& client, const TString& name) { + TStringBuilder query; + query << "SELECT * FROM `" << name << "`"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto ysonString = NKqp::StreamResultToYson(it); + auto node = NYT::NodeFromYsonString(ysonString, ::NYson::EYsonType::Node); + UNIT_ASSERT(node.IsList()); + return node.AsList().size(); + } + + ui64 GetIntervalEnd(TTableClient& client, const TString& name) { + TStringBuilder query; + query << "SELECT MAX(IntervalEnd) FROM `" << name << "`"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto ysonString = NKqp::StreamResultToYson(it); + auto node = NYT::NodeFromYsonString(ysonString, ::NYson::EYsonType::Node); + UNIT_ASSERT(node.IsList()); + UNIT_ASSERT(node.AsList().size() == 1); + auto row = node.AsList()[0]; + UNIT_ASSERT(row.IsList()); + UNIT_ASSERT(row.AsList().size() == 1); + auto value = row.AsList()[0]; + UNIT_ASSERT(value.IsList()); + UNIT_ASSERT(value.AsList().size() == 1); + return value.AsList()[0].AsUint64(); + } + + Y_UNIT_TEST(TopPartitionsFields) { + NDataShard::gDbStatsReportInterval = TDuration::Seconds(0); + + auto nowUs = TInstant::Now().MicroSeconds(); + + TTestEnv env(1, 4, 0, true); + CreateTenantsAndTables(env); + + TTableClient client(env.GetDriver()); + size_t rowCount = 0; + for (size_t iter = 0; iter < 30 && !rowCount; ++iter) { + rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); + if (!rowCount) { + Sleep(TDuration::Seconds(1)); + } + } + ui64 intervalEnd = GetIntervalEnd(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); + + TStringBuilder query; + query << R"( + SELECT + IntervalEnd, + Rank, + TabletId, + Path, + PeakTime, + CPUCores, + NodeId, + DataSize, + RowCount, + IndexSize, + InFlightTxCount + FROM `/Root/Tenant1/.sys/top_partitions_one_minute`)" + << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp)"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto ysonString = NKqp::StreamResultToYson(it); + + TYsonFieldChecker check(ysonString, 11); + check.Uint64(intervalEnd); // IntervalEnd + check.Uint64(1); // Rank + check.Uint64Greater(0); // TabletId + check.String("/Root/Tenant1/Table1"); // Path + check.Uint64GreaterOrEquals(nowUs); // PeakTime + check.DoubleGreaterOrEquals(0.); // CPUCores + check.Uint64Greater(0); // NodeId + check.Uint64Greater(0); // DataSize + check.Uint64(3); // RowCount + check.Uint64(0); // IndexSize + check.Uint64(0); // InFlightTxCount + } + + Y_UNIT_TEST(TopPartitionsTables) { + NDataShard::gDbStatsReportInterval = TDuration::Seconds(0); + + constexpr ui64 partitionCount = 5; + + TTestEnv env(1, 4, 0, true); + CreateTenantsAndTables(env, true, partitionCount); + + TTableClient client(env.GetDriver()); + size_t rowCount = 0; + for (size_t iter = 0; iter < 30 && rowCount < partitionCount; ++iter) { + rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); + if (rowCount < partitionCount) { + Sleep(TDuration::Seconds(1)); + } + } + auto check = [&] (const TString& name) { + ui64 intervalEnd = GetIntervalEnd(client, name); + TStringBuilder query; + query << "SELECT Rank "; + query << "FROM `" << name << "` "; + query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) "; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + NKqp::CompareYson("[[[1u]];[[2u]];[[3u]];[[4u]];[[5u]]]", NKqp::StreamResultToYson(it)); + }; + check("/Root/Tenant1/.sys/top_partitions_one_minute"); + check("/Root/Tenant1/.sys/top_partitions_one_hour"); + } + + Y_UNIT_TEST(TopPartitionsRanges) { + NDataShard::gDbStatsReportInterval = TDuration::Seconds(0); + + constexpr ui64 partitionCount = 5; + + TTestEnv env(1, 4, 0, true); + CreateTenantsAndTables(env, true, partitionCount); + + TTableClient client(env.GetDriver()); + size_t rowCount = 0; + for (size_t iter = 0; iter < 30 && rowCount < partitionCount; ++iter) { + rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); + if (rowCount < partitionCount) { + Sleep(TDuration::Seconds(5)); + } + } + ui64 intervalEnd = GetIntervalEnd(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); + { + TStringBuilder query; + query << "SELECT IntervalEnd, Rank "; + query << "FROM `/Root/Tenant1/.sys/top_partitions_one_minute` "; + query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) "; + query << "AND Rank > 3u"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TStringBuilder result; + result << "["; + result << "[[" << intervalEnd << "u];[4u]];"; + result << "[[" << intervalEnd << "u];[5u]];"; + result << "]"; + NKqp::CompareYson(result, NKqp::StreamResultToYson(it)); + } + { + TStringBuilder query; + query << "SELECT IntervalEnd, Rank "; + query << "FROM `/Root/Tenant1/.sys/top_partitions_one_minute` "; + query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) "; + query << "AND Rank >= 3u"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TStringBuilder result; + result << "["; + result << "[[" << intervalEnd << "u];[3u]];"; + result << "[[" << intervalEnd << "u];[4u]];"; + result << "[[" << intervalEnd << "u];[5u]];"; + result << "]"; + NKqp::CompareYson(result, NKqp::StreamResultToYson(it)); + } + { + TStringBuilder query; + query << "SELECT IntervalEnd, Rank "; + query << "FROM `/Root/Tenant1/.sys/top_partitions_one_minute` "; + query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) "; + query << "AND Rank < 3u"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TStringBuilder result; + result << "["; + result << "[[" << intervalEnd << "u];[1u]];"; + result << "[[" << intervalEnd << "u];[2u]];"; + result << "]"; + NKqp::CompareYson(result, NKqp::StreamResultToYson(it)); + } + { + TStringBuilder query; + query << "SELECT IntervalEnd, Rank "; + query << "FROM `/Root/Tenant1/.sys/top_partitions_one_minute` "; + query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) "; + query << "AND Rank <= 3u"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TStringBuilder result; + result << "["; + result << "[[" << intervalEnd << "u];[1u]];"; + result << "[[" << intervalEnd << "u];[2u]];"; + result << "[[" << intervalEnd << "u];[3u]];"; + result << "]"; + NKqp::CompareYson(result, NKqp::StreamResultToYson(it)); + } + } + Y_UNIT_TEST(OldEngineSystemView) { TTestEnv env; CreateRootTable(env); @@ -1345,7 +1529,7 @@ Y_UNIT_TEST_SUITE(SystemView) { UNIT_ASSERT_VALUES_EQUAL(entry.Type, ESchemeEntryType::Directory); auto children = result.GetChildren(); - UNIT_ASSERT_VALUES_EQUAL(children.size(), 16); + UNIT_ASSERT_VALUES_EQUAL(children.size(), 18); THashSet<TString> names; for (const auto& child : children) { @@ -1363,7 +1547,7 @@ Y_UNIT_TEST_SUITE(SystemView) { UNIT_ASSERT_VALUES_EQUAL(entry.Type, ESchemeEntryType::Directory); auto children = result.GetChildren(); - UNIT_ASSERT_VALUES_EQUAL(children.size(), 10); + UNIT_ASSERT_VALUES_EQUAL(children.size(), 12); THashSet<TString> names; for (const auto& child : children) { diff --git a/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp b/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp index f4f5db4ba7..4d7f18456b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp @@ -451,7 +451,7 @@ struct TSchemeShard::TTxScheduleConditionalErase : public TTransactionBase<TSche LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxScheduleConditionalErase Complete" << ": at schemeshard: " << Self->TabletID()); - if (StatsCollectorEv) { + if (StatsCollectorEv && Self->SysPartitionStatsCollector) { ctx.Send(Self->SysPartitionStatsCollector, StatsCollectorEv.Release()); } diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index 458bb20af0..eea4325f98 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -23,7 +23,9 @@ static ui64 GetIops(const T& c) { } void TSchemeShard::Handle(NSysView::TEvSysView::TEvGetPartitionStats::TPtr& ev, const TActorContext& ctx) { - ctx.Send(ev->Forward(SysPartitionStatsCollector)); + if (SysPartitionStatsCollector) { + ctx.Send(ev->Forward(SysPartitionStatsCollector)); + } } auto TSchemeShard::BuildStatsForCollector(TPathId pathId, TShardIdx shardIdx, TTabletId datashardId, @@ -299,9 +301,11 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const startTime = rec.GetStartTime(); } - PendingMessages.emplace_back( - Self->SysPartitionStatsCollector, - Self->BuildStatsForCollector(pathId, shardIdx, datashardId, nodeId, startTime, newStats).Release()); + if (Self->SysPartitionStatsCollector) { + PendingMessages.emplace_back( + Self->SysPartitionStatsCollector, + Self->BuildStatsForCollector(pathId, shardIdx, datashardId, nodeId, startTime, newStats).Release()); + } } if (isOlapStore) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 9592b57f97..4b3532944f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -67,6 +67,10 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, LoginProvider.Audience = TPath::Init(subDomainPathId, this).PathString(); domainPtr->UpdateSecurityState(LoginProvider.GetSecurityState()); + TTabletId sysViewProcessorId = domainPtr->GetTenantSysViewProcessorID(); + SysPartitionStatsCollector = Register(NSysView::CreatePartitionStatsCollector( + GetDomainKey(subDomainPathId), sysViewProcessorId ? sysViewProcessorId.GetValue() : 0).Release()); + Execute(CreateTxInitPopulator(std::move(delayPublications)), ctx); if (tablesToClean) { @@ -3285,7 +3289,7 @@ void TSchemeShard::PersistRemoveTable(NIceDb::TNiceDb& db, TPathId pathId, const Tables.erase(pathId); DecrementPathDbRefCount(pathId, "remove table"); - if (AppData()->FeatureFlags.GetEnableSystemViews()) { + if (AppData()->FeatureFlags.GetEnableSystemViews() && SysPartitionStatsCollector) { auto ev = MakeHolder<NSysView::TEvSysView::TEvRemoveTable>(GetDomainKey(pathId), pathId); Send(SysPartitionStatsCollector, ev.Release()); } @@ -3826,8 +3830,6 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { TxAllocatorClient = RegisterWithSameMailbox(CreateTxAllocatorClient(CollectTxAllocators(appData))); - SysPartitionStatsCollector = Register(NSysView::CreatePartitionStatsCollector().Release()); - SplitSettings.Register(appData->Icb); Executor()->RegisterExternalTabletCounters(TabletCountersPtr); @@ -5834,7 +5836,9 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T auto path = TPath::Init(pathId, this); auto ev = MakeHolder<NSysView::TEvSysView::TEvSetPartitioning>(GetDomainKey(pathId), pathId, path.PathString()); ev->ShardIndices.swap(shardIndices); - Send(SysPartitionStatsCollector, ev.Release()); + if (SysPartitionStatsCollector) { + Send(SysPartitionStatsCollector, ev.Release()); + } } if (!tableInfo->IsBackup) { |