diff options
author | Evgeniy Ivanov <eivanov89@yandex-team.ru> | 2022-03-15 15:08:25 +0300 |
---|---|---|
committer | Evgeniy Ivanov <eivanov89@yandex-team.ru> | 2022-03-15 15:08:25 +0300 |
commit | 8c05ac3806c3eb59f2212629dfc55a4c759d16d9 (patch) | |
tree | b66be18de6b8d7054bb3c867048ff21dedf7eb1a | |
parent | 3a16d48c92221e5022802835c8702513a68eb543 (diff) | |
download | ydb-8c05ac3806c3eb59f2212629dfc55a4c759d16d9.tar.gz |
KIKIMR-9748: minor fixes of metrics, logs and compaction queue updating
ref:250f0b458b8cca166b9d175cc2054a58974462a5
20 files changed, 252 insertions, 108 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 956a1369378..db413eaceab 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -108,6 +108,7 @@ enum ECumulativeCounters { COUNTER_TX_BACKGROUND_COMPACTION_NOT_NEEDED = 82 [(CounterOpts) = {Name: "TxCompactTableNotNeeded"}]; COUNTER_TX_BACKGROUND_COMPACTION_FAILED_BORROWED = 83 [(CounterOpts) = {Name: "TxCompactTableFailedBorrowed"}]; COUNTER_TX_BACKGROUND_COMPACTION_FAILED_START = 84 [(CounterOpts) = {Name: "TxCompactTableFailedStart"}]; + COUNTER_FULL_COMPACTION_DONE = 85 [(CounterOpts) = {Name: "FullCompactionCount"}]; } enum EPercentileCounters { @@ -308,41 +309,6 @@ enum EPercentileCounters { Ranges: { Value: 15000 Name: "15000"}, Ranges: { Value: 30000 Name: "30000"} }]; - - COUNTER_SHARDS_WITH_SEARCH_HEIGHT = 18 [(CounterOpts) = {Name: "ShardsWithSearchHeight", - Integral: true, - Ranges: { Value: 0 Name: "0"}, - Ranges: { Value: 1 Name: "1"}, - Ranges: { Value: 2 Name: "2"}, - Ranges: { Value: 3 Name: "3"}, - Ranges: { Value: 4 Name: "4"}, - Ranges: { Value: 5 Name: "5"}, - Ranges: { Value: 6 Name: "6"}, - Ranges: { Value: 7 Name: "7"}, - Ranges: { Value: 8 Name: "8"}, - Ranges: { Value: 9 Name: "9"}, - Ranges: { Value: 10 Name: "10"}, - Ranges: { Value: 11 Name: "11"}, - Ranges: { Value: 12 Name: "12"}, - Ranges: { Value: 13 Name: "13"}, - Ranges: { Value: 14 Name: "14"}, - Ranges: { Value: 15 Name: "15"}, - Ranges: { Value: 20 Name: "20"}, - }]; - - // buckets are hours since last full compaction ts, i.e. number of shards - // compacted within 1 hour or within 24 hours - COUNTER_SHARDS_WITH_FULL_COMPACTION = 19 [(CounterOpts) = {Name: "ShardsFullCompactionAgeHours", - Integral: true, - Ranges: { Value: 0 Name: "0"}, - Ranges: { Value: 1 Name: "1"}, - Ranges: { Value: 6 Name: "6"}, - Ranges: { Value: 12 Name: "12"}, - Ranges: { Value: 24 Name: "24"}, - Ranges: { Value: 48 Name: "48"}, - Ranges: { Value: 72 Name: "72"}, - Ranges: { Value: 168 Name: "inf"}, - }]; } enum ETxTypes { diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index d5a6cbc4273..d36819d1014 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -240,6 +240,9 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxCreateReplication = 70 [(CounterOpts) = {Name: "FinishedOps/CreateReplication"}]; COUNTER_FINISHED_OPS_TxAlterReplication = 71 [(CounterOpts) = {Name: "FinishedOps/AlterReplication"}]; COUNTER_FINISHED_OPS_TxDropReplication = 72 [(CounterOpts) = {Name: "FinishedOps/DropReplication"}]; + + COUNTER_BACKGROUND_COMPACTION_BORROWED = 73 [(CounterOpts) = {Name: "BackgroundCompactionBorrowed"}]; + COUNTER_BACKGROUND_COMPACTION_NOT_NEEDED = 74 [(CounterOpts) = {Name: "BackgroundCompactionNotNeeded"}]; } enum EPercentileCounters { @@ -264,7 +267,9 @@ enum EPercentileCounters { Ranges { Value: 32000000 Name: "(18) 32 < s" } }; - COUNTER_BACKGROUND_COMPACTION_OK_LATENCY = 0 [(CounterOpts) = {Name: "BackgroundCompactionOkLatency"}]; + COUNTER_BACKGROUND_COMPACTION_OK_LATENCY = 0 [(CounterOpts) = { + Name: "BackgroundCompactionOkLatency" + }]; COUNTER_NUM_SHARDS_BY_TTL_LAG = 1 [(CounterOpts) = { Name: "NumShardsByTtlLag" @@ -279,6 +284,57 @@ enum EPercentileCounters { Ranges { Value: 57600 Name: "57600" } Ranges { Value: 86400 Name: "inf" } }]; + + COUNTER_SHARDS_WITH_SEARCH_HEIGHT = 2 [(CounterOpts) = { + Name: "ShardsWithSearchHeight", + Integral: true, + Ranges: { Value: 0 Name: "0" }, + Ranges: { Value: 1 Name: "1" }, + Ranges: { Value: 2 Name: "2" }, + Ranges: { Value: 3 Name: "3" }, + Ranges: { Value: 4 Name: "4" }, + Ranges: { Value: 5 Name: "5" }, + Ranges: { Value: 6 Name: "6" }, + Ranges: { Value: 7 Name: "7" }, + Ranges: { Value: 8 Name: "8" }, + Ranges: { Value: 9 Name: "9" }, + Ranges: { Value: 10 Name: "10" }, + Ranges: { Value: 11 Name: "11" }, + Ranges: { Value: 12 Name: "12" }, + Ranges: { Value: 13 Name: "13" }, + Ranges: { Value: 14 Name: "14" }, + Ranges: { Value: 15 Name: "15" }, + Ranges: { Value: 20 Name: "20" }, + }]; + + // buckets are hours since last full compaction ts, i.e. number of shards + // compacted within 1 hour or within 24 hours + COUNTER_SHARDS_WITH_FULL_COMPACTION = 3 [(CounterOpts) = { + Name: "ShardsFullCompactionAgeHours", + Integral: true, + Ranges: { Value: 0 Name: "0" }, + Ranges: { Value: 1 Name: "1" }, + Ranges: { Value: 6 Name: "6" }, + Ranges: { Value: 12 Name: "12" }, + Ranges: { Value: 24 Name: "24" }, + Ranges: { Value: 48 Name: "48" }, + Ranges: { Value: 72 Name: "72" }, + Ranges: { Value: 168 Name: "inf" }, + }]; + + COUNTER_SHARDS_WITH_ROW_DELETES = 4 [(CounterOpts) = { + Name: "ShardsWithRowDeletes", + Integral: true, + Ranges: { Value: 0 Name: "0" }, + Ranges: { Value: 100 Name: "100" }, + Ranges: { Value: 1000 Name: "1000" }, + Ranges: { Value: 10000 Name: "10000" }, + Ranges: { Value: 100000 Name: "100000" }, + Ranges: { Value: 1000000 Name: "1000000" }, + Ranges: { Value: 10000000 Name: "10000000" }, + Ranges: { Value: 100000000 Name: "100000000" }, + Ranges: { Value: 1000000000 Name: "inf" }, + }]; } enum ETxTypes { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 341b81f20b6..9eecd56131e 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1408,6 +1408,7 @@ message TEvCompactTableResult { OK = 0; NOT_NEEDED = 1; FAILED = 2; + BORROWED = 3; }; optional uint64 TabletId = 1; @@ -1688,7 +1689,7 @@ message TEvReplicationSourceOffsetsAck { optional uint64 ReadId = 1; // The client is acknowledging every SeqNo <= AckSeqNo has been received and processed - // This will remove their corresponding + // This will remove their corresponding optional uint64 AckSeqNo = 2; // The client may optionally change the current window size diff --git a/ydb/core/tx/datashard/datashard__compaction.cpp b/ydb/core/tx/datashard/datashard__compaction.cpp index 7ab0bfeca57..8b12088ccd6 100644 --- a/ydb/core/tx/datashard/datashard__compaction.cpp +++ b/ydb/core/tx/datashard/datashard__compaction.cpp @@ -93,7 +93,7 @@ public: auto response = MakeHolder<TEvDataShard::TEvCompactTableResult>( Self->TabletID(), pathId, - NKikimrTxDataShard::TEvCompactTableResult::FAILED); + NKikimrTxDataShard::TEvCompactTableResult::BORROWED); ctx.Send(Ev->Sender, std::move(response)); return true; } @@ -177,7 +177,7 @@ public: << ", last full compaction# " << Ts); } }; - + void TDataShard::Handle(TEvDataShard::TEvCompactTable::TPtr& ev, const TActorContext& ctx) { Executor()->Execute(new TTxCompactTable(this, ev), ctx); } @@ -190,6 +190,7 @@ void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) { if (ti.second->LocalTid != tableId && ti.second->ShadowTid != tableId) continue; if (ti.second->Stats.LastFullCompaction < finishedInfo.FullCompactionTs) { + IncCounter(COUNTER_FULL_COMPACTION_DONE); ti.second->Stats.LastFullCompaction = finishedInfo.FullCompactionTs; Executor()->Execute( new TTxPersistFullCompactionTs( diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp index 129a14dca5f..3ee32261af4 100644 --- a/ydb/core/tx/datashard/datashard__stats.cpp +++ b/ydb/core/tx/datashard/datashard__stats.cpp @@ -196,7 +196,7 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorCo dataSize += tableInfo.Stats.DataStats.DataSize; - UpdateSearchHeightStats(tableInfo.Stats, ev->Get()->SearchHeight); + tableInfo.Stats.SearchHeight = ev->Get()->SearchHeight; tableInfo.StatsUpdateInProgress = false; @@ -258,12 +258,10 @@ public: searchHeight = 0; } - Self->UpdateFullCompactionTsMetric(ti.second->Stats); - if (!ti.second->StatsNeedUpdate) { ti.second->Stats.MemRowCount = memRowCount; ti.second->Stats.MemDataSize = memDataSize; - Self->UpdateSearchHeightStats(ti.second->Stats, searchHeight); + ti.second->Stats.SearchHeight = searchHeight; continue; } @@ -361,38 +359,6 @@ void TDataShard::UpdateTableStats(const TActorContext &ctx) { Executor()->Execute(new TTxInitiateStatsUpdate(this), ctx); } -void TDataShard::UpdateSearchHeightStats(TUserTable::TStats& stats, ui64 newSearchHeight) { - if (TabletCounters) { - if (stats.LastSearchHeightMetricSet) - TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].DecrementFor(stats.SearchHeight); - - TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].IncrementFor(newSearchHeight); - stats.LastSearchHeightMetricSet = true; - } - stats.SearchHeight = newSearchHeight; -} - -void TDataShard::UpdateFullCompactionTsMetric(TUserTable::TStats& stats) { - if (!TabletCounters) - return; - - auto now = AppData()->TimeProvider->Now(); - if (now < stats.LastFullCompaction) { - // extra sanity check - return; - } - - auto newHours = (now - stats.LastFullCompaction).Hours(); - if (stats.HoursSinceFullCompaction && newHours == *stats.HoursSinceFullCompaction) - return; - - if (stats.HoursSinceFullCompaction) - TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].DecrementFor(*stats.HoursSinceFullCompaction); - - TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].IncrementFor(newHours); - stats.HoursSinceFullCompaction = newHours; -} - void TDataShard::CollectCpuUsage(const TActorContext &ctx) { auto* metrics = Executor()->GetResourceMetrics(); TInstant now = AppData(ctx)->TimeProvider->Now(); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index c144311a4e7..0dc7861e125 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1300,8 +1300,6 @@ public: NMiniKQL::IKeyAccessSampler::TPtr GetKeyAccessSampler(); void EnableKeyAccessSampling(const TActorContext &ctx, TInstant until); void UpdateTableStats(const TActorContext& ctx); - void UpdateSearchHeightStats(TUserTable::TStats& stats, ui64 newSearchHeight); - void UpdateFullCompactionTsMetric(TUserTable::TStats& stats); void CollectCpuUsage(const TActorContext& ctx); void ScanComplete(NTable::EAbort status, TAutoPtr<IDestructable> prod, ui64 cookie, const TActorContext &ctx) override; diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 962ef2ead3e..4cc7845d8b4 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -320,10 +320,6 @@ struct TUserTable : public TThrRefBase { ui64 BackgroundCompactionRequests = 0; NTable::TKeyAccessSample AccessStats; - bool LastSearchHeightMetricSet = false; - - std::optional<ui32> HoursSinceFullCompaction; - void Update(NTable::TStats&& dataStats, ui64 indexSize, THashSet<ui64>&& partOwners, ui64 partCount, TInstant statsUpdateTime) { DataStats = dataStats; IndexSize = indexSize; diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h index cb745fcb517..148daffa3bf 100644 --- a/ydb/core/tx/schemeshard/operation_queue_timer.h +++ b/ydb/core/tx/schemeshard/operation_queue_timer.h @@ -57,6 +57,8 @@ public: TActorBase::PassAway(); } + TInstant GetWakeupTime() const { return When; } + private: // ITimer, note that it is made private, // since it should be called only from TBase diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp index 44bb3934151..b97a0cff1eb 100644 --- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp @@ -5,7 +5,6 @@ namespace NSchemeShard { NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TShardIdx& shardId) { UpdateBackgroundCompactionQueueMetrics(); - TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_TIMEOUT].Increment(CompactionQueue->ResetTimeoutCount()); auto ctx = TActivationContext::ActorContextFor(SelfId()); @@ -23,7 +22,10 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCompaction " "for pathId# " << pathId << ", datashard# " << datashardId + << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" + << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" << ", running# " << CompactionQueue->RunningSize() << " shards" << " at schemeshard " << TabletID()); @@ -35,6 +37,34 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha return NOperationQueue::EStartStatus::EOperationRunning; } +void TSchemeShard::OnBackgroundCompactionTimeout(const TShardIdx& shardId) { + UpdateBackgroundCompactionQueueMetrics(); + + TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_TIMEOUT].Increment(1); + + auto ctx = TActivationContext::ActorContextFor(SelfId()); + + auto it = ShardInfos.find(shardId); + if (it == ShardInfos.end()) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info " + "for timeout background compaction# " << shardId + << " at schemeshard# " << TabletID()); + return; + } + + const auto& datashardId = it->second.TabletID; + const auto& pathId = it->second.PathId; + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Background compaction timeout " + "for pathId# " << pathId << ", datashard# " << datashardId + << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", rate# " << CompactionQueue->GetRate() + << ", in queue# " << CompactionQueue->Size() << " shards" + << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" + << ", running# " << CompactionQueue->RunningSize() << " shards" + << " at schemeshard " << TabletID()); +} + void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const TActorContext &ctx) { const auto& record = ev->Get()->Record; @@ -55,13 +85,21 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T if (shardIdx == InvalidShardIdx) { LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction of unknown shard " "for pathId# " << pathId << ", datashard# " << tabletId + << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus() + << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" + << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" << ", running# " << CompactionQueue->RunningSize() << " shards" << " at schemeshard " << TabletID()); } else { LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction " "for pathId# " << pathId << ", datashard# " << tabletId + << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus() + << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" + << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" << ", running# " << CompactionQueue->RunningSize() << " shards" << " at schemeshard " << TabletID()); } @@ -70,19 +108,54 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T switch (record.GetStatus()) { case NKikimrTxDataShard::TEvCompactTableResult::OK: - case NKikimrTxDataShard::TEvCompactTableResult::NOT_NEEDED: TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_OK].Increment(1); if (duration) histCounters[COUNTER_BACKGROUND_COMPACTION_OK_LATENCY].IncrementFor(duration.MilliSeconds()); break; + case NKikimrTxDataShard::TEvCompactTableResult::NOT_NEEDED: + TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_NOT_NEEDED].Increment(1); + break; case NKikimrTxDataShard::TEvCompactTableResult::FAILED: TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_FAILED].Increment(1); break; + case NKikimrTxDataShard::TEvCompactTableResult::BORROWED: + TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_BORROWED].Increment(1); + break; } UpdateBackgroundCompactionQueueMetrics(); } +void TSchemeShard::EnqueueCompaction(TShardCompactionInfo&& info) { + if (!CompactionQueue) + return; + + CompactionQueue->Enqueue(std::move(info)); + UpdateBackgroundCompactionQueueMetrics(); +} + +void TSchemeShard::UpdateCompaction(TShardCompactionInfo&& info) { + if (!CompactionQueue) + return; + + if (!CompactionQueue->Update(info)) + CompactionQueue->Enqueue(std::move(info)); + UpdateBackgroundCompactionQueueMetrics(); +} + +void TSchemeShard::RemoveCompaction(const TShardIdx& shardIdx) { + if (!CompactionQueue) + return; + + CompactionQueue->Remove(TShardCompactionInfo(shardIdx)); + UpdateBackgroundCompactionQueueMetrics(); +} + +void TSchemeShard::ShardRemoved(const TShardIdx& shardIdx) { + RemoveCompaction(shardIdx); + RemoveShardMetrics(shardIdx); +} + void TSchemeShard::UpdateBackgroundCompactionQueueMetrics() { TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_SIZE].Set(CompactionQueue->Size()); TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_RUNNING].Set(CompactionQueue->RunningSize()); @@ -94,5 +167,51 @@ void TSchemeShard::UpdateBackgroundCompactionQueueMetrics() { TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_SIZE_DELETES].Set(queue.SizeByRowDeletes()); } +void TSchemeShard::UpdateShardMetrics( + const TShardIdx& shardIdx, + const TTableInfo::TPartitionStats& newStats) +{ + THashMap<TShardIdx, TPartitionMetrics>::insert_ctx insertCtx; + auto it = PartitionMetricsMap.find(shardIdx, insertCtx); + if (it != PartitionMetricsMap.end()) { + const auto& metrics = it->second; + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].DecrementFor(metrics.SearchHeight); + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].DecrementFor(metrics.HoursSinceFullCompaction); + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_ROW_DELETES].DecrementFor(metrics.RowDeletes); + } else { + it = PartitionMetricsMap.insert_direct(std::make_pair(shardIdx, TPartitionMetrics()), insertCtx); + } + + auto& metrics = it->second; + + metrics.SearchHeight = newStats.SearchHeight; + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].IncrementFor(metrics.SearchHeight); + + metrics.RowDeletes = newStats.RowDeletes; + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_ROW_DELETES].IncrementFor(metrics.RowDeletes); + + auto now = AppData()->TimeProvider->Now(); + auto compactionTime = TInstant::Seconds(newStats.FullCompactionTs); + if (now >= compactionTime) + metrics.HoursSinceFullCompaction = (now - compactionTime).Hours(); + else + metrics.HoursSinceFullCompaction = 0; + + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].IncrementFor(metrics.HoursSinceFullCompaction); +} + +void TSchemeShard::RemoveShardMetrics(const TShardIdx& shardIdx) { + auto it = PartitionMetricsMap.find(shardIdx); + if (it == PartitionMetricsMap.end()) + return; + + const auto& metrics = it->second; + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].DecrementFor(metrics.SearchHeight); + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].DecrementFor(metrics.HoursSinceFullCompaction); + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_ROW_DELETES].DecrementFor(metrics.RowDeletes); + + PartitionMetricsMap.erase(it); +} + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp index 855553cddc8..060001c0741 100644 --- a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp @@ -154,6 +154,8 @@ struct TSchemeShard::TTxDeleteTabletReply : public TSchemeShard::TRwTxBase { NIceDb::TNiceDb db(txc.DB); Self->PersistUnknownShardDeleted(db, ShardIdx); } + + Self->ShardRemoved(ShardIdx); } void DoComplete(const TActorContext &ctx) override { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 762bdbb55d2..1d07f1a93d8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -2170,6 +2170,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { tableInfo->UpdateShardStats(shardIdx, stats); + // note that we don't update shard metrics here, because we will always update + // the shard metrics in TSchemeShard::SetPartitioning + if (!rowSet.Next()) { return false; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 24423ce1a6f..1395e37aa37 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -394,6 +394,7 @@ void NTableState::UpdatePartitioningForCopyTable(TOperationId operationId, TTxSt context.SS->ShardInfos.erase(shard.Idx); domainInfo->RemoveInternalShard(shard.Idx); context.SS->DecrementPathDbRefCount(pathId, "remove shard from txState"); + context.SS->ShardRemoved(shard.Idx); } } txState.Shards.clear(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp index d7efc1dbf5f..126d02d850c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp @@ -126,6 +126,7 @@ void TMemoryChanges::UnDo(TSchemeShard* ss) { ss->ShardInfos[id] = *elem; } else { ss->ShardInfos.erase(id); + ss->ShardRemoved(id); } Shards.pop(); } diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index 00e6d8fad04..dc85ff4e050 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -191,11 +191,9 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte auto oldAggrStats = table->GetStats().Aggregated; table->UpdateShardStats(shardIdx, newStats); - if (Self->CompactionQueue) { - TShardCompactionInfo compactionInfo(shardIdx, newStats); - if (!Self->CompactionQueue->Update(compactionInfo)) - Self->CompactionQueue->Enqueue(std::move(compactionInfo)); - Self->UpdateBackgroundCompactionQueueMetrics(); + if (!table->IsBackup) { + Self->UpdateCompaction(TShardCompactionInfo(shardIdx, newStats)); + Self->UpdateShardMetrics(shardIdx, newStats); } NIceDb::TNiceDb db(txc.DB); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index d88e53c3769..92ed532ab28 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -306,6 +306,11 @@ void TSchemeShard::Clear() { TabletCounters->Simple()[idx].Set(0); } TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].Clear(); + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].Clear(); + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].Clear(); + TabletCounters->Percentile()[COUNTER_SHARDS_WITH_ROW_DELETES].Clear(); + + PartitionMetricsMap.clear(); } void TSchemeShard::IncrementPathDbRefCount(const TPathId& pathId, const TStringBuf& debug) { @@ -2074,8 +2079,7 @@ void TSchemeShard::DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId pa db.Table<Schema::MigratedTablePartitions>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete(); db.Table<Schema::TablePartitionStats>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete(); - CompactionQueue->Remove(TShardCompactionInfo(partitions[pi].ShardIdx)); - UpdateBackgroundCompactionQueueMetrics(); + ShardRemoved(partitions[pi].ShardIdx); } } @@ -3250,10 +3254,10 @@ void TSchemeShard::PersistRemoveTable(NIceDb::TNiceDb& db, TPathId pathId, const } } + // sanity check: by this time compaction queue and metrics must be updated already for (const auto& p: tableInfo->GetPartitions()) { - CompactionQueue->Remove(TShardCompactionInfo(p.ShardIdx)); + ShardRemoved(p.ShardIdx); } - UpdateBackgroundCompactionQueueMetrics(); Tables.erase(pathId); DecrementPathDbRefCount(pathId, "remove table"); @@ -5798,8 +5802,8 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T const auto& partitionStats = tableInfo->GetStats().PartitionStats; auto it = partitionStats.find(p.ShardIdx); if (it != partitionStats.end()) { - CompactionQueue->Enqueue(TShardCompactionInfo(p.ShardIdx, it->second)); - UpdateBackgroundCompactionQueueMetrics(); + EnqueueCompaction(TShardCompactionInfo(p.ShardIdx, it->second)); + UpdateShardMetrics(p.ShardIdx, it->second); } } } @@ -6008,13 +6012,6 @@ void TSchemeShard::ConfigureCompactionQueue( compactionConfig.MaxRate = config.GetMaxRate(); compactionConfig.MinOperationRepeatDelay = TDuration::Seconds(config.GetMinCompactionRepeatDelaySeconds()); - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "CompactionQueue configured: Timeout# " << compactionConfig.Timeout - << ", WakeupInterval# " << compactionConfig.WakeupInterval - << ", RoundInterval# " << compactionConfig.RoundInterval - << ", InflightLimit# " << compactionConfig.InflightLimit - << ", MaxRate# " << compactionConfig.MaxRate); - if (CompactionQueue) { CompactionQueue->UpdateConfig(compactionConfig, queueConfig); } else { @@ -6023,6 +6020,15 @@ void TSchemeShard::ConfigureCompactionQueue( queueConfig, CompactionStarter); } + + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "CompactionQueue configured: Timeout# " << compactionConfig.Timeout + << ", Rate# " << CompactionQueue->GetRate() + << ", WakeupInterval# " << compactionConfig.WakeupInterval + << ", RoundInterval# " << compactionConfig.RoundInterval + << ", InflightLimit# " << compactionConfig.InflightLimit + << ", MinCompactionRepeatDelaySeconds# " << compactionConfig.MinOperationRepeatDelay + << ", MaxRate# " << compactionConfig.MaxRate); } void TSchemeShard::StartStopCompactionQueue() { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index a2954065d07..53b96241201 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -86,10 +86,14 @@ private: : Self(self) { } - NOperationQueue::EStartStatus StartOperation(const TShardCompactionInfo& info) { + NOperationQueue::EStartStatus StartOperation(const TShardCompactionInfo& info) override { return Self->StartBackgroundCompaction(info.ShardIdx); } + void OnTimeout(const TShardCompactionInfo& info) override { + Self->OnBackgroundCompactionTimeout(info.ShardIdx); + } + private: TSchemeShard* Self; }; @@ -182,6 +186,14 @@ public: THashMap<TTabletId, TShardIdx> TabletIdToShardIdx; THashMap<TShardIdx, TVector<TActorId>> ShardDeletionSubscribers; // for tests + // in case of integral hists we need to remember what values we have set + struct TPartitionMetrics { + ui64 SearchHeight = 0; + ui64 RowDeletes = 0; + ui32 HoursSinceFullCompaction = 0; + }; + THashMap<TShardIdx, TPartitionMetrics> PartitionMetricsMap; + TActorId SchemeBoardPopulator; static constexpr ui32 InitiateCachedTxIdsCount = 100; @@ -623,7 +635,17 @@ public: void ScheduleCleanDroppedPaths(); void Handle(TEvPrivate::TEvCleanDroppedPaths::TPtr& ev, const TActorContext& ctx); + void EnqueueCompaction(TShardCompactionInfo&& info); + void UpdateCompaction(TShardCompactionInfo&& info); + void RemoveCompaction(const TShardIdx& shardIdx); + + void UpdateShardMetrics(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& newStats); + void RemoveShardMetrics(const TShardIdx& shardIdx); + + void ShardRemoved(const TShardIdx& shardIdx); + NOperationQueue::EStartStatus StartBackgroundCompaction(const TShardIdx& shardId); + void OnBackgroundCompactionTimeout(const TShardIdx& shardId); void UpdateBackgroundCompactionQueueMetrics(); struct TTxCleanDroppedSubDomains; diff --git a/ydb/core/util/circular_queue.h b/ydb/core/util/circular_queue.h index ffffb6db57e..bc4195c982d 100644 --- a/ydb/core/util/circular_queue.h +++ b/ydb/core/util/circular_queue.h @@ -180,7 +180,7 @@ public: Queue.PopFrontToBack(); } - bool Empty() const { + bool Empty() const { return Queue.Empty(); } diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h index e4075e05158..19c8750b490 100644 --- a/ydb/core/util/operation_queue.h +++ b/ydb/core/util/operation_queue.h @@ -37,6 +37,10 @@ public: virtual ~IStarter() = default; virtual EStartStatus StartOperation(const T& item) = 0; + + // in many cases just for metrics/logging, because + // queue is able to restart operation itself + virtual void OnTimeout(const T& item) = 0; }; struct TConfig { @@ -262,8 +266,6 @@ private: bool Running = false; bool WasRunning = false; - ui64 TimeoutCount = 0; - // operations / s double Rate = 0; @@ -389,8 +391,6 @@ public: double GetRate() const { return Rate; } - ui64 ResetTimeoutCount() { return TimeoutCount; TimeoutCount = 0; } - const TQueue& GetReadyQueue() const { return ReadyQueue; } // copies items, should be used in tests only @@ -551,7 +551,7 @@ void TOperationQueue<T, TQueue>::CheckTimeoutOperations() { while (!RunningItems.Empty()) { const auto& item = RunningItems.Front(); if (item.Timestamp + Config.Timeout <= now) { - ++TimeoutCount; + Starter.OnTimeout(item.Item); if (Config.IsCircular) ReEnqueueNoStart(std::move(item.Item)); RunningItems.PopFront(); diff --git a/ydb/core/util/operation_queue_priority_ut.cpp b/ydb/core/util/operation_queue_priority_ut.cpp index 6912e0c8c81..32bd274fe66 100644 --- a/ydb/core/util/operation_queue_priority_ut.cpp +++ b/ydb/core/util/operation_queue_priority_ut.cpp @@ -103,6 +103,9 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue WakeupHistory.push_back(t); } + void OnTimeout(const TPriorityItem&) override + {} + TInstant Now() override { return TimeProvider.Now(); diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp index 3c88f45a6b8..1015cb217e3 100644 --- a/ydb/core/util/operation_queue_ut.cpp +++ b/ydb/core/util/operation_queue_ut.cpp @@ -53,6 +53,9 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim WakeupHistory.push_back(t); } + void OnTimeout(const int&) override + {} + TInstant Now() override { return TimeProvider.Now(); |