about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Konstantin Morozov <34001730+k-morozov@users.noreply.github.com> 2024-01-03 08:28:08 +0100
committer GitHub <noreply@github.com> 2024-01-03 10:28:08 +0300
commit 080882eaae1f6d4870b648699b6c7600ebecb36c (patch)
tree 5611b63b4c9f6ed0c8172739c555d6d9a8dc61db
parent 30e189bcf3c7d98c08c422ac7458788c386b10d7 (diff)
download ydb-080882eaae1f6d4870b648699b6c7600ebecb36c.tar.gz
Fix OLAP stats (#766)
* add tests to check OLAP stats, add a controller for the activation period, and add filling of stats for non-standalone tables.
* remove useless log
* remove useless log
* update controller
* add test with some tables in a store
* support stats for table store
* remove useless message
* add size to stats and small update
* fix flapping, update controller, add some logging
* reduce timeout
* add check for primary index
* update sender, set immutable var
-rw-r--r-- ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp 146
-rw-r--r-- ydb/core/kqp/ut/olap/ya.make 1
-rw-r--r-- ydb/core/protos/tx_datashard.proto 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard.cpp 138
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.h 15
-rw-r--r-- ydb/core/tx/columnshard/hooks/abstract/abstract.h 6
-rw-r--r-- ydb/core/tx/columnshard/hooks/testing/controller.h 8
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__table_stats.cpp 93
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_info_types.cpp 29
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_info_types.h 8
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_path_describer.cpp 11
12 files changed, 392 insertions, 67 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp
new file mode 100644
index 0000000000..635b8ea745
--- /dev/null
+++ b/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp
@@ -0,0 +1,146 @@
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/kqp/ut/common/columnshard.h>
+#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+#include <ydb/core/testlib/common_helper.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+Y_UNIT_TEST_SUITE(KqpOlapStats) {
+ constexpr size_t inserted_rows = 1000;
+ constexpr size_t tables_in_store = 1000;
+ constexpr size_t size_single_table = 13152;
+
+ const TVector<TTestHelper::TColumnSchema> schema = {
+ TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
+ TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
+ TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
+ };
+
+ class TOlapStatsController: public NYDBTest::NColumnShard::TController {
+ public:
+ TDuration GetPeriodicWakeupActivationPeriod(const TDuration /*defaultValue*/) const override {
+ return TDuration::MilliSeconds(10);
+ }
+ TDuration GetStatsReportInterval(const TDuration /*defaultValue*/) const override {
+ return TDuration::MilliSeconds(10);
+ }
+ };
+
+ Y_UNIT_TEST(AddRowsTableStandalone) {
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<TOlapStatsController>();
+
+ TKikimrSettings runnerSettings;
+ runnerSettings.WithSampleTables = false;
+
+ TTestHelper testHelper(runnerSettings);
+
+ TTestHelper::TColumnTable testTable;
+
+ testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
+ testHelper.CreateTable(testTable);
+ {
+ TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+
+ for(size_t i=0; i<inserted_rows; i++) {
+ tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull();
+ }
+
+ testHelper.InsertData(testTable, tableInserter);
+ }
+
+ Sleep(TDuration::Seconds(1));
+
+ auto settings = TDescribeTableSettings().WithTableStatistics(true);
+ auto describeResult = testHelper.GetSession().DescribeTable("/Root/ColumnTableTest", settings).GetValueSync();
+
+ UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
+
+ const auto& description = describeResult.GetTableDescription();
+
+ UNIT_ASSERT_VALUES_EQUAL(inserted_rows, description.GetTableRows());
+ UNIT_ASSERT_VALUES_EQUAL(size_single_table, description.GetTableSize());
+ }
+
+ Y_UNIT_TEST(AddRowsTableInTableStore) {
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<TOlapStatsController>();
+
+ TKikimrSettings runnerSettings;
+ runnerSettings.WithSampleTables = false;
+
+ TTestHelper testHelper(runnerSettings);
+
+ TTestHelper::TColumnTableStore testTableStore;
+
+ testTableStore.SetName("/Root/TableStoreTest").SetPrimaryKey({"id"}).SetSchema(schema);
+ testHelper.CreateTable(testTableStore);
+ TTestHelper::TColumnTable testTable;
+ testTable.SetName("/Root/TableStoreTest/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
+ testHelper.CreateTable(testTable);
+
+ {
+ TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+ for(size_t i=0; i<inserted_rows; i++) {
+ tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull();
+ }
+ testHelper.InsertData(testTable, tableInserter);
+ }
+
+ Sleep(TDuration::Seconds(1));
+
+ auto settings = TDescribeTableSettings().WithTableStatistics(true);
+ auto describeResult = testHelper.GetSession().DescribeTable("/Root/TableStoreTest/ColumnTableTest", settings).GetValueSync();
+
+ UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
+
+ const auto& description = describeResult.GetTableDescription();
+
+ UNIT_ASSERT_VALUES_EQUAL(inserted_rows, description.GetTableRows());
+ UNIT_ASSERT_VALUES_EQUAL(size_single_table, description.GetTableSize());
+ }
+
+ Y_UNIT_TEST(AddRowsSomeTablesInTableStore) {
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<TOlapStatsController>();
+
+ TKikimrSettings runnerSettings;
+ runnerSettings.WithSampleTables = false;
+
+ TTestHelper testHelper(runnerSettings);
+
+ TTestHelper::TColumnTableStore testTableStore;
+
+ testTableStore.SetName("/Root/TableStoreTest").SetPrimaryKey({"id"}).SetSchema(schema);
+ testHelper.CreateTable(testTableStore);
+
+ Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).SetPriority(NActors::NLog::PRI_DEBUG).Initialize();
+
+ for(size_t t=0; t<tables_in_store; t++) {
+ TTestHelper::TColumnTable testTable;
+ testTable.SetName("/Root/TableStoreTest/ColumnTableTest_" + std::to_string(t)).SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
+ testHelper.CreateTable(testTable);
+
+ TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+ for(size_t i=0; i < t+ inserted_rows; i++) {
+ tableInserter.AddRow().Add(i + t * tables_in_store).Add("test_res_" + std::to_string(i + t * tables_in_store)).AddNull();
+ }
+ testHelper.InsertData(testTable, tableInserter);;
+ }
+
+ Sleep(TDuration::Seconds(20));
+
+ auto settings = TDescribeTableSettings().WithTableStatistics(true);
+ for(size_t t=0; t<tables_in_store; t++) {
+ auto describeResult = testHelper.GetSession().DescribeTable("/Root/TableStoreTest/ColumnTableTest_" + std::to_string(t), settings).GetValueSync();
+ UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
+ const auto& description = describeResult.GetTableDescription();
+
+ UNIT_ASSERT_VALUES_EQUAL(t + inserted_rows, description.GetTableRows());
+ }
+ }
+}
+
+} // namespace NKqp
+} // namespace NKikimr \ No newline at end of file
diff --git a/ydb/core/kqp/ut/olap/ya.make b/ydb/core/kqp/ut/olap/ya.make
index 8331573d01..8dfe183405 100644
--- a/ydb/core/kqp/ut/olap/ya.make
+++ b/ydb/core/kqp/ut/olap/ya.make
@@ -13,6 +13,7 @@ ELSE()
ENDIF()
SRCS(
+ kqp_olap_stats_ut.cpp
kqp_olap_ut.cpp
)
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index fbb6bf66f5..599ffff448 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -806,6 +806,8 @@ message TEvPeriodicTableStats {
optional uint64 TableOwnerId = 12;
optional bool IsDstSplit = 13;
+
+ repeated TEvPeriodicTableStats Tables = 14;
}
message TSerializedRowColumnsScheme {
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index b4459dca0c..8662548284 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -46,7 +46,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
EnqueueProgressTx(ctx);
}
EnqueueBackgroundActivities();
- ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
+ ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
}
void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
@@ -158,7 +158,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC
SendWaitPlanStep(GetOutdatedStep());
SendPeriodicStats();
- ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
+ ctx.Schedule(PeriodicWakeupActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
}
}
@@ -293,15 +293,106 @@ void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage&
metrics->TryUpdate(ctx);
}
+void TColumnShard::ConfigureStats(const NOlap::TColumnEngineStats& indexStats, ::NKikimrTableStats::TTableStats * tabletStats) {
+ NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate();
+ auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted
+
+ if (activeIndexStats.Rows < 0 || activeIndexStats.Bytes < 0) {
+ LOG_S_WARN("Negative stats counter. Rows: " << activeIndexStats.Rows
+ << " Bytes: " << activeIndexStats.Bytes << TabletID());
+
+ activeIndexStats.Rows = (activeIndexStats.Rows < 0) ? 0 : activeIndexStats.Rows;
+ activeIndexStats.Bytes = (activeIndexStats.Bytes < 0) ? 0 : activeIndexStats.Bytes;
+ }
+
+ tabletStats->SetRowCount(activeIndexStats.Rows);
+ tabletStats->SetDataSize(activeIndexStats.Bytes + TabletCounters->Simple()[COUNTER_COMMITTED_BYTES].Get());
+
+ // TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside)
+ //tabletStats->SetIndexSize(); // TODO: calc size of internal tables
+
+ tabletStats->SetLastAccessTime(LastAccessTime.MilliSeconds());
+ tabletStats->SetLastUpdateTime(lastIndexUpdate.GetPlanStep());
+}
+
+TDuration TColumnShard::GetControllerPeriodicWakeupActivationPeriod() {
+ return NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod(TSettings::DefaultPeriodicWakeupActivationPeriod);
+}
+
+TDuration TColumnShard::GetControllerStatsReportInterval() {
+ return NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval(TSettings::DefaultStatsReportInterval);
+}
+
+void TColumnShard::FillTxTableStats(::NKikimrTableStats::TTableStats* tableStats) const {
+ tableStats->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOAD].Get());
+ tableStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get());
+ tableStats->SetInFlightTxCount(Executor()->GetStats().TxInFly);
+}
+
+void TColumnShard::FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev) {
+ ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
+ ev->Record.SetGeneration(Executor()->Generation());
+ ev->Record.SetRound(StatsReportRound++);
+ ev->Record.SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId);
+ ev->Record.SetStartTime(StartTime().MilliSeconds());
+ if (auto* resourceMetrics = Executor()->GetResourceMetrics()) {
+ resourceMetrics->Fill(*ev->Record.MutableTabletMetrics());
+ }
+ auto* tabletStats = ev->Record.MutableTableStats();
+ FillTxTableStats(tabletStats);
+ if (TablesManager.HasPrimaryIndex()) {
+ const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats();
+ ConfigureStats(indexStats, tabletStats);
+ }
+}
+
+void TColumnShard::FillColumnTableStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev) {
+ if (!TablesManager.HasPrimaryIndex()) {
+ return;
+ }
+ const auto& tablesIndexStats = TablesManager.MutablePrimaryIndex().GetStats();
+ LOG_S_DEBUG("There are stats for " << tablesIndexStats.size() << " tables");
+ for(const auto& [tableLocalID, columnStats] : tablesIndexStats) {
+ if (!columnStats) {
+ LOG_S_ERROR("SendPeriodicStats: empty stats");
+ continue;
+ }
+
+ auto* periodicTableStats = ev->Record.AddTables();
+ periodicTableStats->SetDatashardId(TabletID());
+ periodicTableStats->SetTableLocalId(tableLocalID);
+
+ periodicTableStats->SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
+ periodicTableStats->SetGeneration(Executor()->Generation());
+ periodicTableStats->SetRound(StatsReportRound++);
+ periodicTableStats->SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId);
+ periodicTableStats->SetStartTime(StartTime().MilliSeconds());
+
+ if (auto* resourceMetrics = Executor()->GetResourceMetrics()) {
+ resourceMetrics->Fill(*periodicTableStats->MutableTabletMetrics());
+ }
+
+ auto* tableStats = periodicTableStats->MutableTableStats();
+ FillTxTableStats(tableStats);
+ ConfigureStats(*columnStats, tableStats);
+
+ LOG_S_TRACE("Add stats for table, tableLocalID=" << tableLocalID);
+ }
+}
+
void TColumnShard::SendPeriodicStats() {
+ LOG_S_DEBUG("Send periodic stats.");
+
if (!CurrentSchemeShardId || !OwnerPathId) {
LOG_S_DEBUG("Disabled periodic stats at tablet " << TabletID());
return;
}
const TActorContext& ctx = ActorContext();
- TInstant now = TAppData::TimeProvider->Now();
+ const TInstant now = TAppData::TimeProvider->Now();
+
if (LastStatsReport + StatsReportInterval > now) {
+ LOG_S_TRACE("Skip send periodic stats: report interavl = " << StatsReportInterval);
return;
}
LastStatsReport = now;
@@ -313,45 +404,10 @@ void TColumnShard::SendPeriodicStats() {
}
auto ev = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), OwnerPathId);
- {
- ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
- ev->Record.SetGeneration(Executor()->Generation());
- ev->Record.SetRound(StatsReportRound++);
- ev->Record.SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId);
- ev->Record.SetStartTime(StartTime().MilliSeconds());
-
- if (auto* resourceMetrics = Executor()->GetResourceMetrics()) {
- resourceMetrics->Fill(*ev->Record.MutableTabletMetrics());
- }
- auto* tabletStats = ev->Record.MutableTableStats();
- tabletStats->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOAD].Get());
- tabletStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get());
- tabletStats->SetInFlightTxCount(Executor()->GetStats().TxInFly);
-
- if (TablesManager.HasPrimaryIndex()) {
- const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats();
- NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate();
- auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted
-
- if (activeIndexStats.Rows < 0 || activeIndexStats.Bytes < 0) {
- LOG_S_WARN("Negative stats counter. Rows: " << activeIndexStats.Rows
- << " Bytes: " << activeIndexStats.Bytes << TabletID());
-
- activeIndexStats.Rows = (activeIndexStats.Rows < 0) ? 0 : activeIndexStats.Rows;
- activeIndexStats.Bytes = (activeIndexStats.Bytes < 0) ? 0 : activeIndexStats.Bytes;
- }
-
- tabletStats->SetRowCount(activeIndexStats.Rows);
- tabletStats->SetDataSize(activeIndexStats.Bytes + TabletCounters->Simple()[COUNTER_COMMITTED_BYTES].Get());
- // TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside)
- //tabletStats->SetIndexSize(); // TODO: calc size of internal tables
- tabletStats->SetLastAccessTime(LastAccessTime.MilliSeconds());
- tabletStats->SetLastUpdateTime(lastIndexUpdate.GetPlanStep());
- }
- }
-
- LOG_S_DEBUG("Sending periodic stats at tablet " << TabletID());
+ FillOlapStats(ctx, ev);
+ FillColumnTableStats(ctx, ev);
+
NTabletPipe::SendData(ctx, StatsReportPipe, ev.release());
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 7a39a30cc7..bd93759cc4 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -165,6 +165,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
: TActor(&TThis::StateInit)
, TTabletExecutedFlat(info, tablet, nullptr)
, ProgressTxController(std::make_unique<TTxController>(*this))
+ , PeriodicWakeupActivationPeriod(GetControllerPeriodicWakeupActivationPeriod())
+ , StatsReportInterval(GetControllerStatsReportInterval())
, StoragesManager(std::make_shared<TStoragesManager>(*this))
, InFlightReadsTracker(StoragesManager)
, TablesManager(StoragesManager, info->TabletID)
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 2f728fbc01..e9c4062061 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -69,6 +69,8 @@ struct TSettings {
static constexpr ui32 MAX_INDEXATIONS_TO_SKIP = 16;
static constexpr TDuration GuaranteeIndexationInterval = TDuration::Seconds(10);
+ static constexpr TDuration DefaultPeriodicWakeupActivationPeriod = TDuration::Seconds(60);
+ static constexpr TDuration DefaultStatsReportInterval = TDuration::Seconds(10);
static constexpr i64 GuaranteeIndexationStartBytesLimit = (i64)5 * 1024 * 1024 * 1024;
TControlWrapper BlobWriteGrouppingEnabled;
@@ -389,9 +391,9 @@ private:
bool MediatorTimeCastRegistered = false;
TSet<ui64> MediatorTimeCastWaitingSteps;
TDuration MaxReadStaleness = TDuration::Minutes(5); // TODO: Make configurable?
- TDuration ActivationPeriod = TDuration::Seconds(60);
+ const TDuration PeriodicWakeupActivationPeriod;
TDuration FailActivationDelay = TDuration::Seconds(1);
- TDuration StatsReportInterval = TDuration::Seconds(10);
+ const TDuration StatsReportInterval;
TInstant LastAccessTime;
TInstant LastStatsReport;
@@ -482,7 +484,16 @@ private:
void UpdateIndexCounters();
void UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage);
ui64 MemoryUsage() const;
+
void SendPeriodicStats();
+ void FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev);
+ void FillColumnTableStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev);
+ void ConfigureStats(const NOlap::TColumnEngineStats& indexStats, ::NKikimrTableStats::TTableStats * tabletStats);
+ void FillTxTableStats(::NKikimrTableStats::TTableStats* tableStats) const;
+
+ static TDuration GetControllerPeriodicWakeupActivationPeriod();
+ static TDuration GetControllerStatsReportInterval();
+
public:
const std::shared_ptr<NOlap::IStoragesManager>& GetStoragesManager() const {
return StoragesManager;
diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
index 7a40d59aa9..6b5f70674c 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
@@ -91,6 +91,12 @@ public:
virtual TDuration GetGuaranteeIndexationInterval(const TDuration defaultValue) const {
return defaultValue;
}
+ virtual TDuration GetPeriodicWakeupActivationPeriod(const TDuration defaultValue) const {
+ return defaultValue;
+ }
+ virtual TDuration GetStatsReportInterval(const TDuration defaultValue) const {
+ return defaultValue;
+ }
virtual ui64 GetGuaranteeIndexationStartBytesLimit(const ui64 defaultValue) const {
return defaultValue;
}
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h
index 60160940fe..29bf1995ff 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.h
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.h
@@ -8,6 +8,8 @@ private:
YDB_READONLY(TAtomicCounter, FilteredRecordsCount, 0);
YDB_READONLY(TAtomicCounter, Compactions, 0);
YDB_ACCESSOR(std::optional<TDuration>, GuaranteeIndexationInterval, TDuration::Zero());
+ YDB_ACCESSOR(std::optional<TDuration>, PeriodicWakeupActivationPeriod, std::nullopt);
+ YDB_ACCESSOR(std::optional<TDuration>, StatsReportInterval, std::nullopt);
YDB_ACCESSOR(std::optional<ui64>, GuaranteeIndexationStartBytesLimit, 0);
YDB_ACCESSOR(std::optional<TDuration>, OptimizerFreshnessCheckDuration, TDuration::Zero());
EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force;
@@ -17,6 +19,12 @@ protected:
virtual TDuration GetGuaranteeIndexationInterval(const TDuration defaultValue) const override {
return GuaranteeIndexationInterval.value_or(defaultValue);
}
+ TDuration GetPeriodicWakeupActivationPeriod(const TDuration defaultValue) const override {
+ return PeriodicWakeupActivationPeriod.value_or(defaultValue);
+ }
+ TDuration GetStatsReportInterval(const TDuration defaultValue) const override {
+ return StatsReportInterval.value_or(defaultValue);
+ }
virtual ui64 GetGuaranteeIndexationStartBytesLimit(const ui64 defaultValue) const override {
return GuaranteeIndexationStartBytesLimit.value_or(defaultValue);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index 340a8a2254..aebba18ca4 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -91,6 +91,9 @@ public:
// returns true to continue batching
bool PersistSingleStats(const TPathId& pathId, const TStatsQueue<TEvDataShard::TEvPeriodicTableStats>::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override;
void ScheduleNextBatch(const TActorContext& ctx) override;
+
+ template<typename T>
+ TPartitionStats PrepareStats(const TActorContext& ctx, const T& rec) const;
};
@@ -121,30 +124,12 @@ THolder<TProposeRequest> MergeRequest(
return std::move(request);
}
-bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
- const TStatsQueueItem<TEvDataShard::TEvPeriodicTableStats>& item, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) {
- const auto& rec = item.Ev->Get()->Record;
- auto datashardId = TTabletId(rec.GetDatashardId());
+template<typename T>
+TPartitionStats TTxStoreTableStats::PrepareStats(
+ const TActorContext& ctx, const T& rec) const {
const auto& tableStats = rec.GetTableStats();
const auto& tabletMetrics = rec.GetTabletMetrics();
- ui64 dataSize = tableStats.GetDataSize();
- ui64 rowCount = tableStats.GetRowCount();
-
- bool isDataShard = Self->Tables.contains(pathId);
- bool isOlapStore = Self->OlapStores.contains(pathId);
- bool isColumnTable = Self->ColumnTables.contains(pathId);
- if (!isDataShard && !isOlapStore && !isColumnTable) {
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unexpected stats from shard " << datashardId);
- return true;
- }
-
- if (!Self->TabletIdToShardIdx.contains(datashardId)) {
- LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "No shardIdx for shard " << datashardId);
- return true;
- }
-
- TShardIdx shardIdx = Self->TabletIdToShardIdx[datashardId];
TPartitionStats newStats;
newStats.SeqNo = TMessageSeqNo(rec.GetGeneration(), rec.GetRound());
@@ -194,8 +179,47 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
}
newStats.ShardState = rec.GetShardState();
+ return newStats;
+}
+
+bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
+ const TStatsQueueItem<TEvDataShard::TEvPeriodicTableStats>& item, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) {
+ const auto& rec = item.Ev->Get()->Record;
+ const auto datashardId = TTabletId(rec.GetDatashardId());
+
+ const auto& tableStats = rec.GetTableStats();
+ ui64 dataSize = tableStats.GetDataSize();
+ ui64 rowCount = tableStats.GetRowCount();
+
+ const bool isDataShard = Self->Tables.contains(pathId);
+ const bool isOlapStore = Self->OlapStores.contains(pathId);
+ const bool isColumnTable = Self->ColumnTables.contains(pathId);
+
+ if (!isDataShard && !isOlapStore && !isColumnTable) {
+ LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unexpected stats from shard " << datashardId);
+ return true;
+ }
+
+ if (!Self->TabletIdToShardIdx.contains(datashardId)) {
+ LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "No shardIdx for shard " << datashardId);
+ return true;
+ }
+
+ TShardIdx shardIdx = Self->TabletIdToShardIdx[datashardId];
+
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Add stats from shard " << datashardId << ", pathId " << pathId.LocalPathId
+ "TTxStoreTableStats.PersistSingleStats: main stats from"
+ << " datashardId(TabletID)=" << datashardId
+ << " maps to shardIdx: " << shardIdx
+ << ", pathId: " << pathId
+ << ", pathId map=" << Self->PathsById[pathId]->Name
+ << ", is column=" << isColumnTable
+ << ", is olap=" << isOlapStore);
+
+ const TPartitionStats newStats = PrepareStats(ctx, rec);
+
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Add stats from shard with datashardId(TabletID)=" << datashardId << ", pathId " << pathId.LocalPathId
<< ": RowCount " << newStats.RowCount << ", DataSize " << newStats.DataSize);
NIceDb::TNiceDb db(txc.DB);
@@ -235,7 +259,32 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
olapStore->UpdateShardStats(shardIdx, newStats);
newAggrStats = olapStore->GetStats().Aggregated;
updateSubdomainInfo = true;
+
+ const auto tables = rec.GetTables();
+ LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "OLAP store contains " << tables.size() << " tables.");
+
+ for(const auto& table : tables) {
+ const TPartitionStats newTableStats = PrepareStats(ctx, table);
+
+ const TPathId tablePathId = TPathId(TOwnerId(pathId.OwnerId), TLocalPathId(table.GetTableLocalId()));
+
+ if (Self->ColumnTables.contains(tablePathId)) {
+ LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "add stats for exists table with pathId=" << tablePathId);
+
+ auto columnTable = Self->ColumnTables.TakeVerified(tablePathId);
+ columnTable->UpdateTableStats(tablePathId, newTableStats);
+ } else {
+ LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "failed add stats for table with pathId=" << tablePathId);
+ }
+ }
+
} else if (isColumnTable) {
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "PersistSingleStats: ColumnTable rec.GetColumnTables() size="
+ << rec.GetTables().size());
+
auto columnTable = Self->ColumnTables.TakeVerified(pathId);
oldAggrStats = columnTable->GetStats().Aggregated;
columnTable->UpdateShardStats(shardIdx, newStats);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index d3b4dcc472..0f3cc648ce 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -1477,6 +1477,35 @@ void TAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPartition
}
}
+void TAggregatedStats::UpdateTableStats(const TPathId& pathId, const TPartitionStats& newStats) {
+ if (!TableStats.contains(pathId)) {
+ TableStats[pathId] = newStats;
+ return;
+ }
+
+ TPartitionStats& oldStats = TableStats[pathId];
+
+ if (newStats.SeqNo <= oldStats.SeqNo) {
+ // Ignore outdated message
+ return;
+ }
+
+ if (newStats.SeqNo.Generation > oldStats.SeqNo.Generation) {
+ // Reset incremental counter baselines if tablet has restarted
+ oldStats.ImmediateTxCompleted = 0;
+ oldStats.PlannedTxCompleted = 0;
+ oldStats.TxRejectedByOverload = 0;
+ oldStats.TxRejectedBySpace = 0;
+ oldStats.RowUpdates = 0;
+ oldStats.RowDeletes = 0;
+ oldStats.RowReads = 0;
+ oldStats.RangeReads = 0;
+ oldStats.RangeReadRows = 0;
+ }
+ TableStats[pathId].RowCount += (newStats.RowCount - oldStats.RowCount);
+ TableStats[pathId].DataSize += (newStats.DataSize - oldStats.DataSize);
+}
+
void TTableInfo::RegisterSplitMergeOp(TOperationId opId, const TTxState& txState) {
Y_ABORT_UNLESS(txState.TxType == TTxState::TxSplitTablePartition || txState.TxType == TTxState::TxMergeTablePartition);
Y_ABORT_UNLESS(txState.SplitDescription);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 501b8c6853..7a7491c0a8 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -302,9 +302,11 @@ private:
struct TAggregatedStats {
TPartitionStats Aggregated;
THashMap<TShardIdx, TPartitionStats> PartitionStats;
+ THashMap<TPathId, TPartitionStats> TableStats;
size_t PartitionStatsUpdated = 0;
void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats);
+ void UpdateTableStats(const TPathId& pathId, const TPartitionStats& newStats);
};
struct TSubDomainInfo;
@@ -1001,11 +1003,15 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> {
return Stats;
}
- void UpdateShardStats(TShardIdx shardIdx, const TPartitionStats& newStats) {
+ void UpdateShardStats(const TShardIdx shardIdx, const TPartitionStats& newStats) {
Stats.Aggregated.PartCount = ColumnShards.size();
Stats.PartitionStats[shardIdx]; // insert if none
Stats.UpdateShardStats(shardIdx, newStats);
}
+
+ void UpdateTableStats(const TPathId& pathId, const TPartitionStats& newStats) {
+ Stats.UpdateTableStats(pathId, newStats);
+ }
};
struct TTopicStats {
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 397e008c98..7e36ac8d1e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -48,6 +48,11 @@ static void FillAggregatedStats(NKikimrSchemeOp::TPathDescription& pathDescripti
FillTableMetrics(pathDescription.MutableTabletMetrics(), stats.Aggregated);
}
+static void FillTableStats(NKikimrSchemeOp::TPathDescription& pathDescription, const TPartitionStats& stats) {
+ FillTableStats(pathDescription.MutableTableStats(), stats);
+ FillTableMetrics(pathDescription.MutableTabletMetrics(), stats);
+}
+
void TPathDescriber::FillPathDescr(NKikimrSchemeOp::TDirEntry* descr, TPathElement::TPtr pathEl, TPathElement::EPathSubType subType) {
FillChildDescr(descr, pathEl);
@@ -369,6 +374,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa
void TPathDescriber::DescribeOlapStore(TPathId pathId, TPathElement::TPtr pathEl) {
const TOlapStoreInfo::TPtr storeInfo = *Self->OlapStores.FindPtr(pathId);
+
Y_ABORT_UNLESS(storeInfo, "OlapStore not found");
Y_UNUSED(pathEl);
@@ -387,7 +393,7 @@ void TPathDescriber::DescribeOlapStore(TPathId pathId, TPathElement::TPtr pathEl
}
void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr pathEl) {
- const auto tableInfo = Self->ColumnTables.GetVerified(pathId);
+ const auto tableInfo = Self->ColumnTables.GetVerified(pathId);
Y_UNUSED(pathEl);
auto* pathDescription = Result->Record.MutablePathDescription();
@@ -407,6 +413,9 @@ void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr path
if (description->HasSchemaPresetVersionAdj()) {
description->MutableSchema()->SetVersion(description->GetSchema().GetVersion() + description->GetSchemaPresetVersionAdj());
}
+ if (tableInfo->GetStats().TableStats.contains(pathId)) {
+ FillTableStats(*pathDescription, tableInfo->GetStats().TableStats.at(pathId));
+ }
}
}