aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <eivanov89@yandex-team.ru>2022-03-15 15:08:25 +0300
committerEvgeniy Ivanov <eivanov89@yandex-team.ru>2022-03-15 15:08:25 +0300
commit8c05ac3806c3eb59f2212629dfc55a4c759d16d9 (patch)
treeb66be18de6b8d7054bb3c867048ff21dedf7eb1a
parent3a16d48c92221e5022802835c8702513a68eb543 (diff)
downloadydb-8c05ac3806c3eb59f2212629dfc55a4c759d16d9.tar.gz
KIKIMR-9748: minor fixes of metrics, logs and compaction queue updating
ref:250f0b458b8cca166b9d175cc2054a58974462a5
-rw-r--r--ydb/core/protos/counters_datashard.proto36
-rw-r--r--ydb/core/protos/counters_schemeshard.proto58
-rw-r--r--ydb/core/protos/tx_datashard.proto3
-rw-r--r--ydb/core/tx/datashard/datashard__compaction.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard__stats.cpp38
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h2
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h4
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__compaction.cpp123
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__table_stats.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp32
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h24
-rw-r--r--ydb/core/util/circular_queue.h2
-rw-r--r--ydb/core/util/operation_queue.h10
-rw-r--r--ydb/core/util/operation_queue_priority_ut.cpp3
-rw-r--r--ydb/core/util/operation_queue_ut.cpp3
20 files changed, 252 insertions, 108 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index 956a1369378..db413eaceab 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -108,6 +108,7 @@ enum ECumulativeCounters {
COUNTER_TX_BACKGROUND_COMPACTION_NOT_NEEDED = 82 [(CounterOpts) = {Name: "TxCompactTableNotNeeded"}];
COUNTER_TX_BACKGROUND_COMPACTION_FAILED_BORROWED = 83 [(CounterOpts) = {Name: "TxCompactTableFailedBorrowed"}];
COUNTER_TX_BACKGROUND_COMPACTION_FAILED_START = 84 [(CounterOpts) = {Name: "TxCompactTableFailedStart"}];
+ COUNTER_FULL_COMPACTION_DONE = 85 [(CounterOpts) = {Name: "FullCompactionCount"}];
}
enum EPercentileCounters {
@@ -308,41 +309,6 @@ enum EPercentileCounters {
Ranges: { Value: 15000 Name: "15000"},
Ranges: { Value: 30000 Name: "30000"}
}];
-
- COUNTER_SHARDS_WITH_SEARCH_HEIGHT = 18 [(CounterOpts) = {Name: "ShardsWithSearchHeight",
- Integral: true,
- Ranges: { Value: 0 Name: "0"},
- Ranges: { Value: 1 Name: "1"},
- Ranges: { Value: 2 Name: "2"},
- Ranges: { Value: 3 Name: "3"},
- Ranges: { Value: 4 Name: "4"},
- Ranges: { Value: 5 Name: "5"},
- Ranges: { Value: 6 Name: "6"},
- Ranges: { Value: 7 Name: "7"},
- Ranges: { Value: 8 Name: "8"},
- Ranges: { Value: 9 Name: "9"},
- Ranges: { Value: 10 Name: "10"},
- Ranges: { Value: 11 Name: "11"},
- Ranges: { Value: 12 Name: "12"},
- Ranges: { Value: 13 Name: "13"},
- Ranges: { Value: 14 Name: "14"},
- Ranges: { Value: 15 Name: "15"},
- Ranges: { Value: 20 Name: "20"},
- }];
-
- // buckets are hours since last full compaction ts, i.e. number of shards
- // compacted within 1 hour or within 24 hours
- COUNTER_SHARDS_WITH_FULL_COMPACTION = 19 [(CounterOpts) = {Name: "ShardsFullCompactionAgeHours",
- Integral: true,
- Ranges: { Value: 0 Name: "0"},
- Ranges: { Value: 1 Name: "1"},
- Ranges: { Value: 6 Name: "6"},
- Ranges: { Value: 12 Name: "12"},
- Ranges: { Value: 24 Name: "24"},
- Ranges: { Value: 48 Name: "48"},
- Ranges: { Value: 72 Name: "72"},
- Ranges: { Value: 168 Name: "inf"},
- }];
}
enum ETxTypes {
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index d5a6cbc4273..d36819d1014 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -240,6 +240,9 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxCreateReplication = 70 [(CounterOpts) = {Name: "FinishedOps/CreateReplication"}];
COUNTER_FINISHED_OPS_TxAlterReplication = 71 [(CounterOpts) = {Name: "FinishedOps/AlterReplication"}];
COUNTER_FINISHED_OPS_TxDropReplication = 72 [(CounterOpts) = {Name: "FinishedOps/DropReplication"}];
+
+ COUNTER_BACKGROUND_COMPACTION_BORROWED = 73 [(CounterOpts) = {Name: "BackgroundCompactionBorrowed"}];
+ COUNTER_BACKGROUND_COMPACTION_NOT_NEEDED = 74 [(CounterOpts) = {Name: "BackgroundCompactionNotNeeded"}];
}
enum EPercentileCounters {
@@ -264,7 +267,9 @@ enum EPercentileCounters {
Ranges { Value: 32000000 Name: "(18) 32 < s" }
};
- COUNTER_BACKGROUND_COMPACTION_OK_LATENCY = 0 [(CounterOpts) = {Name: "BackgroundCompactionOkLatency"}];
+ COUNTER_BACKGROUND_COMPACTION_OK_LATENCY = 0 [(CounterOpts) = {
+ Name: "BackgroundCompactionOkLatency"
+ }];
COUNTER_NUM_SHARDS_BY_TTL_LAG = 1 [(CounterOpts) = {
Name: "NumShardsByTtlLag"
@@ -279,6 +284,57 @@ enum EPercentileCounters {
Ranges { Value: 57600 Name: "57600" }
Ranges { Value: 86400 Name: "inf" }
}];
+
+ COUNTER_SHARDS_WITH_SEARCH_HEIGHT = 2 [(CounterOpts) = {
+ Name: "ShardsWithSearchHeight",
+ Integral: true,
+ Ranges: { Value: 0 Name: "0" },
+ Ranges: { Value: 1 Name: "1" },
+ Ranges: { Value: 2 Name: "2" },
+ Ranges: { Value: 3 Name: "3" },
+ Ranges: { Value: 4 Name: "4" },
+ Ranges: { Value: 5 Name: "5" },
+ Ranges: { Value: 6 Name: "6" },
+ Ranges: { Value: 7 Name: "7" },
+ Ranges: { Value: 8 Name: "8" },
+ Ranges: { Value: 9 Name: "9" },
+ Ranges: { Value: 10 Name: "10" },
+ Ranges: { Value: 11 Name: "11" },
+ Ranges: { Value: 12 Name: "12" },
+ Ranges: { Value: 13 Name: "13" },
+ Ranges: { Value: 14 Name: "14" },
+ Ranges: { Value: 15 Name: "15" },
+ Ranges: { Value: 20 Name: "20" },
+ }];
+
+ // buckets are hours since last full compaction ts, i.e. number of shards
+ // compacted within 1 hour or within 24 hours
+ COUNTER_SHARDS_WITH_FULL_COMPACTION = 3 [(CounterOpts) = {
+ Name: "ShardsFullCompactionAgeHours",
+ Integral: true,
+ Ranges: { Value: 0 Name: "0" },
+ Ranges: { Value: 1 Name: "1" },
+ Ranges: { Value: 6 Name: "6" },
+ Ranges: { Value: 12 Name: "12" },
+ Ranges: { Value: 24 Name: "24" },
+ Ranges: { Value: 48 Name: "48" },
+ Ranges: { Value: 72 Name: "72" },
+ Ranges: { Value: 168 Name: "inf" },
+ }];
+
+ COUNTER_SHARDS_WITH_ROW_DELETES = 4 [(CounterOpts) = {
+ Name: "ShardsWithRowDeletes",
+ Integral: true,
+ Ranges: { Value: 0 Name: "0" },
+ Ranges: { Value: 100 Name: "100" },
+ Ranges: { Value: 1000 Name: "1000" },
+ Ranges: { Value: 10000 Name: "10000" },
+ Ranges: { Value: 100000 Name: "100000" },
+ Ranges: { Value: 1000000 Name: "1000000" },
+ Ranges: { Value: 10000000 Name: "10000000" },
+ Ranges: { Value: 100000000 Name: "100000000" },
+ Ranges: { Value: 1000000000 Name: "inf" },
+ }];
}
enum ETxTypes {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 341b81f20b6..9eecd56131e 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1408,6 +1408,7 @@ message TEvCompactTableResult {
OK = 0;
NOT_NEEDED = 1;
FAILED = 2;
+ BORROWED = 3;
};
optional uint64 TabletId = 1;
@@ -1688,7 +1689,7 @@ message TEvReplicationSourceOffsetsAck {
optional uint64 ReadId = 1;
// The client is acknowledging every SeqNo <= AckSeqNo has been received and processed
- // This will remove their corresponding
+ // This will remove their corresponding
optional uint64 AckSeqNo = 2;
// The client may optionally change the current window size
diff --git a/ydb/core/tx/datashard/datashard__compaction.cpp b/ydb/core/tx/datashard/datashard__compaction.cpp
index 7ab0bfeca57..8b12088ccd6 100644
--- a/ydb/core/tx/datashard/datashard__compaction.cpp
+++ b/ydb/core/tx/datashard/datashard__compaction.cpp
@@ -93,7 +93,7 @@ public:
auto response = MakeHolder<TEvDataShard::TEvCompactTableResult>(
Self->TabletID(),
pathId,
- NKikimrTxDataShard::TEvCompactTableResult::FAILED);
+ NKikimrTxDataShard::TEvCompactTableResult::BORROWED);
ctx.Send(Ev->Sender, std::move(response));
return true;
}
@@ -177,7 +177,7 @@ public:
<< ", last full compaction# " << Ts);
}
};
-
+
void TDataShard::Handle(TEvDataShard::TEvCompactTable::TPtr& ev, const TActorContext& ctx) {
Executor()->Execute(new TTxCompactTable(this, ev), ctx);
}
@@ -190,6 +190,7 @@ void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) {
if (ti.second->LocalTid != tableId && ti.second->ShadowTid != tableId)
continue;
if (ti.second->Stats.LastFullCompaction < finishedInfo.FullCompactionTs) {
+ IncCounter(COUNTER_FULL_COMPACTION_DONE);
ti.second->Stats.LastFullCompaction = finishedInfo.FullCompactionTs;
Executor()->Execute(
new TTxPersistFullCompactionTs(
diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp
index 129a14dca5f..3ee32261af4 100644
--- a/ydb/core/tx/datashard/datashard__stats.cpp
+++ b/ydb/core/tx/datashard/datashard__stats.cpp
@@ -196,7 +196,7 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorCo
dataSize += tableInfo.Stats.DataStats.DataSize;
- UpdateSearchHeightStats(tableInfo.Stats, ev->Get()->SearchHeight);
+ tableInfo.Stats.SearchHeight = ev->Get()->SearchHeight;
tableInfo.StatsUpdateInProgress = false;
@@ -258,12 +258,10 @@ public:
searchHeight = 0;
}
- Self->UpdateFullCompactionTsMetric(ti.second->Stats);
-
if (!ti.second->StatsNeedUpdate) {
ti.second->Stats.MemRowCount = memRowCount;
ti.second->Stats.MemDataSize = memDataSize;
- Self->UpdateSearchHeightStats(ti.second->Stats, searchHeight);
+ ti.second->Stats.SearchHeight = searchHeight;
continue;
}
@@ -361,38 +359,6 @@ void TDataShard::UpdateTableStats(const TActorContext &ctx) {
Executor()->Execute(new TTxInitiateStatsUpdate(this), ctx);
}
-void TDataShard::UpdateSearchHeightStats(TUserTable::TStats& stats, ui64 newSearchHeight) {
- if (TabletCounters) {
- if (stats.LastSearchHeightMetricSet)
- TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].DecrementFor(stats.SearchHeight);
-
- TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].IncrementFor(newSearchHeight);
- stats.LastSearchHeightMetricSet = true;
- }
- stats.SearchHeight = newSearchHeight;
-}
-
-void TDataShard::UpdateFullCompactionTsMetric(TUserTable::TStats& stats) {
- if (!TabletCounters)
- return;
-
- auto now = AppData()->TimeProvider->Now();
- if (now < stats.LastFullCompaction) {
- // extra sanity check
- return;
- }
-
- auto newHours = (now - stats.LastFullCompaction).Hours();
- if (stats.HoursSinceFullCompaction && newHours == *stats.HoursSinceFullCompaction)
- return;
-
- if (stats.HoursSinceFullCompaction)
- TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].DecrementFor(*stats.HoursSinceFullCompaction);
-
- TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].IncrementFor(newHours);
- stats.HoursSinceFullCompaction = newHours;
-}
-
void TDataShard::CollectCpuUsage(const TActorContext &ctx) {
auto* metrics = Executor()->GetResourceMetrics();
TInstant now = AppData(ctx)->TimeProvider->Now();
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index c144311a4e7..0dc7861e125 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1300,8 +1300,6 @@ public:
NMiniKQL::IKeyAccessSampler::TPtr GetKeyAccessSampler();
void EnableKeyAccessSampling(const TActorContext &ctx, TInstant until);
void UpdateTableStats(const TActorContext& ctx);
- void UpdateSearchHeightStats(TUserTable::TStats& stats, ui64 newSearchHeight);
- void UpdateFullCompactionTsMetric(TUserTable::TStats& stats);
void CollectCpuUsage(const TActorContext& ctx);
void ScanComplete(NTable::EAbort status, TAutoPtr<IDestructable> prod, ui64 cookie, const TActorContext &ctx) override;
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index 962ef2ead3e..4cc7845d8b4 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -320,10 +320,6 @@ struct TUserTable : public TThrRefBase {
ui64 BackgroundCompactionRequests = 0;
NTable::TKeyAccessSample AccessStats;
- bool LastSearchHeightMetricSet = false;
-
- std::optional<ui32> HoursSinceFullCompaction;
-
void Update(NTable::TStats&& dataStats, ui64 indexSize, THashSet<ui64>&& partOwners, ui64 partCount, TInstant statsUpdateTime) {
DataStats = dataStats;
IndexSize = indexSize;
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index cb745fcb517..148daffa3bf 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -57,6 +57,8 @@ public:
TActorBase::PassAway();
}
+ TInstant GetWakeupTime() const { return When; }
+
private:
// ITimer, note that it is made private,
// since it should be called only from TBase
diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
index 44bb3934151..b97a0cff1eb 100644
--- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
@@ -5,7 +5,6 @@ namespace NSchemeShard {
NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TShardIdx& shardId) {
UpdateBackgroundCompactionQueueMetrics();
- TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_TIMEOUT].Increment(CompactionQueue->ResetTimeoutCount());
auto ctx = TActivationContext::ActorContextFor(SelfId());
@@ -23,7 +22,10 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCompaction "
"for pathId# " << pathId << ", datashard# " << datashardId
+ << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
+ << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
<< ", running# " << CompactionQueue->RunningSize() << " shards"
<< " at schemeshard " << TabletID());
@@ -35,6 +37,34 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha
return NOperationQueue::EStartStatus::EOperationRunning;
}
+void TSchemeShard::OnBackgroundCompactionTimeout(const TShardIdx& shardId) {
+ UpdateBackgroundCompactionQueueMetrics();
+
+ TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_TIMEOUT].Increment(1);
+
+ auto ctx = TActivationContext::ActorContextFor(SelfId());
+
+ auto it = ShardInfos.find(shardId);
+ if (it == ShardInfos.end()) {
+ LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info "
+ "for timeout background compaction# " << shardId
+ << " at schemeshard# " << TabletID());
+ return;
+ }
+
+ const auto& datashardId = it->second.TabletID;
+ const auto& pathId = it->second.PathId;
+
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Background compaction timeout "
+ "for pathId# " << pathId << ", datashard# " << datashardId
+ << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", rate# " << CompactionQueue->GetRate()
+ << ", in queue# " << CompactionQueue->Size() << " shards"
+ << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
+ << ", running# " << CompactionQueue->RunningSize() << " shards"
+ << " at schemeshard " << TabletID());
+}
+
void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const TActorContext &ctx) {
const auto& record = ev->Get()->Record;
@@ -55,13 +85,21 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
if (shardIdx == InvalidShardIdx) {
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction of unknown shard "
"for pathId# " << pathId << ", datashard# " << tabletId
+ << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus()
+ << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
+ << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
<< ", running# " << CompactionQueue->RunningSize() << " shards"
<< " at schemeshard " << TabletID());
} else {
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction "
"for pathId# " << pathId << ", datashard# " << tabletId
+ << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus()
+ << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
+ << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
<< ", running# " << CompactionQueue->RunningSize() << " shards"
<< " at schemeshard " << TabletID());
}
@@ -70,19 +108,54 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
switch (record.GetStatus()) {
case NKikimrTxDataShard::TEvCompactTableResult::OK:
- case NKikimrTxDataShard::TEvCompactTableResult::NOT_NEEDED:
TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_OK].Increment(1);
if (duration)
histCounters[COUNTER_BACKGROUND_COMPACTION_OK_LATENCY].IncrementFor(duration.MilliSeconds());
break;
+ case NKikimrTxDataShard::TEvCompactTableResult::NOT_NEEDED:
+ TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_NOT_NEEDED].Increment(1);
+ break;
case NKikimrTxDataShard::TEvCompactTableResult::FAILED:
TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_FAILED].Increment(1);
break;
+ case NKikimrTxDataShard::TEvCompactTableResult::BORROWED:
+ TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_BORROWED].Increment(1);
+ break;
}
UpdateBackgroundCompactionQueueMetrics();
}
+void TSchemeShard::EnqueueCompaction(TShardCompactionInfo&& info) {
+ if (!CompactionQueue)
+ return;
+
+ CompactionQueue->Enqueue(std::move(info));
+ UpdateBackgroundCompactionQueueMetrics();
+}
+
+void TSchemeShard::UpdateCompaction(TShardCompactionInfo&& info) {
+ if (!CompactionQueue)
+ return;
+
+ if (!CompactionQueue->Update(info))
+ CompactionQueue->Enqueue(std::move(info));
+ UpdateBackgroundCompactionQueueMetrics();
+}
+
+void TSchemeShard::RemoveCompaction(const TShardIdx& shardIdx) {
+ if (!CompactionQueue)
+ return;
+
+ CompactionQueue->Remove(TShardCompactionInfo(shardIdx));
+ UpdateBackgroundCompactionQueueMetrics();
+}
+
+void TSchemeShard::ShardRemoved(const TShardIdx& shardIdx) {
+ RemoveCompaction(shardIdx);
+ RemoveShardMetrics(shardIdx);
+}
+
void TSchemeShard::UpdateBackgroundCompactionQueueMetrics() {
TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_SIZE].Set(CompactionQueue->Size());
TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_RUNNING].Set(CompactionQueue->RunningSize());
@@ -94,5 +167,51 @@ void TSchemeShard::UpdateBackgroundCompactionQueueMetrics() {
TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_SIZE_DELETES].Set(queue.SizeByRowDeletes());
}
+void TSchemeShard::UpdateShardMetrics(
+ const TShardIdx& shardIdx,
+ const TTableInfo::TPartitionStats& newStats)
+{
+ THashMap<TShardIdx, TPartitionMetrics>::insert_ctx insertCtx;
+ auto it = PartitionMetricsMap.find(shardIdx, insertCtx);
+ if (it != PartitionMetricsMap.end()) {
+ const auto& metrics = it->second;
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].DecrementFor(metrics.SearchHeight);
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].DecrementFor(metrics.HoursSinceFullCompaction);
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_ROW_DELETES].DecrementFor(metrics.RowDeletes);
+ } else {
+ it = PartitionMetricsMap.insert_direct(std::make_pair(shardIdx, TPartitionMetrics()), insertCtx);
+ }
+
+ auto& metrics = it->second;
+
+ metrics.SearchHeight = newStats.SearchHeight;
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].IncrementFor(metrics.SearchHeight);
+
+ metrics.RowDeletes = newStats.RowDeletes;
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_ROW_DELETES].IncrementFor(metrics.RowDeletes);
+
+ auto now = AppData()->TimeProvider->Now();
+ auto compactionTime = TInstant::Seconds(newStats.FullCompactionTs);
+ if (now >= compactionTime)
+ metrics.HoursSinceFullCompaction = (now - compactionTime).Hours();
+ else
+ metrics.HoursSinceFullCompaction = 0;
+
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].IncrementFor(metrics.HoursSinceFullCompaction);
+}
+
+void TSchemeShard::RemoveShardMetrics(const TShardIdx& shardIdx) {
+ auto it = PartitionMetricsMap.find(shardIdx);
+ if (it == PartitionMetricsMap.end())
+ return;
+
+ const auto& metrics = it->second;
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].DecrementFor(metrics.SearchHeight);
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].DecrementFor(metrics.HoursSinceFullCompaction);
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_ROW_DELETES].DecrementFor(metrics.RowDeletes);
+
+ PartitionMetricsMap.erase(it);
+}
+
} // NSchemeShard
} // NKikimr
diff --git a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp
index 855553cddc8..060001c0741 100644
--- a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp
@@ -154,6 +154,8 @@ struct TSchemeShard::TTxDeleteTabletReply : public TSchemeShard::TRwTxBase {
NIceDb::TNiceDb db(txc.DB);
Self->PersistUnknownShardDeleted(db, ShardIdx);
}
+
+ Self->ShardRemoved(ShardIdx);
}
void DoComplete(const TActorContext &ctx) override {
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 762bdbb55d2..1d07f1a93d8 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -2170,6 +2170,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
tableInfo->UpdateShardStats(shardIdx, stats);
+ // note that we don't update shard metrics here, because we will always update
+ // the shard metrics in TSchemeShard::SetPartitioning
+
if (!rowSet.Next()) {
return false;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 24423ce1a6f..1395e37aa37 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -394,6 +394,7 @@ void NTableState::UpdatePartitioningForCopyTable(TOperationId operationId, TTxSt
context.SS->ShardInfos.erase(shard.Idx);
domainInfo->RemoveInternalShard(shard.Idx);
context.SS->DecrementPathDbRefCount(pathId, "remove shard from txState");
+ context.SS->ShardRemoved(shard.Idx);
}
}
txState.Shards.clear();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp
index d7efc1dbf5f..126d02d850c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp
@@ -126,6 +126,7 @@ void TMemoryChanges::UnDo(TSchemeShard* ss) {
ss->ShardInfos[id] = *elem;
} else {
ss->ShardInfos.erase(id);
+ ss->ShardRemoved(id);
}
Shards.pop();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index 00e6d8fad04..dc85ff4e050 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -191,11 +191,9 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
auto oldAggrStats = table->GetStats().Aggregated;
table->UpdateShardStats(shardIdx, newStats);
- if (Self->CompactionQueue) {
- TShardCompactionInfo compactionInfo(shardIdx, newStats);
- if (!Self->CompactionQueue->Update(compactionInfo))
- Self->CompactionQueue->Enqueue(std::move(compactionInfo));
- Self->UpdateBackgroundCompactionQueueMetrics();
+ if (!table->IsBackup) {
+ Self->UpdateCompaction(TShardCompactionInfo(shardIdx, newStats));
+ Self->UpdateShardMetrics(shardIdx, newStats);
}
NIceDb::TNiceDb db(txc.DB);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index d88e53c3769..92ed532ab28 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -306,6 +306,11 @@ void TSchemeShard::Clear() {
TabletCounters->Simple()[idx].Set(0);
}
TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].Clear();
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_SEARCH_HEIGHT].Clear();
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_FULL_COMPACTION].Clear();
+ TabletCounters->Percentile()[COUNTER_SHARDS_WITH_ROW_DELETES].Clear();
+
+ PartitionMetricsMap.clear();
}
void TSchemeShard::IncrementPathDbRefCount(const TPathId& pathId, const TStringBuf& debug) {
@@ -2074,8 +2079,7 @@ void TSchemeShard::DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId pa
db.Table<Schema::MigratedTablePartitions>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete();
db.Table<Schema::TablePartitionStats>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete();
- CompactionQueue->Remove(TShardCompactionInfo(partitions[pi].ShardIdx));
- UpdateBackgroundCompactionQueueMetrics();
+ ShardRemoved(partitions[pi].ShardIdx);
}
}
@@ -3250,10 +3254,10 @@ void TSchemeShard::PersistRemoveTable(NIceDb::TNiceDb& db, TPathId pathId, const
}
}
+ // sanity check: by this time compaction queue and metrics must be updated already
for (const auto& p: tableInfo->GetPartitions()) {
- CompactionQueue->Remove(TShardCompactionInfo(p.ShardIdx));
+ ShardRemoved(p.ShardIdx);
}
- UpdateBackgroundCompactionQueueMetrics();
Tables.erase(pathId);
DecrementPathDbRefCount(pathId, "remove table");
@@ -5798,8 +5802,8 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T
const auto& partitionStats = tableInfo->GetStats().PartitionStats;
auto it = partitionStats.find(p.ShardIdx);
if (it != partitionStats.end()) {
- CompactionQueue->Enqueue(TShardCompactionInfo(p.ShardIdx, it->second));
- UpdateBackgroundCompactionQueueMetrics();
+ EnqueueCompaction(TShardCompactionInfo(p.ShardIdx, it->second));
+ UpdateShardMetrics(p.ShardIdx, it->second);
}
}
}
@@ -6008,13 +6012,6 @@ void TSchemeShard::ConfigureCompactionQueue(
compactionConfig.MaxRate = config.GetMaxRate();
compactionConfig.MinOperationRepeatDelay = TDuration::Seconds(config.GetMinCompactionRepeatDelaySeconds());
- LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "CompactionQueue configured: Timeout# " << compactionConfig.Timeout
- << ", WakeupInterval# " << compactionConfig.WakeupInterval
- << ", RoundInterval# " << compactionConfig.RoundInterval
- << ", InflightLimit# " << compactionConfig.InflightLimit
- << ", MaxRate# " << compactionConfig.MaxRate);
-
if (CompactionQueue) {
CompactionQueue->UpdateConfig(compactionConfig, queueConfig);
} else {
@@ -6023,6 +6020,15 @@ void TSchemeShard::ConfigureCompactionQueue(
queueConfig,
CompactionStarter);
}
+
+ LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "CompactionQueue configured: Timeout# " << compactionConfig.Timeout
+ << ", Rate# " << CompactionQueue->GetRate()
+ << ", WakeupInterval# " << compactionConfig.WakeupInterval
+ << ", RoundInterval# " << compactionConfig.RoundInterval
+ << ", InflightLimit# " << compactionConfig.InflightLimit
+ << ", MinCompactionRepeatDelaySeconds# " << compactionConfig.MinOperationRepeatDelay
+ << ", MaxRate# " << compactionConfig.MaxRate);
}
void TSchemeShard::StartStopCompactionQueue() {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index a2954065d07..53b96241201 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -86,10 +86,14 @@ private:
: Self(self)
{ }
- NOperationQueue::EStartStatus StartOperation(const TShardCompactionInfo& info) {
+ NOperationQueue::EStartStatus StartOperation(const TShardCompactionInfo& info) override {
return Self->StartBackgroundCompaction(info.ShardIdx);
}
+ void OnTimeout(const TShardCompactionInfo& info) override {
+ Self->OnBackgroundCompactionTimeout(info.ShardIdx);
+ }
+
private:
TSchemeShard* Self;
};
@@ -182,6 +186,14 @@ public:
THashMap<TTabletId, TShardIdx> TabletIdToShardIdx;
THashMap<TShardIdx, TVector<TActorId>> ShardDeletionSubscribers; // for tests
+ // in case of integral hists we need to remember what values we have set
+ struct TPartitionMetrics {
+ ui64 SearchHeight = 0;
+ ui64 RowDeletes = 0;
+ ui32 HoursSinceFullCompaction = 0;
+ };
+ THashMap<TShardIdx, TPartitionMetrics> PartitionMetricsMap;
+
TActorId SchemeBoardPopulator;
static constexpr ui32 InitiateCachedTxIdsCount = 100;
@@ -623,7 +635,17 @@ public:
void ScheduleCleanDroppedPaths();
void Handle(TEvPrivate::TEvCleanDroppedPaths::TPtr& ev, const TActorContext& ctx);
+ void EnqueueCompaction(TShardCompactionInfo&& info);
+ void UpdateCompaction(TShardCompactionInfo&& info);
+ void RemoveCompaction(const TShardIdx& shardIdx);
+
+ void UpdateShardMetrics(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& newStats);
+ void RemoveShardMetrics(const TShardIdx& shardIdx);
+
+ void ShardRemoved(const TShardIdx& shardIdx);
+
NOperationQueue::EStartStatus StartBackgroundCompaction(const TShardIdx& shardId);
+ void OnBackgroundCompactionTimeout(const TShardIdx& shardId);
void UpdateBackgroundCompactionQueueMetrics();
struct TTxCleanDroppedSubDomains;
diff --git a/ydb/core/util/circular_queue.h b/ydb/core/util/circular_queue.h
index ffffb6db57e..bc4195c982d 100644
--- a/ydb/core/util/circular_queue.h
+++ b/ydb/core/util/circular_queue.h
@@ -180,7 +180,7 @@ public:
Queue.PopFrontToBack();
}
- bool Empty() const {
+ bool Empty() const {
return Queue.Empty();
}
diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h
index e4075e05158..19c8750b490 100644
--- a/ydb/core/util/operation_queue.h
+++ b/ydb/core/util/operation_queue.h
@@ -37,6 +37,10 @@ public:
virtual ~IStarter() = default;
virtual EStartStatus StartOperation(const T& item) = 0;
+
+ // in many cases just for metrics/logging, because
+ // queue is able to restart operation itself
+ virtual void OnTimeout(const T& item) = 0;
};
struct TConfig {
@@ -262,8 +266,6 @@ private:
bool Running = false;
bool WasRunning = false;
- ui64 TimeoutCount = 0;
-
// operations / s
double Rate = 0;
@@ -389,8 +391,6 @@ public:
double GetRate() const { return Rate; }
- ui64 ResetTimeoutCount() { return TimeoutCount; TimeoutCount = 0; }
-
const TQueue& GetReadyQueue() const { return ReadyQueue; }
// copies items, should be used in tests only
@@ -551,7 +551,7 @@ void TOperationQueue<T, TQueue>::CheckTimeoutOperations() {
while (!RunningItems.Empty()) {
const auto& item = RunningItems.Front();
if (item.Timestamp + Config.Timeout <= now) {
- ++TimeoutCount;
+ Starter.OnTimeout(item.Item);
if (Config.IsCircular)
ReEnqueueNoStart(std::move(item.Item));
RunningItems.PopFront();
diff --git a/ydb/core/util/operation_queue_priority_ut.cpp b/ydb/core/util/operation_queue_priority_ut.cpp
index 6912e0c8c81..32bd274fe66 100644
--- a/ydb/core/util/operation_queue_priority_ut.cpp
+++ b/ydb/core/util/operation_queue_priority_ut.cpp
@@ -103,6 +103,9 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue
WakeupHistory.push_back(t);
}
+ void OnTimeout(const TPriorityItem&) override
+ {}
+
TInstant Now() override
{
return TimeProvider.Now();
diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp
index 3c88f45a6b8..1015cb217e3 100644
--- a/ydb/core/util/operation_queue_ut.cpp
+++ b/ydb/core/util/operation_queue_ut.cpp
@@ -53,6 +53,9 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim
WakeupHistory.push_back(t);
}
+ void OnTimeout(const int&) override
+ {}
+
TInstant Now() override
{
return TimeProvider.Now();