diff options
author | Konstantin Morozov <34001730+k-morozov@users.noreply.github.com> | 2024-01-03 08:28:08 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-03 10:28:08 +0300 |
commit | 080882eaae1f6d4870b648699b6c7600ebecb36c (patch) | |
tree | 5611b63b4c9f6ed0c8172739c555d6d9a8dc61db | |
parent | 30e189bcf3c7d98c08c422ac7458788c386b10d7 (diff) | |
download | ydb-080882eaae1f6d4870b648699b6c7600ebecb36c.tar.gz |
Fix OLAP stats (#766)
* add tests for check olap stats, controller for act perioad and add filling stats for non-standalone table.
* remove useless log
* remove useless log
* up controller
* add test with some tables in store
* support stats for table store
* remove useless message
* add size to stats and small up
* fix flapping, up controller, add some log
* reduce timeout
* add check for primary index
* up sender, set immutable var
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp | 146 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.cpp | 138 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 15 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/hooks/abstract/abstract.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/hooks/testing/controller.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__table_stats.cpp | 93 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_info_types.cpp | 29 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_info_types.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_path_describer.cpp | 11 |
12 files changed, 392 insertions, 67 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp new file mode 100644 index 0000000000..635b8ea745 --- /dev/null +++ b/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp @@ -0,0 +1,146 @@ +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/core/kqp/ut/common/columnshard.h> +#include <ydb/core/tx/columnshard/hooks/testing/controller.h> +#include <ydb/core/testlib/common_helper.h> + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NTable; + +Y_UNIT_TEST_SUITE(KqpOlapStats) { + constexpr size_t inserted_rows = 1000; + constexpr size_t tables_in_store = 1000; + constexpr size_t size_single_table = 13152; + + const TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), + TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), + TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) + }; + + class TOlapStatsController: public NYDBTest::NColumnShard::TController { + public: + TDuration GetPeriodicWakeupActivationPeriod(const TDuration /*defaultValue*/) const override { + return TDuration::MilliSeconds(10); + } + TDuration GetStatsReportInterval(const TDuration /*defaultValue*/) const override { + return TDuration::MilliSeconds(10); + } + }; + + Y_UNIT_TEST(AddRowsTableStandalone) { + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<TOlapStatsController>(); + + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + + TTestHelper testHelper(runnerSettings); + + TTestHelper::TColumnTable testTable; + + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); + testHelper.CreateTable(testTable); + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + + for(size_t i=0; i<inserted_rows; i++) { + tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull(); + } + + testHelper.InsertData(testTable, tableInserter); + } + + Sleep(TDuration::Seconds(1)); + + auto settings = TDescribeTableSettings().WithTableStatistics(true); + auto describeResult = testHelper.GetSession().DescribeTable("/Root/ColumnTableTest", settings).GetValueSync(); + + UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString()); + + const auto& description = describeResult.GetTableDescription(); + + UNIT_ASSERT_VALUES_EQUAL(inserted_rows, description.GetTableRows()); + UNIT_ASSERT_VALUES_EQUAL(size_single_table, description.GetTableSize()); + } + + Y_UNIT_TEST(AddRowsTableInTableStore) { + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<TOlapStatsController>(); + + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + + TTestHelper testHelper(runnerSettings); + + TTestHelper::TColumnTableStore testTableStore; + + testTableStore.SetName("/Root/TableStoreTest").SetPrimaryKey({"id"}).SetSchema(schema); + testHelper.CreateTable(testTableStore); + TTestHelper::TColumnTable testTable; + testTable.SetName("/Root/TableStoreTest/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); + testHelper.CreateTable(testTable); + + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + for(size_t i=0; i<inserted_rows; i++) { + tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull(); + } + testHelper.InsertData(testTable, tableInserter); + } + + Sleep(TDuration::Seconds(1)); + + auto settings = TDescribeTableSettings().WithTableStatistics(true); + auto describeResult = testHelper.GetSession().DescribeTable("/Root/TableStoreTest/ColumnTableTest", settings).GetValueSync(); + + UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString()); + + const auto& description = describeResult.GetTableDescription(); + + UNIT_ASSERT_VALUES_EQUAL(inserted_rows, description.GetTableRows()); + UNIT_ASSERT_VALUES_EQUAL(size_single_table, description.GetTableSize()); + } + + Y_UNIT_TEST(AddRowsSomeTablesInTableStore) { + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<TOlapStatsController>(); + + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + + TTestHelper testHelper(runnerSettings); + + TTestHelper::TColumnTableStore testTableStore; + + testTableStore.SetName("/Root/TableStoreTest").SetPrimaryKey({"id"}).SetSchema(schema); + testHelper.CreateTable(testTableStore); + + Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).SetPriority(NActors::NLog::PRI_DEBUG).Initialize(); + + for(size_t t=0; t<tables_in_store; t++) { + TTestHelper::TColumnTable testTable; + testTable.SetName("/Root/TableStoreTest/ColumnTableTest_" + std::to_string(t)).SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); + testHelper.CreateTable(testTable); + + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + for(size_t i=0; i < t+ inserted_rows; i++) { + tableInserter.AddRow().Add(i + t * tables_in_store).Add("test_res_" + std::to_string(i + t * tables_in_store)).AddNull(); + } + testHelper.InsertData(testTable, tableInserter);; + } + + Sleep(TDuration::Seconds(20)); + + auto settings = TDescribeTableSettings().WithTableStatistics(true); + for(size_t t=0; t<tables_in_store; t++) { + auto describeResult = testHelper.GetSession().DescribeTable("/Root/TableStoreTest/ColumnTableTest_" + std::to_string(t), settings).GetValueSync(); + UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString()); + const auto& description = describeResult.GetTableDescription(); + + UNIT_ASSERT_VALUES_EQUAL(t + inserted_rows, description.GetTableRows()); + } + } +} + +} // namespace NKqp +} // namespace NKikimr
\ No newline at end of file diff --git a/ydb/core/kqp/ut/olap/ya.make b/ydb/core/kqp/ut/olap/ya.make index 8331573d01..8dfe183405 100644 --- a/ydb/core/kqp/ut/olap/ya.make +++ b/ydb/core/kqp/ut/olap/ya.make @@ -13,6 +13,7 @@ ELSE() ENDIF() SRCS( + kqp_olap_stats_ut.cpp kqp_olap_ut.cpp ) diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index fbb6bf66f5..599ffff448 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -806,6 +806,8 @@ message TEvPeriodicTableStats { optional uint64 TableOwnerId = 12; optional bool IsDstSplit = 13; + + repeated TEvPeriodicTableStats Tables = 14; } message TSerializedRowColumnsScheme { diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index b4459dca0c..8662548284 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -46,7 +46,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { EnqueueProgressTx(ctx); } EnqueueBackgroundActivities(); - ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); + ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup()); } void TColumnShard::OnActivateExecutor(const TActorContext& ctx) { @@ -158,7 +158,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC SendWaitPlanStep(GetOutdatedStep()); SendPeriodicStats(); - ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); + ctx.Schedule(PeriodicWakeupActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); } } @@ -293,15 +293,106 @@ void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage& metrics->TryUpdate(ctx); } +void TColumnShard::ConfigureStats(const NOlap::TColumnEngineStats& indexStats, ::NKikimrTableStats::TTableStats * tabletStats) { + NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate(); + auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted + + if (activeIndexStats.Rows < 0 || activeIndexStats.Bytes < 0) { + LOG_S_WARN("Negative stats counter. Rows: " << activeIndexStats.Rows + << " Bytes: " << activeIndexStats.Bytes << TabletID()); + + activeIndexStats.Rows = (activeIndexStats.Rows < 0) ? 0 : activeIndexStats.Rows; + activeIndexStats.Bytes = (activeIndexStats.Bytes < 0) ? 0 : activeIndexStats.Bytes; + } + + tabletStats->SetRowCount(activeIndexStats.Rows); + tabletStats->SetDataSize(activeIndexStats.Bytes + TabletCounters->Simple()[COUNTER_COMMITTED_BYTES].Get()); + + // TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside) + //tabletStats->SetIndexSize(); // TODO: calc size of internal tables + + tabletStats->SetLastAccessTime(LastAccessTime.MilliSeconds()); + tabletStats->SetLastUpdateTime(lastIndexUpdate.GetPlanStep()); +} + +TDuration TColumnShard::GetControllerPeriodicWakeupActivationPeriod() { + return NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod(TSettings::DefaultPeriodicWakeupActivationPeriod); +} + +TDuration TColumnShard::GetControllerStatsReportInterval() { + return NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval(TSettings::DefaultStatsReportInterval); +} + +void TColumnShard::FillTxTableStats(::NKikimrTableStats::TTableStats* tableStats) const { + tableStats->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOAD].Get()); + tableStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get()); + tableStats->SetInFlightTxCount(Executor()->GetStats().TxInFly); +} + +void TColumnShard::FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev) { + ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready + ev->Record.SetGeneration(Executor()->Generation()); + ev->Record.SetRound(StatsReportRound++); + ev->Record.SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId); + ev->Record.SetStartTime(StartTime().MilliSeconds()); + if (auto* resourceMetrics = Executor()->GetResourceMetrics()) { + resourceMetrics->Fill(*ev->Record.MutableTabletMetrics()); + } + auto* tabletStats = ev->Record.MutableTableStats(); + FillTxTableStats(tabletStats); + if (TablesManager.HasPrimaryIndex()) { + const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats(); + ConfigureStats(indexStats, tabletStats); + } +} + +void TColumnShard::FillColumnTableStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev) { + if (!TablesManager.HasPrimaryIndex()) { + return; + } + const auto& tablesIndexStats = TablesManager.MutablePrimaryIndex().GetStats(); + LOG_S_DEBUG("There are stats for " << tablesIndexStats.size() << " tables"); + for(const auto& [tableLocalID, columnStats] : tablesIndexStats) { + if (!columnStats) { + LOG_S_ERROR("SendPeriodicStats: empty stats"); + continue; + } + + auto* periodicTableStats = ev->Record.AddTables(); + periodicTableStats->SetDatashardId(TabletID()); + periodicTableStats->SetTableLocalId(tableLocalID); + + periodicTableStats->SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready + periodicTableStats->SetGeneration(Executor()->Generation()); + periodicTableStats->SetRound(StatsReportRound++); + periodicTableStats->SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId); + periodicTableStats->SetStartTime(StartTime().MilliSeconds()); + + if (auto* resourceMetrics = Executor()->GetResourceMetrics()) { + resourceMetrics->Fill(*periodicTableStats->MutableTabletMetrics()); + } + + auto* tableStats = periodicTableStats->MutableTableStats(); + FillTxTableStats(tableStats); + ConfigureStats(*columnStats, tableStats); + + LOG_S_TRACE("Add stats for table, tableLocalID=" << tableLocalID); + } +} + void TColumnShard::SendPeriodicStats() { + LOG_S_DEBUG("Send periodic stats."); + if (!CurrentSchemeShardId || !OwnerPathId) { LOG_S_DEBUG("Disabled periodic stats at tablet " << TabletID()); return; } const TActorContext& ctx = ActorContext(); - TInstant now = TAppData::TimeProvider->Now(); + const TInstant now = TAppData::TimeProvider->Now(); + if (LastStatsReport + StatsReportInterval > now) { + LOG_S_TRACE("Skip send periodic stats: report interavl = " << StatsReportInterval); return; } LastStatsReport = now; @@ -313,45 +404,10 @@ void TColumnShard::SendPeriodicStats() { } auto ev = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), OwnerPathId); - { - ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready - ev->Record.SetGeneration(Executor()->Generation()); - ev->Record.SetRound(StatsReportRound++); - ev->Record.SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId); - ev->Record.SetStartTime(StartTime().MilliSeconds()); - - if (auto* resourceMetrics = Executor()->GetResourceMetrics()) { - resourceMetrics->Fill(*ev->Record.MutableTabletMetrics()); - } - auto* tabletStats = ev->Record.MutableTableStats(); - tabletStats->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOAD].Get()); - tabletStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get()); - tabletStats->SetInFlightTxCount(Executor()->GetStats().TxInFly); - - if (TablesManager.HasPrimaryIndex()) { - const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats(); - NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate(); - auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted - - if (activeIndexStats.Rows < 0 || activeIndexStats.Bytes < 0) { - LOG_S_WARN("Negative stats counter. Rows: " << activeIndexStats.Rows - << " Bytes: " << activeIndexStats.Bytes << TabletID()); - - activeIndexStats.Rows = (activeIndexStats.Rows < 0) ? 0 : activeIndexStats.Rows; - activeIndexStats.Bytes = (activeIndexStats.Bytes < 0) ? 0 : activeIndexStats.Bytes; - } - - tabletStats->SetRowCount(activeIndexStats.Rows); - tabletStats->SetDataSize(activeIndexStats.Bytes + TabletCounters->Simple()[COUNTER_COMMITTED_BYTES].Get()); - // TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside) - //tabletStats->SetIndexSize(); // TODO: calc size of internal tables - tabletStats->SetLastAccessTime(LastAccessTime.MilliSeconds()); - tabletStats->SetLastUpdateTime(lastIndexUpdate.GetPlanStep()); - } - } - - LOG_S_DEBUG("Sending periodic stats at tablet " << TabletID()); + FillOlapStats(ctx, ev); + FillColumnTableStats(ctx, ev); + NTabletPipe::SendData(ctx, StatsReportPipe, ev.release()); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 7a39a30cc7..bd93759cc4 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -165,6 +165,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, nullptr) , ProgressTxController(std::make_unique<TTxController>(*this)) + , PeriodicWakeupActivationPeriod(GetControllerPeriodicWakeupActivationPeriod()) + , StatsReportInterval(GetControllerStatsReportInterval()) , StoragesManager(std::make_shared<TStoragesManager>(*this)) , InFlightReadsTracker(StoragesManager) , TablesManager(StoragesManager, info->TabletID) diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 2f728fbc01..e9c4062061 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -69,6 +69,8 @@ struct TSettings { static constexpr ui32 MAX_INDEXATIONS_TO_SKIP = 16; static constexpr TDuration GuaranteeIndexationInterval = TDuration::Seconds(10); + static constexpr TDuration DefaultPeriodicWakeupActivationPeriod = TDuration::Seconds(60); + static constexpr TDuration DefaultStatsReportInterval = TDuration::Seconds(10); static constexpr i64 GuaranteeIndexationStartBytesLimit = (i64)5 * 1024 * 1024 * 1024; TControlWrapper BlobWriteGrouppingEnabled; @@ -389,9 +391,9 @@ private: bool MediatorTimeCastRegistered = false; TSet<ui64> MediatorTimeCastWaitingSteps; TDuration MaxReadStaleness = TDuration::Minutes(5); // TODO: Make configurable? - TDuration ActivationPeriod = TDuration::Seconds(60); + const TDuration PeriodicWakeupActivationPeriod; TDuration FailActivationDelay = TDuration::Seconds(1); - TDuration StatsReportInterval = TDuration::Seconds(10); + const TDuration StatsReportInterval; TInstant LastAccessTime; TInstant LastStatsReport; @@ -482,7 +484,16 @@ private: void UpdateIndexCounters(); void UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage); ui64 MemoryUsage() const; + void SendPeriodicStats(); + void FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev); + void FillColumnTableStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev); + void ConfigureStats(const NOlap::TColumnEngineStats& indexStats, ::NKikimrTableStats::TTableStats * tabletStats); + void FillTxTableStats(::NKikimrTableStats::TTableStats* tableStats) const; + + static TDuration GetControllerPeriodicWakeupActivationPeriod(); + static TDuration GetControllerStatsReportInterval(); + public: const std::shared_ptr<NOlap::IStoragesManager>& GetStoragesManager() const { return StoragesManager; diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 7a40d59aa9..6b5f70674c 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -91,6 +91,12 @@ public: virtual TDuration GetGuaranteeIndexationInterval(const TDuration defaultValue) const { return defaultValue; } + virtual TDuration GetPeriodicWakeupActivationPeriod(const TDuration defaultValue) const { + return defaultValue; + } + virtual TDuration GetStatsReportInterval(const TDuration defaultValue) const { + return defaultValue; + } virtual ui64 GetGuaranteeIndexationStartBytesLimit(const ui64 defaultValue) const { return defaultValue; } diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 60160940fe..29bf1995ff 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -8,6 +8,8 @@ private: YDB_READONLY(TAtomicCounter, FilteredRecordsCount, 0); YDB_READONLY(TAtomicCounter, Compactions, 0); YDB_ACCESSOR(std::optional<TDuration>, GuaranteeIndexationInterval, TDuration::Zero()); + YDB_ACCESSOR(std::optional<TDuration>, PeriodicWakeupActivationPeriod, std::nullopt); + YDB_ACCESSOR(std::optional<TDuration>, StatsReportInterval, std::nullopt); YDB_ACCESSOR(std::optional<ui64>, GuaranteeIndexationStartBytesLimit, 0); YDB_ACCESSOR(std::optional<TDuration>, OptimizerFreshnessCheckDuration, TDuration::Zero()); EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force; @@ -17,6 +19,12 @@ protected: virtual TDuration GetGuaranteeIndexationInterval(const TDuration defaultValue) const override { return GuaranteeIndexationInterval.value_or(defaultValue); } + TDuration GetPeriodicWakeupActivationPeriod(const TDuration defaultValue) const override { + return PeriodicWakeupActivationPeriod.value_or(defaultValue); + } + TDuration GetStatsReportInterval(const TDuration defaultValue) const override { + return StatsReportInterval.value_or(defaultValue); + } virtual ui64 GetGuaranteeIndexationStartBytesLimit(const ui64 defaultValue) const override { return GuaranteeIndexationStartBytesLimit.value_or(defaultValue); } diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index 340a8a2254..aebba18ca4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -91,6 +91,9 @@ public: // returns true to continue batching bool PersistSingleStats(const TPathId& pathId, const TStatsQueue<TEvDataShard::TEvPeriodicTableStats>::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override; void ScheduleNextBatch(const TActorContext& ctx) override; + + template<typename T> + TPartitionStats PrepareStats(const TActorContext& ctx, const T& rec) const; }; @@ -121,30 +124,12 @@ THolder<TProposeRequest> MergeRequest( return std::move(request); } -bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, - const TStatsQueueItem<TEvDataShard::TEvPeriodicTableStats>& item, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) { - const auto& rec = item.Ev->Get()->Record; - auto datashardId = TTabletId(rec.GetDatashardId()); +template<typename T> +TPartitionStats TTxStoreTableStats::PrepareStats( + const TActorContext& ctx, const T& rec) const { const auto& tableStats = rec.GetTableStats(); const auto& tabletMetrics = rec.GetTabletMetrics(); - ui64 dataSize = tableStats.GetDataSize(); - ui64 rowCount = tableStats.GetRowCount(); - - bool isDataShard = Self->Tables.contains(pathId); - bool isOlapStore = Self->OlapStores.contains(pathId); - bool isColumnTable = Self->ColumnTables.contains(pathId); - if (!isDataShard && !isOlapStore && !isColumnTable) { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unexpected stats from shard " << datashardId); - return true; - } - - if (!Self->TabletIdToShardIdx.contains(datashardId)) { - LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "No shardIdx for shard " << datashardId); - return true; - } - - TShardIdx shardIdx = Self->TabletIdToShardIdx[datashardId]; TPartitionStats newStats; newStats.SeqNo = TMessageSeqNo(rec.GetGeneration(), rec.GetRound()); @@ -194,8 +179,47 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, } newStats.ShardState = rec.GetShardState(); + return newStats; +} + +bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, + const TStatsQueueItem<TEvDataShard::TEvPeriodicTableStats>& item, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) { + const auto& rec = item.Ev->Get()->Record; + const auto datashardId = TTabletId(rec.GetDatashardId()); + + const auto& tableStats = rec.GetTableStats(); + ui64 dataSize = tableStats.GetDataSize(); + ui64 rowCount = tableStats.GetRowCount(); + + const bool isDataShard = Self->Tables.contains(pathId); + const bool isOlapStore = Self->OlapStores.contains(pathId); + const bool isColumnTable = Self->ColumnTables.contains(pathId); + + if (!isDataShard && !isOlapStore && !isColumnTable) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unexpected stats from shard " << datashardId); + return true; + } + + if (!Self->TabletIdToShardIdx.contains(datashardId)) { + LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "No shardIdx for shard " << datashardId); + return true; + } + + TShardIdx shardIdx = Self->TabletIdToShardIdx[datashardId]; + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Add stats from shard " << datashardId << ", pathId " << pathId.LocalPathId + "TTxStoreTableStats.PersistSingleStats: main stats from" + << " datashardId(TabletID)=" << datashardId + << " maps to shardIdx: " << shardIdx + << ", pathId: " << pathId + << ", pathId map=" << Self->PathsById[pathId]->Name + << ", is column=" << isColumnTable + << ", is olap=" << isOlapStore); + + const TPartitionStats newStats = PrepareStats(ctx, rec); + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Add stats from shard with datashardId(TabletID)=" << datashardId << ", pathId " << pathId.LocalPathId << ": RowCount " << newStats.RowCount << ", DataSize " << newStats.DataSize); NIceDb::TNiceDb db(txc.DB); @@ -235,7 +259,32 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, olapStore->UpdateShardStats(shardIdx, newStats); newAggrStats = olapStore->GetStats().Aggregated; updateSubdomainInfo = true; + + const auto tables = rec.GetTables(); + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "OLAP store contains " << tables.size() << " tables."); + + for(const auto& table : tables) { + const TPartitionStats newTableStats = PrepareStats(ctx, table); + + const TPathId tablePathId = TPathId(TOwnerId(pathId.OwnerId), TLocalPathId(table.GetTableLocalId())); + + if (Self->ColumnTables.contains(tablePathId)) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "add stats for exists table with pathId=" << tablePathId); + + auto columnTable = Self->ColumnTables.TakeVerified(tablePathId); + columnTable->UpdateTableStats(tablePathId, newTableStats); + } else { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "failed add stats for table with pathId=" << tablePathId); + } + } + } else if (isColumnTable) { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "PersistSingleStats: ColumnTable rec.GetColumnTables() size=" + << rec.GetTables().size()); + auto columnTable = Self->ColumnTables.TakeVerified(pathId); oldAggrStats = columnTable->GetStats().Aggregated; columnTable->UpdateShardStats(shardIdx, newStats); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index d3b4dcc472..0f3cc648ce 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -1477,6 +1477,35 @@ void TAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPartition } } +void TAggregatedStats::UpdateTableStats(const TPathId& pathId, const TPartitionStats& newStats) { + if (!TableStats.contains(pathId)) { + TableStats[pathId] = newStats; + return; + } + + TPartitionStats& oldStats = TableStats[pathId]; + + if (newStats.SeqNo <= oldStats.SeqNo) { + // Ignore outdated message + return; + } + + if (newStats.SeqNo.Generation > oldStats.SeqNo.Generation) { + // Reset incremental counter baselines if tablet has restarted + oldStats.ImmediateTxCompleted = 0; + oldStats.PlannedTxCompleted = 0; + oldStats.TxRejectedByOverload = 0; + oldStats.TxRejectedBySpace = 0; + oldStats.RowUpdates = 0; + oldStats.RowDeletes = 0; + oldStats.RowReads = 0; + oldStats.RangeReads = 0; + oldStats.RangeReadRows = 0; + } + TableStats[pathId].RowCount += (newStats.RowCount - oldStats.RowCount); + TableStats[pathId].DataSize += (newStats.DataSize - oldStats.DataSize); +} + void TTableInfo::RegisterSplitMergeOp(TOperationId opId, const TTxState& txState) { Y_ABORT_UNLESS(txState.TxType == TTxState::TxSplitTablePartition || txState.TxType == TTxState::TxMergeTablePartition); Y_ABORT_UNLESS(txState.SplitDescription); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 501b8c6853..7a7491c0a8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -302,9 +302,11 @@ private: struct TAggregatedStats { TPartitionStats Aggregated; THashMap<TShardIdx, TPartitionStats> PartitionStats; + THashMap<TPathId, TPartitionStats> TableStats; size_t PartitionStatsUpdated = 0; void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats); + void UpdateTableStats(const TPathId& pathId, const TPartitionStats& newStats); }; struct TSubDomainInfo; @@ -1001,11 +1003,15 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> { return Stats; } - void UpdateShardStats(TShardIdx shardIdx, const TPartitionStats& newStats) { + void UpdateShardStats(const TShardIdx shardIdx, const TPartitionStats& newStats) { Stats.Aggregated.PartCount = ColumnShards.size(); Stats.PartitionStats[shardIdx]; // insert if none Stats.UpdateShardStats(shardIdx, newStats); } + + void UpdateTableStats(const TPathId& pathId, const TPartitionStats& newStats) { + Stats.UpdateTableStats(pathId, newStats); + } }; struct TTopicStats { diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 397e008c98..7e36ac8d1e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -48,6 +48,11 @@ static void FillAggregatedStats(NKikimrSchemeOp::TPathDescription& pathDescripti FillTableMetrics(pathDescription.MutableTabletMetrics(), stats.Aggregated); } +static void FillTableStats(NKikimrSchemeOp::TPathDescription& pathDescription, const TPartitionStats& stats) { + FillTableStats(pathDescription.MutableTableStats(), stats); + FillTableMetrics(pathDescription.MutableTabletMetrics(), stats); +} + void TPathDescriber::FillPathDescr(NKikimrSchemeOp::TDirEntry* descr, TPathElement::TPtr pathEl, TPathElement::EPathSubType subType) { FillChildDescr(descr, pathEl); @@ -369,6 +374,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa void TPathDescriber::DescribeOlapStore(TPathId pathId, TPathElement::TPtr pathEl) { const TOlapStoreInfo::TPtr storeInfo = *Self->OlapStores.FindPtr(pathId); + Y_ABORT_UNLESS(storeInfo, "OlapStore not found"); Y_UNUSED(pathEl); @@ -387,7 +393,7 @@ void TPathDescriber::DescribeOlapStore(TPathId pathId, TPathElement::TPtr pathEl } void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr pathEl) { - const auto tableInfo = Self->ColumnTables.GetVerified(pathId); + const auto tableInfo = Self->ColumnTables.GetVerified(pathId); Y_UNUSED(pathEl); auto* pathDescription = Result->Record.MutablePathDescription(); @@ -407,6 +413,9 @@ void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr path if (description->HasSchemaPresetVersionAdj()) { description->MutableSchema()->SetVersion(description->GetSchema().GetVersion() + description->GetSchemaPresetVersionAdj()); } + if (tableInfo->GetStats().TableStats.contains(pathId)) { + FillTableStats(*pathDescription, tableInfo->GetStats().TableStats.at(pathId)); + } } } |