aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Dmitriev <alexd.65536@gmail.com>2022-07-05 21:00:40 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2022-07-05 21:00:40 +0300
commit014d4040c4b1be95e8260f823efcdae39cbbd439 (patch)
tree7d9d0629f782f1d3baabe5679bcdf495c7e6d0d1
parent3337ada763695431c2af0c1f1105b608bbcd8c01 (diff)
downloadydb-014d4040c4b1be95e8260f823efcdae39cbbd439.tar.gz
top partition tables KIKIMR-15061
REVIEW: 2671355 REVIEW: 2692125 x-ydb-stable-ref: be6405bab36a14ec899951f200cfd7f0e30ace03
-rw-r--r--ydb/core/base/appdata.h1
-rw-r--r--ydb/core/protos/counters_sysview_processor.proto1
-rw-r--r--ydb/core/protos/sys_view.proto50
-rw-r--r--ydb/core/sys_view/common/common.h2
-rw-r--r--ydb/core/sys_view/common/events.h22
-rw-r--r--ydb/core/sys_view/common/processor_scan.h124
-rw-r--r--ydb/core/sys_view/common/schema.cpp3
-rw-r--r--ydb/core/sys_view/common/schema.h31
-rw-r--r--ydb/core/sys_view/common/ya.make1
-rw-r--r--ydb/core/sys_view/partition_stats/partition_stats.cpp194
-rw-r--r--ydb/core/sys_view/partition_stats/partition_stats.h2
-rw-r--r--ydb/core/sys_view/partition_stats/partition_stats_ut.cpp14
-rw-r--r--ydb/core/sys_view/partition_stats/top_partitions.cpp91
-rw-r--r--ydb/core/sys_view/partition_stats/top_partitions.h11
-rw-r--r--ydb/core/sys_view/partition_stats/ya.make2
-rw-r--r--ydb/core/sys_view/processor/processor_impl.cpp275
-rw-r--r--ydb/core/sys_view/processor/processor_impl.h89
-rw-r--r--ydb/core/sys_view/processor/schema.h55
-rw-r--r--ydb/core/sys_view/processor/tx_aggregate.cpp2
-rw-r--r--ydb/core/sys_view/processor/tx_collect.cpp3
-rw-r--r--ydb/core/sys_view/processor/tx_init.cpp131
-rw-r--r--ydb/core/sys_view/processor/tx_init_schema.cpp4
-rw-r--r--ydb/core/sys_view/processor/tx_interval_metrics.cpp10
-rw-r--r--ydb/core/sys_view/processor/tx_interval_summary.cpp4
-rw-r--r--ydb/core/sys_view/processor/tx_top_partitions.cpp121
-rw-r--r--ydb/core/sys_view/processor/ya.make1
-rw-r--r--ydb/core/sys_view/query_stats/query_metrics.cpp203
-rw-r--r--ydb/core/sys_view/query_stats/query_stats.cpp4
-rw-r--r--ydb/core/sys_view/query_stats/query_stats.h3
-rw-r--r--ydb/core/sys_view/scan.cpp11
-rw-r--r--ydb/core/sys_view/ut_common.cpp5
-rw-r--r--ydb/core/sys_view/ut_kqp.cpp242
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__table_stats.cpp12
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp12
35 files changed, 1361 insertions, 377 deletions
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h
index ad781ba56c..3b336373c3 100644
--- a/ydb/core/base/appdata.h
+++ b/ydb/core/base/appdata.h
@@ -166,6 +166,7 @@ struct TAppData {
bool EnableKqpSpilling = false;
bool AllowShadowDataInSchemeShardForTests = false;
bool EnableMvccSnapshotWithLegacyDomainRoot = false;
+ bool UsePartitionStatsCollectorForTests = false;
TVector<TString> AdministrationAllowedSIDs; // users/groups which allowed to perform administrative tasks
TVector<TString> DefaultUserSIDs;
TString AllAuthenticatedUsers;
diff --git a/ydb/core/protos/counters_sysview_processor.proto b/ydb/core/protos/counters_sysview_processor.proto
index 63c03cb653..c2077a6d53 100644
--- a/ydb/core/protos/counters_sysview_processor.proto
+++ b/ydb/core/protos/counters_sysview_processor.proto
@@ -14,4 +14,5 @@ enum ETxTypes {
TXTYPE_AGGREGATE = 4 [(TxTypeOpts) = {Name: "TxAggregate"}];
TXTYPE_INTERVAL_SUMMARY = 5 [(TxTypeOpts) = {Name: "TxIntervalSummary"}];
TXTYPE_INTERVAL_METRICS = 6 [(TxTypeOpts) = {Name: "TxIntervalMetrics"}];
+ TXTYPE_TOP_PARTITIONS = 7 [(TxTypeOpts) = {Name: "TxTopPartitions"}];
}
diff --git a/ydb/core/protos/sys_view.proto b/ydb/core/protos/sys_view.proto
index 4d495d6b60..a3ff84b707 100644
--- a/ydb/core/protos/sys_view.proto
+++ b/ydb/core/protos/sys_view.proto
@@ -123,6 +123,8 @@ enum EStatsType {
METRICS_ONE_HOUR = 8;
TOP_REQUEST_UNITS_ONE_MINUTE = 9;
TOP_REQUEST_UNITS_ONE_HOUR = 10;
+ TOP_PARTITIONS_ONE_MINUTE = 11;
+ TOP_PARTITIONS_ONE_HOUR = 12;
}
message TEvGetQueryStats {
@@ -550,3 +552,51 @@ message TEvSendDbCountersResponse {
optional uint64 Generation = 2; // confirmed generation
}
+// ---- Top partitions
+
+message TTopPartitionsKey {
+ optional uint64 IntervalEndUs = 1;
+ optional uint64 Rank = 2;
+}
+
+message TTopPartitionsInfo {
+ optional fixed64 TabletId = 1;
+ optional string Path = 2;
+ optional uint64 PeakTimeUs = 3;
+ optional double CPUCores = 4;
+ optional uint32 NodeId = 5;
+ optional uint64 DataSize = 6;
+ optional uint64 RowCount = 7;
+ optional uint64 IndexSize = 8;
+ optional uint32 InFlightTxCount = 9;
+}
+
+message TTopPartitionsEntry {
+ optional TTopPartitionsKey Key = 1;
+ optional TTopPartitionsInfo Info = 2;
+}
+
+message TEvGetTopPartitionsRequest {
+ optional TTopPartitionsKey From = 1;
+ optional bool InclusiveFrom = 2;
+
+ optional TTopPartitionsKey To = 3;
+ optional bool InclusiveTo = 4;
+
+ optional EStatsType Type = 5;
+}
+
+message TEvGetTopPartitionsResponse {
+ repeated TTopPartitionsEntry Entries = 1;
+ optional TTopPartitionsKey Next = 2;
+ optional bool LastBatch = 3;
+ optional bool Overloaded = 4;
+}
+
+// partitions stats collector -> SVP
+message TEvSendTopPartitions {
+ repeated TTopPartitionsInfo Partitions = 1;
+ optional uint64 TimeUs = 2;
+}
+
+
diff --git a/ydb/core/sys_view/common/common.h b/ydb/core/sys_view/common/common.h
index c88107063a..65256b8b6d 100644
--- a/ydb/core/sys_view/common/common.h
+++ b/ydb/core/sys_view/common/common.h
@@ -28,5 +28,7 @@ enum class EProcessorMode {
FAST // fast mode for tests
};
+constexpr size_t TOP_PARTITIONS_COUNT = 10;
+
} // NSysView
} // NKikimr
diff --git a/ydb/core/sys_view/common/events.h b/ydb/core/sys_view/common/events.h
index df73e745f5..314511d184 100644
--- a/ydb/core/sys_view/common/events.h
+++ b/ydb/core/sys_view/common/events.h
@@ -66,6 +66,10 @@ struct TEvSysView {
EvGetStorageStatsRequest,
EvGetStorageStatsResponse,
+ EvSendTopPartitions,
+ EvGetTopPartitionsRequest,
+ EvGetTopPartitionsResponse,
+
EvEnd,
};
@@ -361,6 +365,24 @@ struct TEvSysView {
struct TEvGetStorageStatsResponse
: TEventPB<TEvGetStorageStatsResponse, NKikimrSysView::TEvGetStorageStatsResponse, EvGetStorageStatsResponse>
{};
+
+ struct TEvSendTopPartitions : public TEventPB<
+ TEvSendTopPartitions,
+ NKikimrSysView::TEvSendTopPartitions,
+ EvSendTopPartitions>
+ {};
+
+ struct TEvGetTopPartitionsRequest : public TEventPB<
+ TEvGetTopPartitionsRequest,
+ NKikimrSysView::TEvGetTopPartitionsRequest,
+ EvGetTopPartitionsRequest>
+ {};
+
+ struct TEvGetTopPartitionsResponse : public TEventPB<
+ TEvGetTopPartitionsResponse,
+ NKikimrSysView::TEvGetTopPartitionsResponse,
+ EvGetTopPartitionsResponse>
+ {};
};
} // NSysView
diff --git a/ydb/core/sys_view/common/processor_scan.h b/ydb/core/sys_view/common/processor_scan.h
new file mode 100644
index 0000000000..56aeb70a0a
--- /dev/null
+++ b/ydb/core/sys_view/common/processor_scan.h
@@ -0,0 +1,124 @@
+#include "common.h"
+#include "events.h"
+#include "keys.h"
+#include "schema.h"
+#include "scan_actor_base_impl.h"
+
+#include <ydb/core/base/tablet_pipecache.h>
+
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NKikimr::NSysView {
+
+using namespace NActors;
+
+template <typename TEntry, typename TRequest, typename TResponse,
+ typename TEvRequest, typename TEvResponse, typename TExtractorMap, typename... T>
+class TProcessorScan : public TScanActorBase<
+ TProcessorScan<TEntry, TRequest, TResponse, TEvRequest, TEvResponse, TExtractorMap, T...>>
+{
+public:
+ using TScan = TProcessorScan<TEntry, TRequest, TResponse, TEvRequest, TEvResponse, TExtractorMap, T...>;
+ using TBase = TScanActorBase<TScan>;
+
+ static constexpr auto ActorActivityType() {
+ return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN;
+ }
+
+ TProcessorScan(
+ const TActorId& ownerId,
+ ui32 scanId,
+ const TTableId& tableId,
+ const TTableRange& tableRange,
+ const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns,
+ NKikimrSysView::EStatsType type)
+ : TBase(ownerId, scanId, tableId, tableRange, columns)
+ {
+ ConvertKeyRange<TRequest, T...>(Request, this->TableRange);
+ Request.SetType(type);
+ }
+
+ STATEFN(StateScan) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle);
+ hFunc(TEvResponse, Handle);
+ hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ hFunc(NKqp::TEvKqp::TEvAbortExecution, this->HandleAbortExecution);
+ cFunc(TEvents::TEvWakeup::EventType, this->HandleTimeout);
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ default:
+ SVLOG_CRIT("NSysView::TProcessorScan: unexpected event# " << ev->GetTypeRewrite());
+ }
+ }
+
+private:
+ void ProceedToScan() override {
+ TBase::Become(&TScan::StateScan);
+ if (this->AckReceived) {
+ RequestBatch();
+ }
+ }
+
+ void RequestBatch() {
+ if (!this->SysViewProcessorId) {
+ SVLOG_W("NSysView::TProcessorScan: no sysview processor for database " << this->TenantName
+ << ", sending empty response");
+ this->ReplyEmptyAndDie();
+ return;
+ }
+
+ if (this->BatchRequestInFlight) {
+ return;
+ }
+
+ auto req = MakeHolder<TEvRequest>();
+ req->Record.CopyFrom(Request);
+
+ TBase::Send(MakePipePeNodeCacheID(false),
+ new TEvPipeCache::TEvForward(req.Release(), this->SysViewProcessorId, true),
+ IEventHandle::FlagTrackDelivery);
+
+ this->BatchRequestInFlight = true;
+ }
+
+ void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) {
+ RequestBatch();
+ }
+
+ void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) {
+ this->ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE,
+ "NSysView::TProcessorScan: delivery problem");
+ }
+
+ void Handle(typename TEvResponse::TPtr& ev) {
+ const auto& record = ev->Get()->Record;
+ if (record.HasOverloaded() && record.GetOverloaded()) {
+ this->ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE,
+ "NSysView::TProcessorScan: SysViewProcessor is overloaded");
+ return;
+ }
+
+ this->template ReplyBatch<TEvResponse, TEntry, TExtractorMap, true>(ev);
+
+ if (!record.GetLastBatch()) {
+ Y_VERIFY(record.HasNext());
+ Request.MutableFrom()->CopyFrom(record.GetNext());
+ Request.SetInclusiveFrom(true);
+ }
+
+ this->BatchRequestInFlight = false;
+ }
+
+ void PassAway() override {
+ TBase::Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
+ TBase::PassAway();
+ }
+
+private:
+ TRequest Request;
+};
+
+template <typename TEntry>
+using TExtractorFunc = std::function<TCell(const TEntry&)>;
+
+} // NKikimr::NSysView
diff --git a/ydb/core/sys_view/common/schema.cpp b/ydb/core/sys_view/common/schema.cpp
index 1917ea35a4..d45af9c9d4 100644
--- a/ydb/core/sys_view/common/schema.cpp
+++ b/ydb/core/sys_view/common/schema.cpp
@@ -216,6 +216,9 @@ private:
RegisterOlapStoreSystemView<Schema::PrimaryIndexStats>(StorePrimaryIndexStatsName);
RegisterOlapTableSystemView<Schema::PrimaryIndexStats>(TablePrimaryIndexStatsName);
+
+ RegisterSystemView<Schema::TopPartitions>(TopPartitions1MinuteName);
+ RegisterSystemView<Schema::TopPartitions>(TopPartitions1HourName);
}
private:
diff --git a/ydb/core/sys_view/common/schema.h b/ydb/core/sys_view/common/schema.h
index 29ba8b9b0f..cf7830073f 100644
--- a/ydb/core/sys_view/common/schema.h
+++ b/ydb/core/sys_view/common/schema.h
@@ -33,6 +33,9 @@ constexpr TStringBuf QueryMetricsName = "query_metrics_one_minute";
constexpr TStringBuf StorePrimaryIndexStatsName = "store_primary_index_stats";
constexpr TStringBuf TablePrimaryIndexStatsName = "primary_index_stats";
+constexpr TStringBuf TopPartitions1MinuteName = "top_partitions_one_minute";
+constexpr TStringBuf TopPartitions1HourName = "top_partitions_one_hour";
+
struct Schema : NIceDb::Schema {
struct PartitionStats : Table<1> {
struct OwnerId : Column<1, NScheme::NTypeIds::Uint64> {};
@@ -420,6 +423,34 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<PDiskFilter, ErasureSpecies, CurrentGroupsCreated, CurrentAllocatedSize,
CurrentAvailableSize, AvailableGroupsToCreate, AvailableSizeToCreate>;
};
+
+ struct TopPartitions : Table<12> {
+ struct IntervalEnd : Column<1, NScheme::NTypeIds::Timestamp> {};
+ struct Rank : Column<2, NScheme::NTypeIds::Uint32> {};
+ struct TabletId : Column<3, NScheme::NTypeIds::Uint64> {};
+ struct Path : Column<4, NScheme::NTypeIds::Utf8> {};
+ struct PeakTime : Column<5, NScheme::NTypeIds::Timestamp> {};
+ struct CPUCores : Column<6, NScheme::NTypeIds::Double> {};
+ struct NodeId : Column<7, NScheme::NTypeIds::Uint32> {};
+ struct DataSize : Column<8, NScheme::NTypeIds::Uint64> {};
+ struct RowCount : Column<9, NScheme::NTypeIds::Uint64> {};
+ struct IndexSize : Column<10, NScheme::NTypeIds::Uint64> {};
+ struct InFlightTxCount : Column<11, NScheme::NTypeIds::Uint32> {};
+
+ using TKey = TableKey<IntervalEnd, Rank>;
+ using TColumns = TableColumns<
+ IntervalEnd,
+ Rank,
+ TabletId,
+ Path,
+ PeakTime,
+ CPUCores,
+ NodeId,
+ DataSize,
+ RowCount,
+ IndexSize,
+ InFlightTxCount>;
+ };
};
bool MaybeSystemViewPath(const TVector<TString>& path);
diff --git a/ydb/core/sys_view/common/ya.make b/ydb/core/sys_view/common/ya.make
index b4ec91b3d8..b94acc176d 100644
--- a/ydb/core/sys_view/common/ya.make
+++ b/ydb/core/sys_view/common/ya.make
@@ -14,6 +14,7 @@ SRCS(
schema.h
schema.cpp
utils.h
+ processor_scan.h
)
PEERDIR(
diff --git a/ydb/core/sys_view/partition_stats/partition_stats.cpp b/ydb/core/sys_view/partition_stats/partition_stats.cpp
index d6ea9ff73b..ae18049e97 100644
--- a/ydb/core/sys_view/partition_stats/partition_stats.cpp
+++ b/ydb/core/sys_view/partition_stats/partition_stats.cpp
@@ -1,5 +1,6 @@
#include "partition_stats.h"
+#include <ydb/core/sys_view/common/common.h>
#include <ydb/core/sys_view/common/events.h>
#include <ydb/core/sys_view/common/schema.h>
#include <ydb/core/sys_view/common/scan_actor_base_impl.h>
@@ -14,19 +15,40 @@ namespace NSysView {
using namespace NActors;
-class TPartitionStatsCollector : public TActor<TPartitionStatsCollector> {
+class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollector> {
public:
+ using TBase = TActorBootstrapped<TPartitionStatsCollector>;
+
static constexpr auto ActorActivityType() {
return NKikimrServices::TActivity::SYSTEM_VIEW_PART_STATS_COLLECTOR;
}
- explicit TPartitionStatsCollector(size_t batchSize, size_t pendingRequestsLimit)
- : TActor(&TPartitionStatsCollector::StateWork)
+ explicit TPartitionStatsCollector(TPathId domainKey, ui64 sysViewProcessorId,
+ size_t batchSize, size_t pendingRequestsLimit)
+ : DomainKey(domainKey)
+ , SysViewProcessorId(sysViewProcessorId)
, BatchSize(batchSize)
, PendingRequestsLimit(pendingRequestsLimit)
{}
- STFUNC(StateWork) {
+ void Bootstrap() {
+ SVLOG_D("NSysView::TPartitionStatsCollector bootstrapped: "
+ << "domain key# " << DomainKey
+ << ", sysview processor id# " << SysViewProcessorId);
+
+ if (AppData()->UsePartitionStatsCollectorForTests) {
+ OverloadedPartitionBound = 0.0;
+ ProcessOverloadedInterval = TDuration::Seconds(1);
+ }
+
+ if (SysViewProcessorId) {
+ Schedule(ProcessOverloadedInterval, new TEvPrivate::TEvProcessOverloaded);
+ }
+
+ Become(&TThis::StateWork);
+ }
+
+ STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvSysView::TEvSetPartitioning, Handle);
hFunc(TEvSysView::TEvRemoveTable, Handle);
@@ -34,10 +56,11 @@ public:
hFunc(TEvSysView::TEvUpdateTtlStats, Handle);
hFunc(TEvSysView::TEvGetPartitionStats, Handle);
hFunc(TEvPrivate::TEvProcess, Handle);
+ hFunc(TEvPrivate::TEvProcessOverloaded, Handle);
+ IgnoreFunc(TEvPipeCache::TEvDeliveryProblem);
cFunc(TEvents::TEvPoison::EventType, PassAway);
default:
- LOG_CRIT(ctx, NKikimrServices::SYSTEM_VIEWS,
- "NSysView::TPartitionStatsCollector: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
+ SVLOG_CRIT("NSysView::TPartitionStatsCollector: unexpected event " << ev->GetTypeRewrite());
}
}
@@ -45,11 +68,14 @@ private:
struct TEvPrivate {
enum EEv {
EvProcess = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvProcessOverloaded,
EvEnd
};
struct TEvProcess : public TEventLocal<TEvProcess, EvProcess> {};
+
+ struct TEvProcessOverloaded : public TEventLocal<TEvProcessOverloaded, EvProcessOverloaded> {};
};
void Handle(TEvSysView::TEvSetPartitioning::TPtr& ev) {
@@ -57,24 +83,36 @@ private:
const auto& pathId = ev->Get()->PathId;
auto& tables = DomainTables[domainKey];
- auto it = tables.find(pathId);
- if (it != tables.end()) {
- auto& oldPartitions = it->second.Partitions;
- THashMap<TShardIdx, NKikimrSysView::TPartitionStats> newPartitions;
+ auto tableFound = tables.Stats.find(pathId);
+ if (tableFound != tables.Stats.end()) {
+ auto& table = tableFound->second;
+
+ auto& oldPartitions = table.Partitions;
+ std::unordered_map<TShardIdx, NKikimrSysView::TPartitionStats> newPartitions;
+ std::unordered_set<TShardIdx> overloaded;
for (auto shardIdx : ev->Get()->ShardIndices) {
auto old = oldPartitions.find(shardIdx);
if (old != oldPartitions.end()) {
newPartitions[shardIdx] = old->second;
+ if (IsPartitionOverloaded(old->second)) {
+ overloaded.insert(shardIdx);
+ }
}
}
+ if (!overloaded.empty()) {
+ tables.Overloaded[pathId].swap(overloaded);
+ } else {
+ tables.Overloaded.erase(pathId);
+ }
+
oldPartitions.swap(newPartitions);
- it->second.ShardIndices.swap(ev->Get()->ShardIndices);
- it->second.Path = ev->Get()->Path;
+ table.ShardIndices.swap(ev->Get()->ShardIndices);
+ table.Path = ev->Get()->Path;
} else {
- auto& table = tables[pathId];
+ auto& table = tables.Stats[pathId];
table.ShardIndices.swap(ev->Get()->ShardIndices);
table.Path = ev->Get()->Path;
}
@@ -85,7 +123,8 @@ private:
const auto& pathId = ev->Get()->PathId;
auto& tables = DomainTables[domainKey];
- tables.erase(pathId);
+ tables.Stats.erase(pathId);
+ tables.Overloaded.erase(pathId);
}
void Handle(TEvSysView::TEvSendPartitionStats::TPtr& ev) {
@@ -94,14 +133,27 @@ private:
const auto& shardIdx = ev->Get()->ShardIdx;
auto& tables = DomainTables[domainKey];
- auto it = tables.find(pathId);
- if (it == tables.end()) {
+ auto tableFound = tables.Stats.find(pathId);
+ if (tableFound == tables.Stats.end()) {
return;
}
- auto& oldStats = it->second.Partitions[shardIdx];
+ auto& table = tableFound->second;
+ auto& oldStats = table.Partitions[shardIdx];
auto& newStats = ev->Get()->Stats;
+ if (IsPartitionOverloaded(newStats)) {
+ tables.Overloaded[pathId].insert(shardIdx);
+ } else {
+ auto overloadedFound = tables.Overloaded.find(pathId);
+ if (overloadedFound != tables.Overloaded.end()) {
+ overloadedFound->second.erase(shardIdx);
+ if (overloadedFound->second.empty()) {
+ tables.Overloaded.erase(pathId);
+ }
+ }
+ }
+
if (oldStats.HasTtlStats()) {
newStats.MutableTtlStats()->Swap(oldStats.MutableTtlStats());
}
@@ -115,12 +167,12 @@ private:
const auto& shardIdx = ev->Get()->ShardIdx;
auto& tables = DomainTables[domainKey];
- auto it = tables.find(pathId);
- if (it == tables.end()) {
+ auto tableFound = tables.Stats.find(pathId);
+ if (tableFound == tables.Stats.end()) {
return;
}
- it->second.Partitions[shardIdx].MutableTtlStats()->Swap(&ev->Get()->Stats);
+ tableFound->second.Partitions[shardIdx].MutableTtlStats()->Swap(&ev->Get()->Stats);
}
void Handle(TEvSysView::TEvGetPartitionStats::TPtr& ev) {
@@ -170,7 +222,7 @@ private:
Send(request->Sender, std::move(result));
return;
}
- auto& tables = itTables->second;
+ auto& tables = itTables->second.Stats;
auto it = tables.begin();
auto itEnd = tables.end();
@@ -290,27 +342,115 @@ private:
Send(request->Sender, std::move(result));
}
+ void Handle(TEvPrivate::TEvProcessOverloaded::TPtr&) {
+ if (!SysViewProcessorId) {
+ return;
+ }
+
+ Schedule(ProcessOverloadedInterval, new TEvPrivate::TEvProcessOverloaded);
+
+ auto domainFound = DomainTables.find(DomainKey);
+ if (domainFound == DomainTables.end()) {
+ SVLOG_D("NSysView::TPartitionStatsCollector: TEvProcessOverloaded: no tables");
+ return;
+ }
+ auto& domainTables = domainFound->second;
+
+ struct TPartition {
+ TPathId PathId;
+ TShardIdx ShardIdx;
+ double CPUCores;
+ };
+ std::vector<TPartition> sorted;
+
+ for (const auto& [pathId, shardIndices] : domainTables.Overloaded) {
+ for (const auto& shardIdx : shardIndices) {
+ auto& table = domainTables.Stats[pathId];
+ auto& partition = table.Partitions[shardIdx];
+ sorted.emplace_back(TPartition{pathId, shardIdx, partition.GetCPUCores()});
+ }
+ }
+
+ std::sort(sorted.begin(), sorted.end(),
+ [] (const auto& l, const auto& r) { return l.CPUCores > r.CPUCores; });
+
+ auto now = TActivationContext::Now();
+ auto nowUs = now.MicroSeconds();
+
+ size_t count = 0;
+ auto sendEvent = MakeHolder<TEvSysView::TEvSendTopPartitions>();
+ for (const auto& entry : sorted) {
+ auto& table = domainTables.Stats[entry.PathId];
+ auto& partition = table.Partitions[entry.ShardIdx];
+
+ auto* result = sendEvent->Record.AddPartitions();
+ result->SetTabletId(partition.GetTabletId());
+ result->SetPath(table.Path);
+ result->SetPeakTimeUs(nowUs);
+ result->SetCPUCores(partition.GetCPUCores());
+ result->SetNodeId(partition.GetNodeId());
+ result->SetDataSize(partition.GetDataSize());
+ result->SetRowCount(partition.GetRowCount());
+ result->SetIndexSize(partition.GetIndexSize());
+ result->SetInFlightTxCount(partition.GetInFlightTxCount());
+
+ if (++count == TOP_PARTITIONS_COUNT) {
+ break;
+ }
+ }
+
+ sendEvent->Record.SetTimeUs(nowUs);
+
+ SVLOG_D("NSysView::TPartitionStatsCollector: TEvProcessOverloaded "
+ << "top size# " << sorted.size()
+ << ", time# " << now);
+
+ Send(MakePipePeNodeCacheID(false),
+ new TEvPipeCache::TEvForward(sendEvent.Release(), SysViewProcessorId, true));
+ }
+
+ void PassAway() override {
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
+ TBase::PassAway();
+ }
+
+ bool IsPartitionOverloaded(NKikimrSysView::TPartitionStats& stats) {
+ return stats.GetCPUCores() >= OverloadedPartitionBound;
+ }
+
private:
+ const TPathId DomainKey;
+ const ui64 SysViewProcessorId;
const size_t BatchSize;
const size_t PendingRequestsLimit;
+ double OverloadedPartitionBound = 0.7;
+ TDuration ProcessOverloadedInterval = TDuration::Seconds(15);
+
struct TTableStats {
- THashMap<TShardIdx, NKikimrSysView::TPartitionStats> Partitions; // shardIdx -> stats
- TVector<TShardIdx> ShardIndices;
+ std::unordered_map<TShardIdx, NKikimrSysView::TPartitionStats> Partitions; // shardIdx -> stats
+ std::vector<TShardIdx> ShardIndices;
TString Path;
};
- using TDomainTables = TMap<TPathId, TTableStats>;
- THashMap<TPathId, TDomainTables> DomainTables;
+ struct TDomainTables {
+ std::map<TPathId, TTableStats> Stats;
+ std::unordered_map<TPathId, std::unordered_set<TShardIdx>> Overloaded;
+ };
+ std::unordered_map<TPathId, TDomainTables> DomainTables;
TQueue<TEvSysView::TEvGetPartitionStats::TPtr> PendingRequests;
bool ProcessInFly = false;
};
-THolder<IActor> CreatePartitionStatsCollector(size_t batchSize, size_t pendingRequestsLimit) {
- return MakeHolder<TPartitionStatsCollector>(batchSize, pendingRequestsLimit);
+THolder<IActor> CreatePartitionStatsCollector(
+ TPathId domainKey, ui64 sysViewProcessorId, size_t batchSize, size_t pendingRequestsLimit)
+{
+ return MakeHolder<TPartitionStatsCollector>(
+ domainKey, sysViewProcessorId, batchSize, pendingRequestsLimit);
}
+
class TPartitionStatsScan : public TScanActorBase<TPartitionStatsScan> {
public:
using TBase = TScanActorBase<TPartitionStatsScan>;
diff --git a/ydb/core/sys_view/partition_stats/partition_stats.h b/ydb/core/sys_view/partition_stats/partition_stats.h
index dedf62906f..f1f192b9db 100644
--- a/ydb/core/sys_view/partition_stats/partition_stats.h
+++ b/ydb/core/sys_view/partition_stats/partition_stats.h
@@ -9,6 +9,8 @@ constexpr size_t STATS_COLLECTOR_BATCH_SIZE = 5000;
constexpr size_t STATS_COLLECTOR_QUEUE_SIZE_LIMIT = 10;
THolder<IActor> CreatePartitionStatsCollector(
+ TPathId domainKey,
+ ui64 sysViewProcessorId,
size_t batchSize = STATS_COLLECTOR_BATCH_SIZE,
size_t pendingRequestsCount = STATS_COLLECTOR_QUEUE_SIZE_LIMIT);
diff --git a/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp b/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp
index c123012948..17a8e03aa6 100644
--- a/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp
+++ b/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp
@@ -16,12 +16,20 @@ Y_UNIT_TEST_SUITE(PartitionStats) {
return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr };
}
+ void WaitForBootstrap(TTestActorRuntime &runtime) {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
+ UNIT_ASSERT(runtime.DispatchEvents(options));
+ }
+
void TestCollector(size_t batchSize) {
TTestActorRuntime runtime;
runtime.Initialize(MakeEgg());
- auto collector = CreatePartitionStatsCollector(batchSize);
+ auto collector = CreatePartitionStatsCollector(TPathId(), 0, batchSize);
auto collectorId = runtime.Register(collector.Release());
+ WaitForBootstrap(runtime);
+
auto sender = runtime.AllocateEdgeActor();
auto domainKey = TPathId(1, 1);
@@ -175,8 +183,10 @@ Y_UNIT_TEST_SUITE(PartitionStats) {
TTestActorRuntime runtime;
runtime.Initialize(MakeEgg());
- auto collector = CreatePartitionStatsCollector(1, 0);
+ auto collector = CreatePartitionStatsCollector(TPathId(), 0, 1, 0);
auto collectorId = runtime.Register(collector.Release());
+ WaitForBootstrap(runtime);
+
auto sender = runtime.AllocateEdgeActor();
auto domainKey = TPathId(1, 1);
diff --git a/ydb/core/sys_view/partition_stats/top_partitions.cpp b/ydb/core/sys_view/partition_stats/top_partitions.cpp
new file mode 100644
index 0000000000..57a719da46
--- /dev/null
+++ b/ydb/core/sys_view/partition_stats/top_partitions.cpp
@@ -0,0 +1,91 @@
+#include "top_partitions.h"
+
+#include <ydb/core/sys_view/common/events.h>
+#include <ydb/core/sys_view/common/processor_scan.h>
+
+namespace NKikimr::NSysView {
+
+using namespace NActors;
+
+template <>
+void SetField<0>(NKikimrSysView::TTopPartitionsKey& key, ui64 value) {
+ key.SetIntervalEndUs(value);
+}
+
+template <>
+void SetField<1>(NKikimrSysView::TTopPartitionsKey& key, ui32 value) {
+ key.SetRank(value);
+}
+
+struct TTopPartitionsExtractorMap :
+ public std::unordered_map<NTable::TTag, TExtractorFunc<NKikimrSysView::TTopPartitionsEntry>>
+{
+ using S = Schema::TopPartitions;
+ using E = NKikimrSysView::TTopPartitionsEntry;
+
+ TTopPartitionsExtractorMap() {
+ insert({S::IntervalEnd::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui64>(entry.GetKey().GetIntervalEndUs());
+ }});
+ insert({S::Rank::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui32>(entry.GetKey().GetRank());
+ }});
+ insert({S::TabletId::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui64>(entry.GetInfo().GetTabletId());
+ }});
+ insert({S::Path::ColumnId, [] (const E& entry) {
+ const auto& text = entry.GetInfo().GetPath();
+ return TCell(text.data(), text.size());
+ }});
+ insert({S::PeakTime::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui64>(entry.GetInfo().GetPeakTimeUs());
+ }});
+ insert({S::CPUCores::ColumnId, [] (const E& entry) {
+ return TCell::Make<double>(entry.GetInfo().GetCPUCores());
+ }});
+ insert({S::NodeId::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui32>(entry.GetInfo().GetNodeId());
+ }});
+ insert({S::DataSize::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui64>(entry.GetInfo().GetDataSize());
+ }});
+ insert({S::RowCount::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui64>(entry.GetInfo().GetRowCount());
+ }});
+ insert({S::IndexSize::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui64>(entry.GetInfo().GetIndexSize());
+ }});
+ insert({S::InFlightTxCount::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui32>(entry.GetInfo().GetInFlightTxCount());
+ }});
+ }
+};
+
+THolder<IActor> CreateTopPartitionsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId,
+ const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns)
+{
+ using TTopPartitionsScan = TProcessorScan<
+ NKikimrSysView::TTopPartitionsEntry,
+ NKikimrSysView::TEvGetTopPartitionsRequest,
+ NKikimrSysView::TEvGetTopPartitionsResponse,
+ TEvSysView::TEvGetTopPartitionsRequest,
+ TEvSysView::TEvGetTopPartitionsResponse,
+ TTopPartitionsExtractorMap,
+ ui64,
+ ui32
+ >;
+
+ auto viewName = tableId.SysViewInfo;
+
+ if (viewName == TopPartitions1MinuteName) {
+ return MakeHolder<TTopPartitionsScan>(ownerId, scanId, tableId, tableRange, columns,
+ NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE);
+
+ } else if (viewName == TopPartitions1HourName) {
+ return MakeHolder<TTopPartitionsScan>(ownerId, scanId, tableId, tableRange, columns,
+ NKikimrSysView::TOP_PARTITIONS_ONE_HOUR);
+ }
+ return {};
+}
+
+} // NKikimr::NSysView
diff --git a/ydb/core/sys_view/partition_stats/top_partitions.h b/ydb/core/sys_view/partition_stats/top_partitions.h
new file mode 100644
index 0000000000..dcf49a89a5
--- /dev/null
+++ b/ydb/core/sys_view/partition_stats/top_partitions.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <ydb/core/kqp/runtime/kqp_compute.h>
+
+namespace NKikimr::NSysView {
+
+THolder<IActor> CreateTopPartitionsScan(const TActorId& ownerId, ui32 scanId,
+ const TTableId& tableId, const TTableRange& tableRange,
+ const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns);
+
+} // NKikimr::NSysView
diff --git a/ydb/core/sys_view/partition_stats/ya.make b/ydb/core/sys_view/partition_stats/ya.make
index d24bf5c89d..4528f1f527 100644
--- a/ydb/core/sys_view/partition_stats/ya.make
+++ b/ydb/core/sys_view/partition_stats/ya.make
@@ -8,6 +8,8 @@ OWNER(
SRCS(
partition_stats.h
partition_stats.cpp
+ top_partitions.h
+ top_partitions.cpp
)
PEERDIR(
diff --git a/ydb/core/sys_view/processor/processor_impl.cpp b/ydb/core/sys_view/processor/processor_impl.cpp
index 1bdb84ffb0..f6566b07e3 100644
--- a/ydb/core/sys_view/processor/processor_impl.cpp
+++ b/ydb/core/sys_view/processor/processor_impl.cpp
@@ -11,8 +11,8 @@ namespace NSysView {
TSysViewProcessor::TSysViewProcessor(const TActorId& tablet, TTabletStorageInfo* info, EProcessorMode processorMode)
: TActor(&TThis::StateInit)
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
- , TotalInterval(TDuration::Seconds(processorMode == EProcessorMode::FAST ? 6 : 60))
- , CollectInterval(TDuration::Seconds(processorMode == EProcessorMode::FAST ? 3 : 30))
+ , TotalInterval(TDuration::Seconds(processorMode == EProcessorMode::FAST ? 1 : 60))
+ , CollectInterval(TotalInterval / 2)
, ExternalGroup(new NMonitoring::TDynamicCounters)
{
InternalGroups["kqp_serverless"] = new NMonitoring::TDynamicCounters;
@@ -70,8 +70,8 @@ void TSysViewProcessor::PersistIntervalEnd(NIceDb::TNiceDb& db) {
}
template <typename TSchema>
-void TSysViewProcessor::PersistTopResults(NIceDb::TNiceDb& db,
- TTop& top, TResultStatsMap& results, TInstant intervalEnd)
+void TSysViewProcessor::PersistQueryTopResults(NIceDb::TNiceDb& db,
+ TQueryTop& top, TResultStatsMap& results, TInstant intervalEnd)
{
ui64 intervalEndUs = intervalEnd.MicroSeconds();
ui32 rank = 0;
@@ -95,14 +95,14 @@ void TSysViewProcessor::PersistTopResults(NIceDb::TNiceDb& db,
}
}
- SVLOG_D("[" << TabletID() << "] PersistTopResults: "
+ SVLOG_D("[" << TabletID() << "] PersistQueryTopResults: "
<< "table id# " << TSchema::TableId
<< ", interval end# " << intervalEnd
<< ", query count# " << top.size()
<< ", persisted# " << rank);
}
-void TSysViewProcessor::PersistResults(NIceDb::TNiceDb& db) {
+void TSysViewProcessor::PersistQueryResults(NIceDb::TNiceDb& db) {
std::vector<std::pair<ui64, TQueryHash>> sorted;
sorted.reserve(QueryMetrics.size());
for (const auto& [queryHash, metrics] : QueryMetrics) {
@@ -128,33 +128,69 @@ void TSysViewProcessor::PersistResults(NIceDb::TNiceDb& db) {
NIceDb::TUpdate<Schema::MetricsOneMinute::Data>(serialized));
}
- SVLOG_D("[" << TabletID() << "] PersistResults: "
+ SVLOG_D("[" << TabletID() << "] PersistQueryResults: "
<< "interval end# " << IntervalEnd
<< ", query count# " << sorted.size());
// TODO: metrics one hour?
- PersistTopResults<Schema::TopByDurationOneMinute>(
+ PersistQueryTopResults<Schema::TopByDurationOneMinute>(
db, ByDurationMinute, TopByDurationOneMinute, IntervalEnd);
- PersistTopResults<Schema::TopByReadBytesOneMinute>(
+ PersistQueryTopResults<Schema::TopByReadBytesOneMinute>(
db, ByReadBytesMinute, TopByReadBytesOneMinute, IntervalEnd);
- PersistTopResults<Schema::TopByCpuTimeOneMinute>(
+ PersistQueryTopResults<Schema::TopByCpuTimeOneMinute>(
db, ByCpuTimeMinute, TopByCpuTimeOneMinute, IntervalEnd);
- PersistTopResults<Schema::TopByRequestUnitsOneMinute>(
+ PersistQueryTopResults<Schema::TopByRequestUnitsOneMinute>(
db, ByRequestUnitsMinute, TopByRequestUnitsOneMinute, IntervalEnd);
auto hourEnd = EndOfHourInterval(IntervalEnd);
- PersistTopResults<Schema::TopByDurationOneHour>(
+ PersistQueryTopResults<Schema::TopByDurationOneHour>(
db, ByDurationHour, TopByDurationOneHour, hourEnd);
- PersistTopResults<Schema::TopByReadBytesOneHour>(
+ PersistQueryTopResults<Schema::TopByReadBytesOneHour>(
db, ByReadBytesHour, TopByReadBytesOneHour, hourEnd);
- PersistTopResults<Schema::TopByCpuTimeOneHour>(
+ PersistQueryTopResults<Schema::TopByCpuTimeOneHour>(
db, ByCpuTimeHour, TopByCpuTimeOneHour, hourEnd);
- PersistTopResults<Schema::TopByRequestUnitsOneHour>(
+ PersistQueryTopResults<Schema::TopByRequestUnitsOneHour>(
db, ByRequestUnitsHour, TopByRequestUnitsOneHour, hourEnd);
}
+template <typename TSchema>
+void TSysViewProcessor::PersistPartitionTopResults(NIceDb::TNiceDb& db,
+ TPartitionTop& top, TResultPartitionsMap& results, TInstant intervalEnd)
+{
+ ui64 intervalEndUs = intervalEnd.MicroSeconds();
+ ui32 rank = 0;
+
+ for (const auto& partition : top) {
+ auto key = std::make_pair(intervalEndUs, ++rank);
+ auto& info = results[key];
+ info.CopyFrom(*partition);
+
+ TString data;
+ Y_PROTOBUF_SUPPRESS_NODISCARD info.SerializeToString(&data);
+ db.Table<TSchema>().Key(key).Update(
+ NIceDb::TUpdate<typename TSchema::Data>(data));
+ }
+
+ SVLOG_D("[" << TabletID() << "] PersistPartitionTopResults: "
+ << "table id# " << TSchema::TableId
+ << ", partition interval end# " << intervalEnd
+ << ", partition count# " << top.size());
+}
+
+void TSysViewProcessor::PersistPartitionResults(NIceDb::TNiceDb& db) {
+ auto intervalEnd = IntervalEnd + TotalInterval;
+
+ PersistPartitionTopResults<Schema::TopPartitionsOneMinute>(
+ db, PartitionTopMinute, TopPartitionsOneMinute, intervalEnd);
+
+ auto hourEnd = EndOfHourInterval(intervalEnd);
+
+ PersistPartitionTopResults<Schema::TopPartitionsOneHour>(
+ db, PartitionTopHour, TopPartitionsOneHour, hourEnd);
+}
+
void TSysViewProcessor::ScheduleAggregate() {
auto rangeUs = RandomNumber<ui64>(TotalInterval.MicroSeconds() / 12);
auto deadline = IntervalEnd + CollectInterval + TDuration::MicroSeconds(rangeUs);
@@ -182,18 +218,18 @@ void TSysViewProcessor::ScheduleSendNavigate() {
Schedule(SendNavigateInterval, new TEvPrivate::TEvSendNavigate);
}
-template <typename TSchema, typename TEntry>
-void TSysViewProcessor::CutHistory(NIceDb::TNiceDb& db,
- TResultMap<TEntry>& metricsMap, TDuration historySize)
-{
+template <typename TSchema, typename TMap>
+void TSysViewProcessor::CutHistory(NIceDb::TNiceDb& db, TMap& results, TDuration historySize) {
auto past = IntervalEnd - historySize;
- auto key = std::make_pair(past.MicroSeconds(), 0);
+ typename TMap::key_type key;
+ key.first = past.MicroSeconds();
+ key.second = 0;
- auto bound = metricsMap.lower_bound(key);
- for (auto it = metricsMap.begin(); it != bound; ++it) {
+ auto bound = results.lower_bound(key);
+ for (auto it = results.begin(); it != bound; ++it) {
db.Table<TSchema>().Key(it->first).Delete();
}
- metricsMap.erase(metricsMap.begin(), bound);
+ results.erase(results.begin(), bound);
}
TInstant TSysViewProcessor::EndOfHourInterval(TInstant intervalEnd) {
@@ -218,8 +254,6 @@ void TSysViewProcessor::ClearIntervalSummaries(NIceDb::TNiceDb& db) {
}
void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) {
- // TODO: efficient delete?
-
ClearIntervalSummaries(db);
for (const auto& [queryHash, _] : QueryMetrics) {
@@ -233,22 +267,32 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) {
NodesToRequest.clear();
NodesInFlight.clear();
- auto clearTop = [&] (NKikimrSysView::EStatsType type, TTop& top) {
+ auto clearQueryTop = [&] (NKikimrSysView::EStatsType type, TQueryTop& top) {
for (const auto& query : top) {
db.Table<Schema::IntervalTops>().Key((ui32)type, query.Hash).Delete();
}
top.clear();
};
- clearTop(NKikimrSysView::TOP_DURATION_ONE_MINUTE, ByDurationMinute);
- clearTop(NKikimrSysView::TOP_READ_BYTES_ONE_MINUTE, ByReadBytesMinute);
- clearTop(NKikimrSysView::TOP_CPU_TIME_ONE_MINUTE, ByCpuTimeMinute);
- clearTop(NKikimrSysView::TOP_REQUEST_UNITS_ONE_MINUTE, ByRequestUnitsMinute);
+ auto clearPartitionTop = [&] (NKikimrSysView::EStatsType type, TPartitionTop& top) {
+ for (const auto& partition : top) {
+ db.Table<Schema::IntervalPartitionTops>().Key((ui32)type, partition->GetTabletId()).Delete();
+ }
+ top.clear();
+ };
+
+ clearQueryTop(NKikimrSysView::TOP_DURATION_ONE_MINUTE, ByDurationMinute);
+ clearQueryTop(NKikimrSysView::TOP_READ_BYTES_ONE_MINUTE, ByReadBytesMinute);
+ clearQueryTop(NKikimrSysView::TOP_CPU_TIME_ONE_MINUTE, ByCpuTimeMinute);
+ clearQueryTop(NKikimrSysView::TOP_REQUEST_UNITS_ONE_MINUTE, ByRequestUnitsMinute);
+
+ clearPartitionTop(NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE, PartitionTopMinute);
CurrentStage = COLLECT;
PersistStage(db);
auto oldHourEnd = EndOfHourInterval(IntervalEnd);
+ auto partitionOldHourEnd = EndOfHourInterval(IntervalEnd + TotalInterval);
auto now = ctx.Now();
auto intervalSize = TotalInterval.MicroSeconds();
@@ -257,12 +301,17 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) {
PersistIntervalEnd(db);
auto newHourEnd = EndOfHourInterval(IntervalEnd);
+ auto partitionNewHourEnd = EndOfHourInterval(IntervalEnd + TotalInterval);
if (oldHourEnd != newHourEnd) {
- clearTop(NKikimrSysView::TOP_DURATION_ONE_HOUR, ByDurationHour);
- clearTop(NKikimrSysView::TOP_READ_BYTES_ONE_HOUR, ByReadBytesHour);
- clearTop(NKikimrSysView::TOP_CPU_TIME_ONE_HOUR, ByCpuTimeHour);
- clearTop(NKikimrSysView::TOP_REQUEST_UNITS_ONE_HOUR, ByRequestUnitsHour);
+ clearQueryTop(NKikimrSysView::TOP_DURATION_ONE_HOUR, ByDurationHour);
+ clearQueryTop(NKikimrSysView::TOP_READ_BYTES_ONE_HOUR, ByReadBytesHour);
+ clearQueryTop(NKikimrSysView::TOP_CPU_TIME_ONE_HOUR, ByCpuTimeHour);
+ clearQueryTop(NKikimrSysView::TOP_REQUEST_UNITS_ONE_HOUR, ByRequestUnitsHour);
+ }
+
+ if (partitionOldHourEnd != partitionNewHourEnd) {
+ clearPartitionTop(NKikimrSysView::TOP_PARTITIONS_ONE_HOUR, PartitionTopHour);
}
SVLOG_D("[" << TabletID() << "] Reset: interval end# " << IntervalEnd);
@@ -270,18 +319,20 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) {
const auto minuteHistorySize = TotalInterval * ONE_MINUTE_BUCKET_COUNT;
const auto hourHistorySize = ONE_HOUR_BUCKET_SIZE * ONE_HOUR_BUCKET_COUNT;
- CutHistory<Schema::MetricsOneMinute, TQueryToMetrics>(db, MetricsOneMinute, minuteHistorySize);
- CutHistory<Schema::MetricsOneHour, TQueryToMetrics>(db, MetricsOneHour, hourHistorySize);
+ CutHistory<Schema::MetricsOneMinute>(db, MetricsOneMinute, minuteHistorySize);
+ CutHistory<Schema::MetricsOneHour>(db, MetricsOneHour, hourHistorySize);
- using TStats = NKikimrSysView::TQueryStats;
- CutHistory<Schema::TopByDurationOneMinute, TStats>(db, TopByDurationOneMinute, minuteHistorySize);
- CutHistory<Schema::TopByDurationOneHour, TStats>(db, TopByDurationOneHour, hourHistorySize);
- CutHistory<Schema::TopByReadBytesOneMinute, TStats>(db, TopByReadBytesOneMinute, minuteHistorySize);
- CutHistory<Schema::TopByReadBytesOneHour, TStats>(db, TopByReadBytesOneHour, hourHistorySize);
- CutHistory<Schema::TopByCpuTimeOneMinute, TStats>(db, TopByCpuTimeOneMinute, minuteHistorySize);
- CutHistory<Schema::TopByCpuTimeOneHour, TStats>(db, TopByCpuTimeOneHour, hourHistorySize);
- CutHistory<Schema::TopByRequestUnitsOneMinute, TStats>(db, TopByRequestUnitsOneMinute, minuteHistorySize);
- CutHistory<Schema::TopByRequestUnitsOneHour, TStats>(db, TopByRequestUnitsOneHour, hourHistorySize);
+ CutHistory<Schema::TopByDurationOneMinute>(db, TopByDurationOneMinute, minuteHistorySize);
+ CutHistory<Schema::TopByDurationOneHour>(db, TopByDurationOneHour, hourHistorySize);
+ CutHistory<Schema::TopByReadBytesOneMinute>(db, TopByReadBytesOneMinute, minuteHistorySize);
+ CutHistory<Schema::TopByReadBytesOneHour>(db, TopByReadBytesOneHour, hourHistorySize);
+ CutHistory<Schema::TopByCpuTimeOneMinute>(db, TopByCpuTimeOneMinute, minuteHistorySize);
+ CutHistory<Schema::TopByCpuTimeOneHour>(db, TopByCpuTimeOneHour, hourHistorySize);
+ CutHistory<Schema::TopByRequestUnitsOneMinute>(db, TopByRequestUnitsOneMinute, minuteHistorySize);
+ CutHistory<Schema::TopByRequestUnitsOneHour>(db, TopByRequestUnitsOneHour, hourHistorySize);
+
+ CutHistory<Schema::TopPartitionsOneMinute>(db, TopPartitionsOneMinute, minuteHistorySize);
+ CutHistory<Schema::TopPartitionsOneHour>(db, TopPartitionsOneHour, hourHistorySize);
}
void TSysViewProcessor::SendRequests() {
@@ -349,13 +400,13 @@ void TSysViewProcessor::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
void TSysViewProcessor::Handle(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) {
const auto& record = ev->Get()->Record;
- auto type = record.GetType();
if (PendingRequests.size() >= PendingRequestsLimit) {
+ auto type = record.GetType();
if (type == NKikimrSysView::METRICS_ONE_MINUTE || type == NKikimrSysView::METRICS_ONE_HOUR) {
- ReplyOverloaded<TEvSysView::TEvGetQueryMetricsResponse>(ev);
+ ReplyOverloaded<TEvSysView::TEvGetQueryMetricsResponse>(ev->Sender);
} else {
- ReplyOverloaded<TEvSysView::TEvGetQueryStatsResponse>(ev);
+ ReplyOverloaded<TEvSysView::TEvGetQueryStatsResponse>(ev->Sender);
}
return;
}
@@ -368,6 +419,20 @@ void TSysViewProcessor::Handle(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev)
}
}
+void TSysViewProcessor::Handle(TEvSysView::TEvGetTopPartitionsRequest::TPtr& ev) {
+ if (PendingRequests.size() >= PendingRequestsLimit) {
+ ReplyOverloaded<TEvSysView::TEvGetTopPartitionsResponse>(ev->Sender);
+ return;
+ }
+
+ PendingRequests.push(std::move(ev));
+
+ if (!ProcessInFly) {
+ Send(SelfId(), new TEvPrivate::TEvProcess());
+ ProcessInFly = true;
+ }
+}
+
void TSysViewProcessor::Handle(TEvPrivate::TEvProcess::TPtr&) {
ProcessInFly = false;
@@ -375,7 +440,7 @@ void TSysViewProcessor::Handle(TEvPrivate::TEvProcess::TPtr&) {
return;
}
- TEvSysView::TEvGetQueryMetricsRequest::TPtr request = std::move(PendingRequests.front());
+ TVariantRequestPtr request = std::move(PendingRequests.front());
PendingRequests.pop();
if (!PendingRequests.empty()) {
@@ -383,31 +448,71 @@ void TSysViewProcessor::Handle(TEvPrivate::TEvProcess::TPtr&) {
ProcessInFly = true;
}
- const auto& record = request->Get()->Record;
- auto type = record.GetType();
+ if (auto* req = std::get_if<TEvSysView::TEvGetTopPartitionsRequest::TPtr>(&request)) {
+ Reply<TResultPartitionsMap,
+ TEvSysView::TEvGetTopPartitionsRequest,
+ TEvSysView::TEvGetTopPartitionsResponse>(*req);
+
+ } else if (auto* req = std::get_if<TEvSysView::TEvGetQueryMetricsRequest::TPtr>(&request)) {
+ const auto& record = (*req)->Get()->Record;
+ auto type = record.GetType();
- if (type == NKikimrSysView::METRICS_ONE_MINUTE || type == NKikimrSysView::METRICS_ONE_HOUR) {
- Reply<TQueryToMetrics, TEvSysView::TEvGetQueryMetricsResponse>(request);
+ if (type == NKikimrSysView::METRICS_ONE_MINUTE || type == NKikimrSysView::METRICS_ONE_HOUR) {
+ Reply<TResultMetricsMap,
+ TEvSysView::TEvGetQueryMetricsRequest,
+ TEvSysView::TEvGetQueryMetricsResponse>(*req);
+ } else {
+ Reply<TResultStatsMap,
+ TEvSysView::TEvGetQueryMetricsRequest,
+ TEvSysView::TEvGetQueryStatsResponse>(*req);
+ }
} else {
- Reply<NKikimrSysView::TQueryStats, TEvSysView::TEvGetQueryStatsResponse>(request);
+ Y_FAIL("unknown SVP request");
}
}
+void TSysViewProcessor::EntryToProto(NKikimrSysView::TQueryMetricsEntry& dst, const TQueryToMetrics& src) {
+ dst.MutableMetrics()->CopyFrom(src.Metrics);
+ dst.SetQueryText(src.Text);
+}
+
+void TSysViewProcessor::EntryToProto(NKikimrSysView::TQueryStatsEntry& dst, const NKikimrSysView::TQueryStats& src) {
+ dst.MutableStats()->CopyFrom(src);
+}
+
+void TSysViewProcessor::EntryToProto(NKikimrSysView::TTopPartitionsEntry& dst, const NKikimrSysView::TTopPartitionsInfo& src) {
+ dst.MutableInfo()->CopyFrom(src);
+}
+
template <typename TResponse>
-void TSysViewProcessor::ReplyOverloaded(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) {
+void TSysViewProcessor::ReplyOverloaded(const TActorId& sender) {
auto response = MakeHolder<TResponse>();
response->Record.SetOverloaded(true);
- Send(ev->Sender, std::move(response));
+ Send(sender, std::move(response));
}
-template <typename TEntry, typename TResponse>
-void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) {
+template <typename TMap, typename TRequest, typename TResponse>
+void TSysViewProcessor::Reply(typename TRequest::TPtr& ev) {
const auto& record = ev->Get()->Record;
auto response = MakeHolder<TResponse>();
response->Record.SetLastBatch(true);
- TResultMap<TEntry>* entries = nullptr;
- if constexpr (std::is_same<TEntry, TQueryToMetrics>::value) {
+ using TEntry = typename TMap::mapped_type;
+ TMap* entries = nullptr;
+ if constexpr (std::is_same<TEntry, NKikimrSysView::TTopPartitionsInfo>::value) {
+ switch (record.GetType()) {
+ case NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE:
+ entries = &TopPartitionsOneMinute;
+ break;
+ case NKikimrSysView::TOP_PARTITIONS_ONE_HOUR:
+ entries = &TopPartitionsOneHour;
+ break;
+ default:
+ SVLOG_CRIT("[" << TabletID() << "] unexpected stats type: " << (size_t)record.GetType());
+ Send(ev->Sender, std::move(response));
+ return;
+ }
+ } else if constexpr (std::is_same<TEntry, TQueryToMetrics>::value) {
switch (record.GetType()) {
case NKikimrSysView::METRICS_ONE_MINUTE:
entries = &MetricsOneMinute;
@@ -458,7 +563,6 @@ void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) {
auto from = entries->begin();
auto to = entries->end();
- TString fromStr("[]");
if (record.HasFrom()) {
auto key = std::make_pair(record.GetFrom().GetIntervalEndUs(), record.GetFrom().GetRank());
if (!record.HasInclusiveFrom() || record.GetInclusiveFrom()) {
@@ -466,14 +570,8 @@ void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) {
} else {
from = entries->upper_bound(key);
}
- TStringBuilder str;
- str << "[" << record.GetFrom().GetIntervalEndUs()
- << ", " << record.GetFrom().GetRank()
- << ", " << (record.GetInclusiveFrom() ? "inc]" : "exc]");
- fromStr = str;
}
- TString toStr("[]");
if (record.HasTo()) {
auto key = std::make_pair(record.GetTo().GetIntervalEndUs(), record.GetTo().GetRank());
if (!record.HasInclusiveTo() || !record.GetInclusiveTo()) {
@@ -481,33 +579,19 @@ void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) {
} else {
to = entries->upper_bound(key);
}
- TStringBuilder str;
- str << "[" << record.GetTo().GetIntervalEndUs()
- << ", " << record.GetTo().GetRank()
- << ", " << (record.GetInclusiveTo() ? "inc]" : "exc]");
- toStr = str;
}
- TString nextStr("[]");
size_t size = 0;
size_t count = 0;
for (auto it = from; it != to; ++it) {
const auto& key = it->first;
auto& entry = *response->Record.AddEntries();
-
auto& entryKey = *entry.MutableKey();
entryKey.SetIntervalEndUs(key.first);
entryKey.SetRank(key.second);
- if constexpr (std::is_same<TEntry, TQueryToMetrics>::value) {
- const auto& metrics = it->second;
- entry.MutableMetrics()->CopyFrom(metrics.Metrics);
- entry.SetQueryText(metrics.Text);
- } else {
- const auto& stats = it->second;
- entry.MutableStats()->CopyFrom(stats);
- }
+ EntryToProto(entry, it->second);
size += entry.ByteSizeLong();
++count;
@@ -517,19 +601,22 @@ void TSysViewProcessor::Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev) {
next->SetIntervalEndUs(key.first);
next->SetRank(key.second + 1);
response->Record.SetLastBatch(false);
-
- TStringBuilder str;
- str << "[" << next->GetIntervalEndUs()
- << ", " << next->GetRank()
- << "]";
- nextStr = str;
break;
}
}
- SVLOG_D("[" << TabletID() << "] Reply to TEvGetQueryMetricsRequest: "
- << "from# " << fromStr
- << ", to# " << toStr
+ TString rangeStr, nextStr;
+ google::protobuf::TextFormat::Printer range;
+ range.SetSingleLineMode(true);
+ range.PrintToString(record, &rangeStr);
+ if (response->Record.HasNext()) {
+ google::protobuf::TextFormat::Printer next;
+ next.SetSingleLineMode(true);
+ next.PrintToString(response->Record.GetNext(), &nextStr);
+ }
+
+ SVLOG_D("[" << TabletID() << "] Reply batch: "
+ << "range# " << rangeStr
<< ", rows# " << count
<< ", bytes# " << size
<< ", next# " << nextStr);
@@ -622,7 +709,7 @@ bool TSysViewProcessor::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev,
str << Endl;
}
{
- auto printTop = [&str] (const TTop& top) {
+ auto printTop = [&str] (const TQueryTop& top) {
for (const auto& query : top) {
str << " Hash: " << query.Hash
<< ", Value: " << query.Value
@@ -675,6 +762,10 @@ bool TSysViewProcessor::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev,
<< " Count: " << TopByRequestUnitsOneMinute.size() << Endl << Endl;
str << "TopByRequestUnitsOneHour" << Endl
<< " Count: " << TopByRequestUnitsOneHour.size() << Endl << Endl;
+ str << "TopPartitionsOneMinute" << Endl
+ << " Count: " << TopPartitionsOneMinute.size() << Endl << Endl;
+ str << "TopPartitionsOneHour" << Endl
+ << " Count: " << TopPartitionsOneHour.size() << Endl << Endl;
}
}
}
diff --git a/ydb/core/sys_view/processor/processor_impl.h b/ydb/core/sys_view/processor/processor_impl.h
index 6db54e815c..1211671c30 100644
--- a/ydb/core/sys_view/processor/processor_impl.h
+++ b/ydb/core/sys_view/processor/processor_impl.h
@@ -41,6 +41,7 @@ private:
struct TTxAggregate;
struct TTxIntervalSummary;
struct TTxIntervalMetrics;
+ struct TTxTopPartitions;
struct TEvPrivate {
enum EEv {
@@ -72,15 +73,25 @@ private:
TNodeId NodeId;
THolder<NKikimrSysView::TQueryStats> Stats;
};
- using TTop = std::vector<TTopQuery>;
+ using TQueryTop = std::vector<TTopQuery>;
- using TResultKey = std::pair<ui64, ui32>;
+ using TPartitionTop = std::vector<THolder<NKikimrSysView::TTopPartitionsInfo>>;
+
+ using THistoryKey = std::pair<ui64, ui32>;
template <typename TEntry>
- using TResultMap = std::map<TResultKey, TEntry, std::less<TResultKey>,
- NActors::NMemory::TAlloc<std::pair<const TResultKey, TEntry>, MemoryLabelResults>>;
+ using TResultMap = std::map<THistoryKey, TEntry, std::less<THistoryKey>,
+ NActors::NMemory::TAlloc<std::pair<const THistoryKey, TEntry>, MemoryLabelResults>>;
+
using TResultStatsMap = TResultMap<NKikimrSysView::TQueryStats>;
+ using TResultPartitionsMap = TResultMap<NKikimrSysView::TTopPartitionsInfo>;
+
+ struct TQueryToMetrics {
+ NKikimrSysView::TQueryMetrics Metrics;
+ TString Text;
+ };
+
private:
static bool TopQueryCompare(const TTopQuery& l, const TTopQuery& r) {
return l.Value == r.Value ? l.Hash > r.Hash : l.Value > r.Value;
@@ -102,7 +113,10 @@ private:
void Handle(TEvPrivate::TEvProcess::TPtr& ev);
void Handle(TEvSysView::TEvIntervalQuerySummary::TPtr& ev);
void Handle(TEvSysView::TEvGetIntervalMetricsResponse::TPtr& ev);
+ void Handle(TEvSysView::TEvSendTopPartitions::TPtr& ev);
+
void Handle(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev);
+ void Handle(TEvSysView::TEvGetTopPartitionsRequest::TPtr& ev);
void Handle(TEvSysView::TEvSendDbCountersRequest::TPtr& ev);
void Handle(TEvPrivate::TEvApplyCounters::TPtr& ev);
@@ -121,10 +135,14 @@ private:
void PersistIntervalEnd(NIceDb::TNiceDb& db);
template <typename TSchema>
- void PersistTopResults(NIceDb::TNiceDb& db,
- TTop& top, TResultStatsMap& results, TInstant intervalEnd);
+ void PersistQueryTopResults(NIceDb::TNiceDb& db,
+ TQueryTop& top, TResultStatsMap& results, TInstant intervalEnd);
+ void PersistQueryResults(NIceDb::TNiceDb& db);
- void PersistResults(NIceDb::TNiceDb& db);
+ template <typename TSchema>
+ void PersistPartitionTopResults(NIceDb::TNiceDb& db,
+ TPartitionTop& top, TResultPartitionsMap& results, TInstant intervalEnd);
+ void PersistPartitionResults(NIceDb::TNiceDb& db);
void ScheduleAggregate();
void ScheduleCollect();
@@ -132,9 +150,8 @@ private:
void ScheduleApplyCounters();
void ScheduleSendNavigate();
- template <typename TSchema, typename TEntry>
- void CutHistory(NIceDb::TNiceDb& db, TResultMap<TEntry>& results,
- TDuration historySize);
+ template <typename TSchema, typename TMap>
+ void CutHistory(NIceDb::TNiceDb& db, TMap& results, TDuration historySize);
static TInstant EndOfHourInterval(TInstant intervalEnd);
@@ -145,11 +162,16 @@ private:
void SendRequests();
void IgnoreFailure(TNodeId nodeId);
+ static void EntryToProto(NKikimrSysView::TQueryMetricsEntry& dst, const TQueryToMetrics& src);
+ static void EntryToProto(NKikimrSysView::TQueryStatsEntry& dst, const NKikimrSysView::TQueryStats& src);
+ static void EntryToProto(NKikimrSysView::TTopPartitionsEntry& dst, const NKikimrSysView::TTopPartitionsInfo& src);
+
template <typename TResponse>
- void ReplyOverloaded(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev);
+ void ReplyOverloaded(const TActorId& sender);
+
+ template <typename TMap, typename TRequest, typename TResponse>
+ void Reply(typename TRequest::TPtr& ev);
- template <typename TEntry, typename TResponse>
- void Reply(TEvSysView::TEvGetQueryMetricsRequest::TPtr& ev);
TIntrusivePtr<IDbCounters> CreateCountersForService(NKikimrSysView::EDbCountersService service);
void AttachExternalCounters();
@@ -165,6 +187,8 @@ private:
IgnoreFunc(TEvSysView::TEvIntervalQuerySummary);
IgnoreFunc(TEvSysView::TEvGetIntervalMetricsResponse);
IgnoreFunc(TEvSysView::TEvGetQueryMetricsRequest);
+ IgnoreFunc(TEvSysView::TEvSendTopPartitions);
+ IgnoreFunc(TEvSysView::TEvGetTopPartitionsRequest);
IgnoreFunc(TEvSysView::TEvSendDbCountersRequest);
default:
if (!HandleDefaultEvents(ev, ctx)) {
@@ -181,6 +205,8 @@ private:
IgnoreFunc(TEvSysView::TEvIntervalQuerySummary);
IgnoreFunc(TEvSysView::TEvGetIntervalMetricsResponse);
IgnoreFunc(TEvSysView::TEvGetQueryMetricsRequest);
+ IgnoreFunc(TEvSysView::TEvSendTopPartitions);
+ IgnoreFunc(TEvSysView::TEvGetTopPartitionsRequest);
IgnoreFunc(TEvSysView::TEvSendDbCountersRequest);
default:
if (!HandleDefaultEvents(ev, ctx)) {
@@ -200,6 +226,8 @@ private:
hFunc(TEvSysView::TEvIntervalQuerySummary, Handle);
hFunc(TEvSysView::TEvGetIntervalMetricsResponse, Handle);
hFunc(TEvSysView::TEvGetQueryMetricsRequest, Handle);
+ hFunc(TEvSysView::TEvSendTopPartitions, Handle);
+ hFunc(TEvSysView::TEvGetTopPartitionsRequest, Handle);
hFunc(TEvSysView::TEvSendDbCountersRequest, Handle);
hFunc(TEvPrivate::TEvApplyCounters, Handle);
hFunc(TEvPrivate::TEvSendNavigate, Handle);
@@ -261,10 +289,6 @@ private:
std::unordered_set<TNodeId> SummaryNodes;
// IntervalMetrics
- struct TQueryToMetrics {
- NKikimrSysView::TQueryMetrics Metrics;
- TString Text;
- };
std::unordered_map<TQueryHash, TQueryToMetrics> QueryMetrics;
// NodesToRequest
@@ -282,14 +306,14 @@ private:
std::unordered_map<TNodeId, TNodeToQueries> NodesInFlight;
// IntervalTops
- TTop ByDurationMinute;
- TTop ByReadBytesMinute;
- TTop ByCpuTimeMinute;
- TTop ByRequestUnitsMinute;
- TTop ByDurationHour;
- TTop ByReadBytesHour;
- TTop ByCpuTimeHour;
- TTop ByRequestUnitsHour;
+ TQueryTop ByDurationMinute;
+ TQueryTop ByReadBytesMinute;
+ TQueryTop ByCpuTimeMinute;
+ TQueryTop ByRequestUnitsMinute;
+ TQueryTop ByDurationHour;
+ TQueryTop ByReadBytesHour;
+ TQueryTop ByCpuTimeHour;
+ TQueryTop ByRequestUnitsHour;
// Metrics...
using TResultMetricsMap = TResultMap<TQueryToMetrics>;
@@ -307,9 +331,22 @@ private:
TResultStatsMap TopByRequestUnitsOneMinute;
TResultStatsMap TopByRequestUnitsOneHour;
+ // IntervalPartitionTops
+ TPartitionTop PartitionTopMinute;
+ TPartitionTop PartitionTopHour;
+
+ // TopPartitions...
+ TResultPartitionsMap TopPartitionsOneMinute;
+ TResultPartitionsMap TopPartitionsOneHour;
+
// limited queue of user requests
static constexpr size_t PendingRequestsLimit = 5;
- std::queue<TEvSysView::TEvGetQueryMetricsRequest::TPtr> PendingRequests;
+
+ using TVariantRequestPtr = std::variant<
+ TEvSysView::TEvGetQueryMetricsRequest::TPtr,
+ TEvSysView::TEvGetTopPartitionsRequest::TPtr>;
+
+ std::queue<TVariantRequestPtr> PendingRequests;
bool ProcessInFly = false;
// db counters
diff --git a/ydb/core/sys_view/processor/schema.h b/ydb/core/sys_view/processor/schema.h
index 159ad84f58..e1684b3aaa 100644
--- a/ydb/core/sys_view/processor/schema.h
+++ b/ydb/core/sys_view/processor/schema.h
@@ -60,7 +60,7 @@ struct TProcessorSchema : NIceDb::Schema {
ByDuration, ByReadBytes, ByCpuTime, ByRequestUnits>;
};
-#define RESULT_TABLE(TableName, TableID) \
+#define RESULT_QUERY_TABLE(TableName, TableID) \
struct TableName : Table<TableID> { \
struct IntervalEnd : Column<1, NScheme::NTypeIds::Timestamp> {}; \
struct Rank : Column<2, NScheme::NTypeIds::Uint32> {}; \
@@ -71,18 +71,44 @@ struct TProcessorSchema : NIceDb::Schema {
using TColumns = TableColumns<IntervalEnd, Rank, Text, Data>; \
};
- RESULT_TABLE(MetricsOneMinute, 6)
- RESULT_TABLE(MetricsOneHour, 7)
- RESULT_TABLE(TopByDurationOneMinute, 8)
- RESULT_TABLE(TopByDurationOneHour, 9)
- RESULT_TABLE(TopByReadBytesOneMinute, 10)
- RESULT_TABLE(TopByReadBytesOneHour, 11)
- RESULT_TABLE(TopByCpuTimeOneMinute, 12)
- RESULT_TABLE(TopByCpuTimeOneHour, 13)
- RESULT_TABLE(TopByRequestUnitsOneMinute, 14)
- RESULT_TABLE(TopByRequestUnitsOneHour, 15)
+ RESULT_QUERY_TABLE(MetricsOneMinute, 6)
+ RESULT_QUERY_TABLE(MetricsOneHour, 7)
+ RESULT_QUERY_TABLE(TopByDurationOneMinute, 8)
+ RESULT_QUERY_TABLE(TopByDurationOneHour, 9)
+ RESULT_QUERY_TABLE(TopByReadBytesOneMinute, 10)
+ RESULT_QUERY_TABLE(TopByReadBytesOneHour, 11)
+ RESULT_QUERY_TABLE(TopByCpuTimeOneMinute, 12)
+ RESULT_QUERY_TABLE(TopByCpuTimeOneHour, 13)
+ RESULT_QUERY_TABLE(TopByRequestUnitsOneMinute, 14)
+ RESULT_QUERY_TABLE(TopByRequestUnitsOneHour, 15)
+
+#undef RESULT_QUERY_TABLE
+
+ struct IntervalPartitionTops : Table<16> {
+ struct TypeCol : Column<1, NScheme::NTypeIds::Uint32> {
+ static TString GetColumnName(const TString&) { return "Type"; }
+ };
+ struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct Data : Column<3, NScheme::NTypeIds::String> {};
+
+ using TKey = TableKey<TypeCol, TabletId>;
+ using TColumns = TableColumns<TypeCol, TabletId, Data>;
+ };
+
+#define RESULT_PARTITION_TABLE(TableName, TableID) \
+ struct TableName : Table<TableID> { \
+ struct IntervalEnd : Column<1, NScheme::NTypeIds::Timestamp> {}; \
+ struct Rank : Column<2, NScheme::NTypeIds::Uint32> {}; \
+ struct Data : Column<3, NScheme::NTypeIds::String> {}; \
+ \
+ using TKey = TableKey<IntervalEnd, Rank>; \
+ using TColumns = TableColumns<IntervalEnd, Rank, Data>; \
+ };
+
+ RESULT_PARTITION_TABLE(TopPartitionsOneMinute, 17)
+ RESULT_PARTITION_TABLE(TopPartitionsOneHour, 18)
-#undef RESULT_TABLE
+#undef RESULT_PARTITION_TABLE
using TTables = SchemaTables<
SysParams,
@@ -99,7 +125,10 @@ struct TProcessorSchema : NIceDb::Schema {
TopByCpuTimeOneMinute,
TopByCpuTimeOneHour,
TopByRequestUnitsOneMinute,
- TopByRequestUnitsOneHour
+ TopByRequestUnitsOneHour,
+ IntervalPartitionTops,
+ TopPartitionsOneMinute,
+ TopPartitionsOneHour
>;
using TSettings = SchemaSettings<
diff --git a/ydb/core/sys_view/processor/tx_aggregate.cpp b/ydb/core/sys_view/processor/tx_aggregate.cpp
index 654b47a689..ef00be464c 100644
--- a/ydb/core/sys_view/processor/tx_aggregate.cpp
+++ b/ydb/core/sys_view/processor/tx_aggregate.cpp
@@ -97,7 +97,7 @@ struct TSysViewProcessor::TTxAggregate : public TTxBase {
Self->ClearIntervalSummaries(db);
if (Self->NodesToRequest.empty()) {
- Self->PersistResults(db);
+ Self->PersistQueryResults(db);
}
Self->CurrentStage = AGGREGATE;
diff --git a/ydb/core/sys_view/processor/tx_collect.cpp b/ydb/core/sys_view/processor/tx_collect.cpp
index 67a197c366..d04cc5edb0 100644
--- a/ydb/core/sys_view/processor/tx_collect.cpp
+++ b/ydb/core/sys_view/processor/tx_collect.cpp
@@ -16,8 +16,9 @@ struct TSysViewProcessor::TTxCollect : public TTxBase {
NIceDb::TNiceDb db(txc.DB);
if (!Self->NodesInFlight.empty() || !Self->NodesToRequest.empty()) {
- Self->PersistResults(db);
+ Self->PersistQueryResults(db);
}
+ Self->PersistPartitionResults(db);
Self->Reset(db, ctx);
diff --git a/ydb/core/sys_view/processor/tx_init.cpp b/ydb/core/sys_view/processor/tx_init.cpp
index 89c7fd637f..71a5a13581 100644
--- a/ydb/core/sys_view/processor/tx_init.cpp
+++ b/ydb/core/sys_view/processor/tx_init.cpp
@@ -10,8 +10,8 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
TTxType GetTxType() const override { return TXTYPE_INIT; }
- template <typename TSchema, typename TEntry>
- bool LoadResults(NIceDb::TNiceDb& db, TResultMap<TEntry>& results) {
+ template <typename TSchema, typename TMap>
+ bool LoadQueryResults(NIceDb::TNiceDb& db, TMap& results) {
results.clear();
auto rowset = db.Table<TSchema>().Range().Select();
@@ -27,7 +27,8 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
auto key = std::make_pair(intervalEnd, rank);
auto& result = results[key];
- if constexpr (std::is_same<TEntry, TQueryToMetrics>::value) {
+
+ if constexpr (std::is_same<typename TMap::mapped_type, TQueryToMetrics>::value) {
result.Text = std::move(text);
if (data) {
Y_PROTOBUF_SUPPRESS_NODISCARD result.Metrics.ParseFromString(data);
@@ -46,11 +47,43 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
SVLOG_D("[" << Self->TabletID() << "] Loading results: "
<< "table# " << TSchema::TableId
- << ", results count# " << results.size());
+ << ", result count# " << results.size());
+
+ return true;
+ };
+
+ template <typename S>
+ bool LoadPartitionResults(NIceDb::TNiceDb& db, TSelf::TResultPartitionsMap& results) {
+ results.clear();
+ auto rowset = db.Table<S>().Range().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ ui64 intervalEnd = rowset.template GetValue<typename S::IntervalEnd>();
+ ui32 rank = rowset.template GetValue<typename S::Rank>();
+ TString data = rowset.template GetValue<typename S::Data>();
+
+ auto key = std::make_pair(intervalEnd, rank);
+ auto& result = results[key];
+ if (data) {
+ Y_PROTOBUF_SUPPRESS_NODISCARD result.ParseFromString(data);
+ }
+
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+
+ SVLOG_D("[" << Self->TabletID() << "] Loading results: "
+ << "table# " << S::TableId
+ << ", result count# " << results.size());
return true;
};
+
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
SVLOG_D("[" << Self->TabletID() << "] TTxInit::Execute");
@@ -72,6 +105,9 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
auto cpuTimeOneHourRowset = db.Table<Schema::TopByCpuTimeOneHour>().Range().Select();
auto reqUnitsOneMinuteRowset = db.Table<Schema::TopByRequestUnitsOneMinute>().Range().Select();
auto reqUnitsOneHourRowset = db.Table<Schema::TopByRequestUnitsOneHour>().Range().Select();
+ auto intervalPartitionTopsRowset = db.Table<Schema::IntervalPartitionTops>().Range().Select();
+ auto topPartitionsOneMinuteRowset = db.Table<Schema::TopPartitionsOneMinute>().Range().Select();
+ auto topPartitionsOneHourRowset = db.Table<Schema::TopPartitionsOneHour>().Range().Select();
if (!sysParamsRowset.IsReady() ||
!intervalSummariesRowset.IsReady() ||
@@ -87,7 +123,10 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
!cpuTimeOneMinuteRowset.IsReady() ||
!cpuTimeOneHourRowset.IsReady() ||
!reqUnitsOneMinuteRowset.IsReady() ||
- !reqUnitsOneHourRowset.IsReady())
+ !reqUnitsOneHourRowset.IsReady() ||
+ !intervalPartitionTopsRowset.IsReady() ||
+ !topPartitionsOneMinuteRowset.IsReady() ||
+ !topPartitionsOneHourRowset.IsReady())
{
return false;
}
@@ -251,7 +290,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
Self->ByRequestUnitsHour.emplace_back(std::move(query));
break;
default:
- SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected stats type: " << type);
+ SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected query stats type: " << type);
}
++queryCount;
@@ -269,7 +308,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
std::sort(Self->ByRequestUnitsMinute.begin(), Self->ByRequestUnitsMinute.end(), TopQueryCompare);
std::sort(Self->ByRequestUnitsHour.begin(), Self->ByRequestUnitsHour.end(), TopQueryCompare);
- SVLOG_D("[" << Self->TabletID() << "] Loading interval tops: "
+ SVLOG_D("[" << Self->TabletID() << "] Loading interval query tops: "
<< "total query count# " << queryCount);
}
@@ -320,28 +359,84 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
}
// Metrics...
- if (!LoadResults<Schema::MetricsOneMinute, TQueryToMetrics>(db, Self->MetricsOneMinute))
+ if (!LoadQueryResults<Schema::MetricsOneMinute>(db, Self->MetricsOneMinute))
return false;
- if (!LoadResults<Schema::MetricsOneHour, TQueryToMetrics>(db, Self->MetricsOneHour))
+ if (!LoadQueryResults<Schema::MetricsOneHour>(db, Self->MetricsOneHour))
return false;
// TopBy...
- using TStats = NKikimrSysView::TQueryStats;
- if (!LoadResults<Schema::TopByDurationOneMinute, TStats>(db, Self->TopByDurationOneMinute))
+ if (!LoadQueryResults<Schema::TopByDurationOneMinute>(db, Self->TopByDurationOneMinute))
+ return false;
+ if (!LoadQueryResults<Schema::TopByDurationOneHour>(db, Self->TopByDurationOneHour))
return false;
- if (!LoadResults<Schema::TopByDurationOneHour, TStats>(db, Self->TopByDurationOneHour))
+ if (!LoadQueryResults<Schema::TopByReadBytesOneMinute>(db, Self->TopByReadBytesOneMinute))
return false;
- if (!LoadResults<Schema::TopByReadBytesOneMinute, TStats>(db, Self->TopByReadBytesOneMinute))
+ if (!LoadQueryResults<Schema::TopByReadBytesOneHour>(db, Self->TopByReadBytesOneHour))
return false;
- if (!LoadResults<Schema::TopByReadBytesOneHour, TStats>(db, Self->TopByReadBytesOneHour))
+ if (!LoadQueryResults<Schema::TopByCpuTimeOneMinute>(db, Self->TopByCpuTimeOneMinute))
return false;
- if (!LoadResults<Schema::TopByCpuTimeOneMinute, TStats>(db, Self->TopByCpuTimeOneMinute))
+ if (!LoadQueryResults<Schema::TopByCpuTimeOneHour>(db, Self->TopByCpuTimeOneHour))
return false;
- if (!LoadResults<Schema::TopByCpuTimeOneHour, TStats>(db, Self->TopByCpuTimeOneHour))
+ if (!LoadQueryResults<Schema::TopByRequestUnitsOneMinute>(db, Self->TopByRequestUnitsOneMinute))
return false;
- if (!LoadResults<Schema::TopByRequestUnitsOneMinute, TStats>(db, Self->TopByRequestUnitsOneMinute))
+ if (!LoadQueryResults<Schema::TopByRequestUnitsOneHour>(db, Self->TopByRequestUnitsOneHour))
+ return false;
+
+ // IntervalPartitionTops
+ {
+ Self->PartitionTopMinute.clear();
+ Self->PartitionTopMinute.reserve(TOP_PARTITIONS_COUNT);
+ Self->PartitionTopHour.clear();
+ Self->PartitionTopHour.reserve(TOP_PARTITIONS_COUNT);
+
+ auto rowset = db.Table<Schema::IntervalPartitionTops>().Range().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ size_t partCount = 0;
+ while (!rowset.EndOfSet()) {
+ ui32 type = rowset.GetValue<Schema::IntervalPartitionTops::TypeCol>();
+ TString data = rowset.GetValue<Schema::IntervalPartitionTops::Data>();
+
+ if (data) {
+ auto partition = MakeHolder<NKikimrSysView::TTopPartitionsInfo>();
+ Y_PROTOBUF_SUPPRESS_NODISCARD partition->ParseFromString(data);
+
+ switch ((NKikimrSysView::EStatsType)type) {
+ case NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE:
+ Self->PartitionTopMinute.emplace_back(std::move(partition));
+ break;
+ case NKikimrSysView::TOP_PARTITIONS_ONE_HOUR:
+ Self->PartitionTopHour.emplace_back(std::move(partition));
+ break;
+ default:
+ SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected partition stats type: " << type);
+ }
+ ++partCount;
+ }
+
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+
+ auto compare = [] (const auto& l, const auto& r) {
+ return l->GetCPUCores() == r->GetCPUCores() ?
+ l->GetTabletId() < r->GetTabletId() : l->GetCPUCores() > r->GetCPUCores();
+ };
+
+ std::sort(Self->PartitionTopMinute.begin(), Self->PartitionTopMinute.end(), compare);
+ std::sort(Self->PartitionTopHour.begin(), Self->PartitionTopHour.end(), compare);
+
+ SVLOG_D("[" << Self->TabletID() << "] Loading interval partition tops: "
+ << "partition count# " << partCount);
+ }
+
+ // TopPartitions...
+ if (!LoadPartitionResults<Schema::TopPartitionsOneMinute>(db, Self->TopPartitionsOneMinute))
return false;
- if (!LoadResults<Schema::TopByRequestUnitsOneHour, TStats>(db, Self->TopByRequestUnitsOneHour))
+ if (!LoadPartitionResults<Schema::TopPartitionsOneHour>(db, Self->TopPartitionsOneHour))
return false;
auto deadline = Self->IntervalEnd + Self->TotalInterval;
diff --git a/ydb/core/sys_view/processor/tx_init_schema.cpp b/ydb/core/sys_view/processor/tx_init_schema.cpp
index e0485d741f..8a1df28d68 100644
--- a/ydb/core/sys_view/processor/tx_init_schema.cpp
+++ b/ydb/core/sys_view/processor/tx_init_schema.cpp
@@ -25,7 +25,9 @@ struct TSysViewProcessor::TTxInitSchema : public TTxBase {
Schema::TopByCpuTimeOneMinute::TableId,
Schema::TopByCpuTimeOneHour::TableId,
Schema::TopByRequestUnitsOneMinute::TableId,
- Schema::TopByRequestUnitsOneHour::TableId
+ Schema::TopByRequestUnitsOneHour::TableId,
+ Schema::TopPartitionsOneMinute::TableId,
+ Schema::TopPartitionsOneHour::TableId
};
for (auto id : resultTableIds) {
diff --git a/ydb/core/sys_view/processor/tx_interval_metrics.cpp b/ydb/core/sys_view/processor/tx_interval_metrics.cpp
index 4c6d992487..41cd7eeb5a 100644
--- a/ydb/core/sys_view/processor/tx_interval_metrics.cpp
+++ b/ydb/core/sys_view/processor/tx_interval_metrics.cpp
@@ -52,7 +52,7 @@ struct TSysViewProcessor::TTxIntervalMetrics : public TTxBase {
NIceDb::TUpdate<Schema::IntervalMetrics::Metrics>(serialized));
}
- auto fillTops = [&] (TTop& minuteTop, TTop& hourTop,
+ auto fillTops = [&] (TQueryTop& minuteTop, TQueryTop& hourTop,
NKikimrSysView::EStatsType minuteType, NKikimrSysView::EStatsType hourType,
const NProtoBuf::RepeatedPtrField<NKikimrSysView::TQueryStats>& queryStats)
{
@@ -102,14 +102,16 @@ struct TSysViewProcessor::TTxIntervalMetrics : public TTxBase {
db.Table<Schema::NodesToRequest>().Key(NodeId).Delete();
if (Self->NodesInFlight.empty() && Self->NodesToRequest.empty()) {
- Self->PersistResults(db);
- } else {
- Self->SendRequests();
+ Self->PersistQueryResults(db);
}
return true;
}
void Complete(const TActorContext&) override {
+ if (!Self->NodesToRequest.empty()) {
+ Self->SendRequests();
+ }
+
SVLOG_D("[" << Self->TabletID() << "] TTxIntervalMetrics::Complete");
}
};
diff --git a/ydb/core/sys_view/processor/tx_interval_summary.cpp b/ydb/core/sys_view/processor/tx_interval_summary.cpp
index 35fe2789e1..6a89374eb8 100644
--- a/ydb/core/sys_view/processor/tx_interval_summary.cpp
+++ b/ydb/core/sys_view/processor/tx_interval_summary.cpp
@@ -68,9 +68,9 @@ struct TSysViewProcessor::TTxIntervalSummary : public TTxBase {
TNodeId nodeId,
NKikimrSysView::EStatsType statsType,
const NKikimrSysView::TEvIntervalQuerySummary::TQuerySet& queries,
- TTop& top)
+ TQueryTop& top)
{
- TTop result;
+ TQueryTop result;
std::unordered_set<TQueryHash> seenHashes;
size_t queryIndex = 0;
auto topIt = top.begin();
diff --git a/ydb/core/sys_view/processor/tx_top_partitions.cpp b/ydb/core/sys_view/processor/tx_top_partitions.cpp
new file mode 100644
index 0000000000..2048e40e83
--- /dev/null
+++ b/ydb/core/sys_view/processor/tx_top_partitions.cpp
@@ -0,0 +1,121 @@
+#include "processor_impl.h"
+
+namespace NKikimr::NSysView {
+
+struct TSysViewProcessor::TTxTopPartitions : public TTxBase {
+ NKikimrSysView::TEvSendTopPartitions Record;
+
+ TTxTopPartitions(TSelf* self, NKikimrSysView::TEvSendTopPartitions&& record)
+ : TTxBase(self)
+ , Record(std::move(record))
+ {}
+
+ TTxType GetTxType() const override { return TXTYPE_TOP_PARTITIONS; }
+
+ void ProcessTop(NIceDb::TNiceDb& db, NKikimrSysView::EStatsType statsType,
+ TPartitionTop& top)
+ {
+ TPartitionTop result;
+ result.reserve(TOP_PARTITIONS_COUNT);
+ std::unordered_set<ui64> seen;
+ size_t index = 0;
+ auto topIt = top.begin();
+
+ auto copyNewPartition = [&] () {
+ const auto& newPartition = Record.GetPartitions(index);
+ auto tabletId = newPartition.GetTabletId();
+
+ TString data;
+ Y_PROTOBUF_SUPPRESS_NODISCARD newPartition.SerializeToString(&data);
+
+ auto partition = MakeHolder<NKikimrSysView::TTopPartitionsInfo>();
+ partition->CopyFrom(newPartition);
+ result.emplace_back(std::move(partition));
+
+ db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, tabletId).Update(
+ NIceDb::TUpdate<Schema::IntervalPartitionTops::Data>(data));
+
+ seen.insert(tabletId);
+ ++index;
+ };
+
+ while (result.size() < TOP_PARTITIONS_COUNT) {
+ if (topIt == top.end()) {
+ if (index == Record.PartitionsSize()) {
+ break;
+ }
+ auto tabletId = Record.GetPartitions(index).GetTabletId();
+ if (seen.find(tabletId) != seen.end()) {
+ ++index;
+ continue;
+ }
+ copyNewPartition();
+ } else {
+ auto topTabletId = (*topIt)->GetTabletId();
+ if (seen.find(topTabletId) != seen.end()) {
+ ++topIt;
+ continue;
+ }
+ if (index == Record.PartitionsSize()) {
+ result.emplace_back(std::move(*topIt++));
+ seen.insert(topTabletId);
+ continue;
+ }
+ auto& newPartition = Record.GetPartitions(index);
+ auto tabletId = newPartition.GetTabletId();
+ if (seen.find(tabletId) != seen.end()) {
+ ++index;
+ continue;
+ }
+ if ((*topIt)->GetCPUCores() >= newPartition.GetCPUCores()) {
+ result.emplace_back(std::move(*topIt++));
+ seen.insert(topTabletId);
+ } else {
+ copyNewPartition();
+ }
+ }
+ }
+
+ for (; topIt != top.end(); ++topIt) {
+ auto topTabletId = (*topIt)->GetTabletId();
+ if (seen.find(topTabletId) != seen.end()) {
+ continue;
+ }
+ db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, topTabletId).Delete();
+ }
+
+ top.swap(result);
+ }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ SVLOG_D("[" << Self->TabletID() << "] TTxTopPartitions::Execute: "
+ << "partition count# " << Record.PartitionsSize());
+
+ NIceDb::TNiceDb db(txc.DB);
+ ProcessTop(db, NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE, Self->PartitionTopMinute);
+ ProcessTop(db, NKikimrSysView::TOP_PARTITIONS_ONE_HOUR, Self->PartitionTopHour);
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ SVLOG_D("[" << Self->TabletID() << "] TTxTopPartitions::Complete");
+ }
+};
+
+void TSysViewProcessor::Handle(TEvSysView::TEvSendTopPartitions::TPtr& ev) {
+ auto& record = ev->Get()->Record;
+ auto timeUs = record.GetTimeUs();
+ auto partitionIntervalEnd = IntervalEnd + TotalInterval;
+
+ if (timeUs < IntervalEnd.MicroSeconds() || timeUs >= partitionIntervalEnd.MicroSeconds()) {
+ SVLOG_W("[" << TabletID() << "] TEvSendTopPartitions, time mismath: "
+ << ", partition interval end# " << partitionIntervalEnd
+ << ", event time# " << TInstant::MicroSeconds(timeUs));
+ return;
+ }
+
+ Execute(new TTxTopPartitions(this, std::move(record)), TActivationContext::AsActorContext());
+}
+
+} // NKikimr::NSysView
diff --git a/ydb/core/sys_view/processor/ya.make b/ydb/core/sys_view/processor/ya.make
index ddf4664cfa..d98787c9b2 100644
--- a/ydb/core/sys_view/processor/ya.make
+++ b/ydb/core/sys_view/processor/ya.make
@@ -20,6 +20,7 @@ SRCS(
tx_aggregate.cpp
tx_interval_summary.cpp
tx_interval_metrics.cpp
+ tx_top_partitions.cpp
)
PEERDIR(
diff --git a/ydb/core/sys_view/query_stats/query_metrics.cpp b/ydb/core/sys_view/query_stats/query_metrics.cpp
index 5f2acb5c23..d303ea3978 100644
--- a/ydb/core/sys_view/query_stats/query_metrics.cpp
+++ b/ydb/core/sys_view/query_stats/query_metrics.cpp
@@ -1,17 +1,9 @@
#include "query_metrics.h"
-#include <ydb/core/sys_view/common/common.h>
#include <ydb/core/sys_view/common/events.h>
-#include <ydb/core/sys_view/common/keys.h>
-#include <ydb/core/sys_view/common/schema.h>
-#include <ydb/core/sys_view/common/scan_actor_base_impl.h>
+#include <ydb/core/sys_view/common/processor_scan.h>
-#include <ydb/core/base/tablet_pipecache.h>
-
-#include <library/cpp/actors/core/hfunc.h>
-
-namespace NKikimr {
-namespace NSysView {
+namespace NKikimr::NSysView {
using namespace NActors;
@@ -25,149 +17,66 @@ void SetField<1>(NKikimrSysView::TQueryMetricsKey& key, ui32 value) {
key.SetRank(value);
}
-class TQueryMetricsScan : public TScanActorBase<TQueryMetricsScan> {
-public:
- using TBase = TScanActorBase<TQueryMetricsScan>;
-
- static constexpr auto ActorActivityType() {
- return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN;
- }
-
- TQueryMetricsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId,
- const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns)
- : TBase(ownerId, scanId, tableId, tableRange, columns)
- {
- ConvertKeyRange<NKikimrSysView::TEvGetQueryMetricsRequest, ui64, ui32>(Request, TableRange);
- Request.SetType(NKikimrSysView::METRICS_ONE_MINUTE);
- }
-
- STFUNC(StateScan) {
- switch (ev->GetTypeRewrite()) {
- hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle);
- hFunc(TEvSysView::TEvGetQueryMetricsResponse, Handle);
- hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
- hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution);
- cFunc(TEvents::TEvWakeup::EventType, HandleTimeout);
- cFunc(TEvents::TEvPoison::EventType, PassAway);
- default:
- LOG_CRIT(ctx, NKikimrServices::SYSTEM_VIEWS,
- "NSysView::TQueryMetricsScan: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
- }
- }
-
-private:
- void ProceedToScan() override {
- Become(&TThis::StateScan);
- if (AckReceived) {
- RequestBatch();
- }
- }
-
- void RequestBatch() {
- if (!SysViewProcessorId) {
- SVLOG_W("No sysview processor for database " << TenantName
- << ", sending empty response");
- ReplyEmptyAndDie();
- return;
- }
-
- if (BatchRequestInFlight) {
- return;
- }
-
- auto request = MakeHolder<TEvSysView::TEvGetQueryMetricsRequest>();
- request->Record.CopyFrom(Request);
-
- Send(MakePipePeNodeCacheID(false),
- new TEvPipeCache::TEvForward(request.Release(), SysViewProcessorId, true),
- IEventHandle::FlagTrackDelivery);
-
- BatchRequestInFlight = true;
- }
-
- void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) {
- RequestBatch();
- }
-
- void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) {
- ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, "Delivery problem in query metrics scan");
- }
-
- void Handle(TEvSysView::TEvGetQueryMetricsResponse::TPtr& ev) {
- using TEntry = NKikimrSysView::TQueryMetricsEntry;
- using TExtractor = std::function<TCell(const TEntry&)>;
- using TSchema = Schema::QueryMetrics;
-
- struct TExtractorsMap : public THashMap<NTable::TTag, TExtractor> {
- TExtractorsMap() {
- insert({TSchema::IntervalEnd::ColumnId, [] (const TEntry& entry) {
- return TCell::Make<ui64>(entry.GetKey().GetIntervalEndUs());
- }});
- insert({TSchema::Rank::ColumnId, [] (const TEntry& entry) {
- return TCell::Make<ui32>(entry.GetKey().GetRank());
- }});
- insert({TSchema::QueryText::ColumnId, [] (const TEntry& entry) {
- const auto& text = entry.GetQueryText();
- return TCell(text.data(), text.size());
- }});
- insert({TSchema::Count::ColumnId, [] (const TEntry& entry) {
- return TCell::Make<ui64>(entry.GetMetrics().GetCount());
- }});
-
-#define ADD_METRICS(FieldName, MetricsName) \
- insert({TSchema::Sum ## FieldName::ColumnId, [] (const TEntry& entry) { \
- return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetSum()); \
- }}); \
- insert({TSchema::Min ## FieldName::ColumnId, [] (const TEntry& entry) { \
- return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetMin()); \
- }}); \
- insert({TSchema::Max ## FieldName::ColumnId, [] (const TEntry& entry) { \
- return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetMax()); \
- }});
-
- ADD_METRICS(CPUTime, CpuTimeUs);
- ADD_METRICS(Duration, DurationUs);
- ADD_METRICS(ReadRows, ReadRows);
- ADD_METRICS(ReadBytes, ReadBytes);
- ADD_METRICS(UpdateRows, UpdateRows);
- ADD_METRICS(UpdateBytes, UpdateBytes);
- ADD_METRICS(DeleteRows, DeleteRows);
- ADD_METRICS(RequestUnits, RequestUnits);
+struct TQueryMetricsExtractorsMap :
+ public std::unordered_map<NTable::TTag, TExtractorFunc<NKikimrSysView::TQueryMetricsEntry>>
+{
+ using S = Schema::QueryMetrics;
+ using E = NKikimrSysView::TQueryMetricsEntry;
+
+ TQueryMetricsExtractorsMap() {
+ insert({S::IntervalEnd::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui64>(entry.GetKey().GetIntervalEndUs());
+ }});
+ insert({S::Rank::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui32>(entry.GetKey().GetRank());
+ }});
+ insert({S::QueryText::ColumnId, [] (const E& entry) {
+ const auto& text = entry.GetQueryText();
+ return TCell(text.data(), text.size());
+ }});
+ insert({S::Count::ColumnId, [] (const E& entry) {
+ return TCell::Make<ui64>(entry.GetMetrics().GetCount());
+ }});
+
+#define ADD_METRICS(FieldName, MetricsName) \
+ insert({S::Sum ## FieldName::ColumnId, [] (const E& entry) { \
+ return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetSum()); \
+ }}); \
+ insert({S::Min ## FieldName::ColumnId, [] (const E& entry) { \
+ return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetMin()); \
+ }}); \
+ insert({S::Max ## FieldName::ColumnId, [] (const E& entry) { \
+ return TCell::Make<ui64>(entry.GetMetrics().Get ## MetricsName().GetMax()); \
+ }});
+
+ ADD_METRICS(CPUTime, CpuTimeUs);
+ ADD_METRICS(Duration, DurationUs);
+ ADD_METRICS(ReadRows, ReadRows);
+ ADD_METRICS(ReadBytes, ReadBytes);
+ ADD_METRICS(UpdateRows, UpdateRows);
+ ADD_METRICS(UpdateBytes, UpdateBytes);
+ ADD_METRICS(DeleteRows, DeleteRows);
+ ADD_METRICS(RequestUnits, RequestUnits);
#undef ADD_METRICS
- }
- };
-
- const auto& record = ev->Get()->Record;
- if (record.HasOverloaded() && record.GetOverloaded()) {
- ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, "SysViewProcessor is overloaded");
- return;
- }
-
- ReplyBatch<TEvSysView::TEvGetQueryMetricsResponse, TEntry, TExtractorsMap, true>(ev);
-
- if (!record.GetLastBatch()) {
- Y_VERIFY(record.HasNext());
- Request.MutableFrom()->CopyFrom(record.GetNext());
- Request.SetInclusiveFrom(true);
- }
-
- BatchRequestInFlight = false;
- }
-
- void PassAway() override {
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
- TBase::PassAway();
}
-
-private:
- NKikimrSysView::TEvGetQueryMetricsRequest Request;
};
THolder<IActor> CreateQueryMetricsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId,
const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns)
{
- return MakeHolder<TQueryMetricsScan>(ownerId, scanId, tableId, tableRange, columns);
+ using TQueryMetricsScan = TProcessorScan<
+ NKikimrSysView::TQueryMetricsEntry,
+ NKikimrSysView::TEvGetQueryMetricsRequest,
+ NKikimrSysView::TEvGetQueryMetricsResponse,
+ TEvSysView::TEvGetQueryMetricsRequest,
+ TEvSysView::TEvGetQueryMetricsResponse,
+ TQueryMetricsExtractorsMap,
+ ui64,
+ ui32
+ >;
+
+ return MakeHolder<TQueryMetricsScan>(ownerId, scanId, tableId, tableRange, columns,
+ NKikimrSysView::METRICS_ONE_MINUTE);
}
-} // NSysView
-} // NKikimr
+} // NKikimr::NSysView
diff --git a/ydb/core/sys_view/query_stats/query_stats.cpp b/ydb/core/sys_view/query_stats/query_stats.cpp
index 918c9b781e..99134412fd 100644
--- a/ydb/core/sys_view/query_stats/query_stats.cpp
+++ b/ydb/core/sys_view/query_stats/query_stats.cpp
@@ -506,9 +506,11 @@ private:
NKikimrSysView::TEvGetQueryMetricsRequest Request;
};
-THolder<IActor> CreateQueryStatsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, const TStringBuf viewName,
+THolder<IActor> CreateQueryStatsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId,
const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns)
{
+ auto viewName = tableId.SysViewInfo;
+
if (viewName == TopQueriesByDuration1MinuteName) {
return MakeHolder<TQueryStatsScan<TDurationGreater>>(ownerId, scanId, tableId, tableRange, columns,
NKikimrSysView::TOP_DURATION_ONE_MINUTE,
diff --git a/ydb/core/sys_view/query_stats/query_stats.h b/ydb/core/sys_view/query_stats/query_stats.h
index b9fe4b7ecc..a7a76b9b57 100644
--- a/ydb/core/sys_view/query_stats/query_stats.h
+++ b/ydb/core/sys_view/query_stats/query_stats.h
@@ -17,8 +17,7 @@ struct TQueryStatsBucketRange {
explicit TQueryStatsBucketRange(const TSerializedTableRange& range, const TDuration& bucketSize);
};
-THolder<IActor> CreateQueryStatsScan(const TActorId& ownerId, ui32 scanId,
- const TTableId& tableId, const TStringBuf viewName,
+THolder<IActor> CreateQueryStatsScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId,
const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns);
} // NSysView
diff --git a/ydb/core/sys_view/scan.cpp b/ydb/core/sys_view/scan.cpp
index f337cb6ba7..9a2489607e 100644
--- a/ydb/core/sys_view/scan.cpp
+++ b/ydb/core/sys_view/scan.cpp
@@ -11,6 +11,7 @@
#include <ydb/core/sys_view/storage/storage_pools.h>
#include <ydb/core/sys_view/storage/storage_stats.h>
#include <ydb/core/sys_view/tablets/tablets.h>
+#include <ydb/core/sys_view/partition_stats/top_partitions.h>
namespace NKikimr {
namespace NSysView {
@@ -35,7 +36,7 @@ THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const
tableId.SysViewInfo == TopQueriesByRequestUnits1MinuteName ||
tableId.SysViewInfo == TopQueriesByRequestUnits1HourName)
{
- return CreateQueryStatsScan(ownerId, scanId, tableId, tableId.SysViewInfo, tableRange, columns);
+ return CreateQueryStatsScan(ownerId, scanId, tableId, tableRange, columns);
}
if (tableId.SysViewInfo == PDisksName) {
@@ -66,7 +67,13 @@ THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const
return CreateQueryMetricsScan(ownerId, scanId, tableId, tableRange, columns);
}
- return nullptr;
+ if (tableId.SysViewInfo == TopPartitions1MinuteName ||
+ tableId.SysViewInfo == TopPartitions1HourName)
+ {
+ return CreateTopPartitionsScan(ownerId, scanId, tableId, tableRange, columns);
+ }
+
+ return {};
}
} // NSysView
diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp
index 525bc5787a..8f67963059 100644
--- a/ydb/core/sys_view/ut_common.cpp
+++ b/ydb/core/sys_view/ut_common.cpp
@@ -59,6 +59,11 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool
Server = new Tests::TServer(*Settings);
Server->EnableGRpc(grpcPort);
+ auto* runtime = Server->GetRuntime();
+ for (ui32 i = 0; i < runtime->GetNodeCount(); ++i) {
+ runtime->GetAppData(i).UsePartitionStatsCollectorForTests = true;
+ }
+
Client = MakeHolder<Tests::TClient>(*Settings);
Tenants = MakeHolder<Tests::TTenants>(Server);
diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp
index 56eb5af269..7a44050218 100644
--- a/ydb/core/sys_view/ut_kqp.cpp
+++ b/ydb/core/sys_view/ut_kqp.cpp
@@ -49,31 +49,30 @@ void CreateTenants(TTestEnv& env, bool extSchemeShard = true) {
CreateTenant(env, "Tenant2", extSchemeShard);
}
-void CreateTables(TTestEnv& env) {
+void CreateTable(auto& session, const TString& name, ui64 partitionCount = 1) {
+ auto desc = TTableBuilder()
+ .AddNullableColumn("Key", EPrimitiveType::Uint64)
+ .AddNullableColumn("Value", EPrimitiveType::String)
+ .SetPrimaryKeyColumns({"Key"})
+ .Build();
+
+ auto settings = TCreateTableSettings();
+ settings.PartitioningPolicy(TPartitioningPolicy().UniformPartitions(partitionCount));
+
+ session.CreateTable(name, std::move(desc), std::move(settings)).GetValueSync();
+}
+
+void CreateTables(TTestEnv& env, ui64 partitionCount = 1) {
TTableClient client(env.GetDriver());
auto session = client.CreateSession().GetValueSync().GetSession();
- NKqp::AssertSuccessResult(session.ExecuteSchemeQuery(R"(
- CREATE TABLE `Root/Table0` (
- Key Uint64,
- Value String,
- PRIMARY KEY (Key)
- );
- )").GetValueSync());
-
+ CreateTable(session, "Root/Table0", partitionCount);
NKqp::AssertSuccessResult(session.ExecuteDataQuery(R"(
REPLACE INTO `Root/Table0` (Key, Value) VALUES
(0u, "Z");
)", TTxControl::BeginTx().CommitTx()).GetValueSync());
- NKqp::AssertSuccessResult(session.ExecuteSchemeQuery(R"(
- CREATE TABLE `Root/Tenant1/Table1` (
- Key Uint64,
- Value String,
- PRIMARY KEY (Key)
- );
- )").GetValueSync());
-
+ CreateTable(session, "Root/Tenant1/Table1", partitionCount);
NKqp::AssertSuccessResult(session.ExecuteDataQuery(R"(
REPLACE INTO `Root/Tenant1/Table1` (Key, Value) VALUES
(1u, "A"),
@@ -81,14 +80,7 @@ void CreateTables(TTestEnv& env) {
(3u, "C");
)", TTxControl::BeginTx().CommitTx()).GetValueSync());
- NKqp::AssertSuccessResult(session.ExecuteSchemeQuery(R"(
- CREATE TABLE `Root/Tenant2/Table2` (
- Key Uint64,
- Value String,
- PRIMARY KEY (Key)
- );
- )").GetValueSync());
-
+ CreateTable(session, "Root/Tenant2/Table2", partitionCount);
NKqp::AssertSuccessResult(session.ExecuteDataQuery(R"(
REPLACE INTO `Root/Tenant2/Table2` (Key, Value) VALUES
(4u, "D"),
@@ -96,9 +88,9 @@ void CreateTables(TTestEnv& env) {
)", TTxControl::BeginTx().CommitTx()).GetValueSync());
}
-void CreateTenantsAndTables(TTestEnv& env, bool extSchemeShard = true) {
+void CreateTenantsAndTables(TTestEnv& env, bool extSchemeShard = true, ui64 partitionCount = 1) {
CreateTenants(env, extSchemeShard);
- CreateTables(env);
+ CreateTables(env, partitionCount);
}
void CreateRootTable(TTestEnv& env, ui64 partitionCount = 1) {
@@ -1151,6 +1143,198 @@ Y_UNIT_TEST_SUITE(SystemView) {
}
}
+ size_t GetRowCount(TTableClient& client, const TString& name) {
+ TStringBuilder query;
+ query << "SELECT * FROM `" << name << "`";
+ auto it = client.StreamExecuteScanQuery(query).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ auto ysonString = NKqp::StreamResultToYson(it);
+ auto node = NYT::NodeFromYsonString(ysonString, ::NYson::EYsonType::Node);
+ UNIT_ASSERT(node.IsList());
+ return node.AsList().size();
+ }
+
+ ui64 GetIntervalEnd(TTableClient& client, const TString& name) {
+ TStringBuilder query;
+ query << "SELECT MAX(IntervalEnd) FROM `" << name << "`";
+ auto it = client.StreamExecuteScanQuery(query).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ auto ysonString = NKqp::StreamResultToYson(it);
+ auto node = NYT::NodeFromYsonString(ysonString, ::NYson::EYsonType::Node);
+ UNIT_ASSERT(node.IsList());
+ UNIT_ASSERT(node.AsList().size() == 1);
+ auto row = node.AsList()[0];
+ UNIT_ASSERT(row.IsList());
+ UNIT_ASSERT(row.AsList().size() == 1);
+ auto value = row.AsList()[0];
+ UNIT_ASSERT(value.IsList());
+ UNIT_ASSERT(value.AsList().size() == 1);
+ return value.AsList()[0].AsUint64();
+ }
+
+ Y_UNIT_TEST(TopPartitionsFields) {
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(0);
+
+ auto nowUs = TInstant::Now().MicroSeconds();
+
+ TTestEnv env(1, 4, 0, true);
+ CreateTenantsAndTables(env);
+
+ TTableClient client(env.GetDriver());
+ size_t rowCount = 0;
+ for (size_t iter = 0; iter < 30 && !rowCount; ++iter) {
+ rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute");
+ if (!rowCount) {
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+ ui64 intervalEnd = GetIntervalEnd(client, "/Root/Tenant1/.sys/top_partitions_one_minute");
+
+ TStringBuilder query;
+ query << R"(
+ SELECT
+ IntervalEnd,
+ Rank,
+ TabletId,
+ Path,
+ PeakTime,
+ CPUCores,
+ NodeId,
+ DataSize,
+ RowCount,
+ IndexSize,
+ InFlightTxCount
+ FROM `/Root/Tenant1/.sys/top_partitions_one_minute`)"
+ << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp)";
+ auto it = client.StreamExecuteScanQuery(query).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ auto ysonString = NKqp::StreamResultToYson(it);
+
+ TYsonFieldChecker check(ysonString, 11);
+ check.Uint64(intervalEnd); // IntervalEnd
+ check.Uint64(1); // Rank
+ check.Uint64Greater(0); // TabletId
+ check.String("/Root/Tenant1/Table1"); // Path
+ check.Uint64GreaterOrEquals(nowUs); // PeakTime
+ check.DoubleGreaterOrEquals(0.); // CPUCores
+ check.Uint64Greater(0); // NodeId
+ check.Uint64Greater(0); // DataSize
+ check.Uint64(3); // RowCount
+ check.Uint64(0); // IndexSize
+ check.Uint64(0); // InFlightTxCount
+ }
+
+ Y_UNIT_TEST(TopPartitionsTables) {
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(0);
+
+ constexpr ui64 partitionCount = 5;
+
+ TTestEnv env(1, 4, 0, true);
+ CreateTenantsAndTables(env, true, partitionCount);
+
+ TTableClient client(env.GetDriver());
+ size_t rowCount = 0;
+ for (size_t iter = 0; iter < 30 && rowCount < partitionCount; ++iter) {
+ rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute");
+ if (rowCount < partitionCount) {
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+ auto check = [&] (const TString& name) {
+ ui64 intervalEnd = GetIntervalEnd(client, name);
+ TStringBuilder query;
+ query << "SELECT Rank ";
+ query << "FROM `" << name << "` ";
+ query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) ";
+ auto it = client.StreamExecuteScanQuery(query).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ NKqp::CompareYson("[[[1u]];[[2u]];[[3u]];[[4u]];[[5u]]]", NKqp::StreamResultToYson(it));
+ };
+ check("/Root/Tenant1/.sys/top_partitions_one_minute");
+ check("/Root/Tenant1/.sys/top_partitions_one_hour");
+ }
+
+ Y_UNIT_TEST(TopPartitionsRanges) {
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(0);
+
+ constexpr ui64 partitionCount = 5;
+
+ TTestEnv env(1, 4, 0, true);
+ CreateTenantsAndTables(env, true, partitionCount);
+
+ TTableClient client(env.GetDriver());
+ size_t rowCount = 0;
+ for (size_t iter = 0; iter < 30 && rowCount < partitionCount; ++iter) {
+ rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute");
+ if (rowCount < partitionCount) {
+ Sleep(TDuration::Seconds(5));
+ }
+ }
+ ui64 intervalEnd = GetIntervalEnd(client, "/Root/Tenant1/.sys/top_partitions_one_minute");
+ {
+ TStringBuilder query;
+ query << "SELECT IntervalEnd, Rank ";
+ query << "FROM `/Root/Tenant1/.sys/top_partitions_one_minute` ";
+ query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) ";
+ query << "AND Rank > 3u";
+ auto it = client.StreamExecuteScanQuery(query).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TStringBuilder result;
+ result << "[";
+ result << "[[" << intervalEnd << "u];[4u]];";
+ result << "[[" << intervalEnd << "u];[5u]];";
+ result << "]";
+ NKqp::CompareYson(result, NKqp::StreamResultToYson(it));
+ }
+ {
+ TStringBuilder query;
+ query << "SELECT IntervalEnd, Rank ";
+ query << "FROM `/Root/Tenant1/.sys/top_partitions_one_minute` ";
+ query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) ";
+ query << "AND Rank >= 3u";
+ auto it = client.StreamExecuteScanQuery(query).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TStringBuilder result;
+ result << "[";
+ result << "[[" << intervalEnd << "u];[3u]];";
+ result << "[[" << intervalEnd << "u];[4u]];";
+ result << "[[" << intervalEnd << "u];[5u]];";
+ result << "]";
+ NKqp::CompareYson(result, NKqp::StreamResultToYson(it));
+ }
+ {
+ TStringBuilder query;
+ query << "SELECT IntervalEnd, Rank ";
+ query << "FROM `/Root/Tenant1/.sys/top_partitions_one_minute` ";
+ query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) ";
+ query << "AND Rank < 3u";
+ auto it = client.StreamExecuteScanQuery(query).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TStringBuilder result;
+ result << "[";
+ result << "[[" << intervalEnd << "u];[1u]];";
+ result << "[[" << intervalEnd << "u];[2u]];";
+ result << "]";
+ NKqp::CompareYson(result, NKqp::StreamResultToYson(it));
+ }
+ {
+ TStringBuilder query;
+ query << "SELECT IntervalEnd, Rank ";
+ query << "FROM `/Root/Tenant1/.sys/top_partitions_one_minute` ";
+ query << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) ";
+ query << "AND Rank <= 3u";
+ auto it = client.StreamExecuteScanQuery(query).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TStringBuilder result;
+ result << "[";
+ result << "[[" << intervalEnd << "u];[1u]];";
+ result << "[[" << intervalEnd << "u];[2u]];";
+ result << "[[" << intervalEnd << "u];[3u]];";
+ result << "]";
+ NKqp::CompareYson(result, NKqp::StreamResultToYson(it));
+ }
+ }
+
Y_UNIT_TEST(OldEngineSystemView) {
TTestEnv env;
CreateRootTable(env);
@@ -1345,7 +1529,7 @@ Y_UNIT_TEST_SUITE(SystemView) {
UNIT_ASSERT_VALUES_EQUAL(entry.Type, ESchemeEntryType::Directory);
auto children = result.GetChildren();
- UNIT_ASSERT_VALUES_EQUAL(children.size(), 16);
+ UNIT_ASSERT_VALUES_EQUAL(children.size(), 18);
THashSet<TString> names;
for (const auto& child : children) {
@@ -1363,7 +1547,7 @@ Y_UNIT_TEST_SUITE(SystemView) {
UNIT_ASSERT_VALUES_EQUAL(entry.Type, ESchemeEntryType::Directory);
auto children = result.GetChildren();
- UNIT_ASSERT_VALUES_EQUAL(children.size(), 10);
+ UNIT_ASSERT_VALUES_EQUAL(children.size(), 12);
THashSet<TString> names;
for (const auto& child : children) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp b/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp
index f4f5db4ba7..4d7f18456b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp
@@ -451,7 +451,7 @@ struct TSchemeShard::TTxScheduleConditionalErase : public TTransactionBase<TSche
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxScheduleConditionalErase Complete"
<< ": at schemeshard: " << Self->TabletID());
- if (StatsCollectorEv) {
+ if (StatsCollectorEv && Self->SysPartitionStatsCollector) {
ctx.Send(Self->SysPartitionStatsCollector, StatsCollectorEv.Release());
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index 458bb20af0..eea4325f98 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -23,7 +23,9 @@ static ui64 GetIops(const T& c) {
}
void TSchemeShard::Handle(NSysView::TEvSysView::TEvGetPartitionStats::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(ev->Forward(SysPartitionStatsCollector));
+ if (SysPartitionStatsCollector) {
+ ctx.Send(ev->Forward(SysPartitionStatsCollector));
+ }
}
auto TSchemeShard::BuildStatsForCollector(TPathId pathId, TShardIdx shardIdx, TTabletId datashardId,
@@ -299,9 +301,11 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const
startTime = rec.GetStartTime();
}
- PendingMessages.emplace_back(
- Self->SysPartitionStatsCollector,
- Self->BuildStatsForCollector(pathId, shardIdx, datashardId, nodeId, startTime, newStats).Release());
+ if (Self->SysPartitionStatsCollector) {
+ PendingMessages.emplace_back(
+ Self->SysPartitionStatsCollector,
+ Self->BuildStatsForCollector(pathId, shardIdx, datashardId, nodeId, startTime, newStats).Release());
+ }
}
if (isOlapStore) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 9592b57f97..4b3532944f 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -67,6 +67,10 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx,
LoginProvider.Audience = TPath::Init(subDomainPathId, this).PathString();
domainPtr->UpdateSecurityState(LoginProvider.GetSecurityState());
+ TTabletId sysViewProcessorId = domainPtr->GetTenantSysViewProcessorID();
+ SysPartitionStatsCollector = Register(NSysView::CreatePartitionStatsCollector(
+ GetDomainKey(subDomainPathId), sysViewProcessorId ? sysViewProcessorId.GetValue() : 0).Release());
+
Execute(CreateTxInitPopulator(std::move(delayPublications)), ctx);
if (tablesToClean) {
@@ -3285,7 +3289,7 @@ void TSchemeShard::PersistRemoveTable(NIceDb::TNiceDb& db, TPathId pathId, const
Tables.erase(pathId);
DecrementPathDbRefCount(pathId, "remove table");
- if (AppData()->FeatureFlags.GetEnableSystemViews()) {
+ if (AppData()->FeatureFlags.GetEnableSystemViews() && SysPartitionStatsCollector) {
auto ev = MakeHolder<NSysView::TEvSysView::TEvRemoveTable>(GetDomainKey(pathId), pathId);
Send(SysPartitionStatsCollector, ev.Release());
}
@@ -3826,8 +3830,6 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
TxAllocatorClient = RegisterWithSameMailbox(CreateTxAllocatorClient(CollectTxAllocators(appData)));
- SysPartitionStatsCollector = Register(NSysView::CreatePartitionStatsCollector().Release());
-
SplitSettings.Register(appData->Icb);
Executor()->RegisterExternalTabletCounters(TabletCountersPtr);
@@ -5834,7 +5836,9 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T
auto path = TPath::Init(pathId, this);
auto ev = MakeHolder<NSysView::TEvSysView::TEvSetPartitioning>(GetDomainKey(pathId), pathId, path.PathString());
ev->ShardIndices.swap(shardIndices);
- Send(SysPartitionStatsCollector, ev.Release());
+ if (SysPartitionStatsCollector) {
+ Send(SysPartitionStatsCollector, ev.Release());
+ }
}
if (!tableInfo->IsBackup) {