author    Evgeniy Ivanov <i@eivanov.com>    2022-06-15 13:44:00 +0300
committer Daniil Cherednik <dan.cherednik@gmail.com>    2022-06-15 13:44:00 +0300
commit    a26f504e5ded277ef98fe9b1232514eb7af628cf (patch)
tree      e7a534e9cef8e8f9e91dd47df2051e9233811f30
parent    4b9c305c0b7baac37eb298e810e86d999f057e45 (diff)
download  ydb-a26f504e5ded277ef98fe9b1232514eb7af628cf.tar.gz
PR from branch users/eivanov89/KIKIMR-14960-batch-schemeshard-stats_22-2
KIKIMR-14960: fix enum value of SchemeShardConfigItem
KIKIMR-14960: schemeshard should batch stats

REVIEW: 2606929
REVIEW: 2627628

x-ydb-stable-ref: 85844b8d405c3251c100a7b45471ad6a62c2b853
-rw-r--r--  ydb/core/base/appdata.h                              |   1
-rw-r--r--  ydb/core/driver_lib/run/run.cpp                      |   4
-rw-r--r--  ydb/core/protos/config.proto                         |  13
-rw-r--r--  ydb/core/protos/console_config.proto                 |   1
-rw-r--r--  ydb/core/protos/counters_schemeshard.proto           |  25
-rw-r--r--  ydb/core/testlib/actors/test_runtime.cpp             |   1
-rw-r--r--  ydb/core/testlib/basics/appdata.cpp                  |   1
-rw-r--r--  ydb/core/testlib/basics/appdata.h                    |   1
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard__table_stats.cpp | 202
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard_impl.cpp         |  26
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard_impl.h           |  53
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard_private.h        |   4
-rw-r--r--  ydb/core/tx/schemeshard/ut_export.cpp                |   6
-rw-r--r--  ydb/core/tx/schemeshard/ut_helpers/test_env.cpp      |   5
-rw-r--r--  ydb/core/tx/schemeshard/ut_helpers/test_env.h        |   1
-rw-r--r--  ydb/core/tx/schemeshard/ut_split_merge.cpp           |   1
-rw-r--r--  ydb/core/tx/schemeshard/ut_stats.cpp                 | 300
-rw-r--r--  ydb/core/tx/schemeshard/ut_stats/ya.make             |  37
-rw-r--r--  ydb/core/tx/schemeshard/ut_subdomain.cpp             |  11
-rw-r--r--  ydb/core/tx/schemeshard/ut_ttl.cpp                   |   5
-rw-r--r--  ydb/core/tx/schemeshard/ya.make                      |   1
21 files changed, 655 insertions, 44 deletions
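
For orientation before the per-file diffs: the core of the change is a coalescing queue in SchemeShard that holds at most one pending stats report per (path, datashard) pair and flushes it in batches, by size or by age, inside a single TTxStorePartitionStats transaction. A minimal standalone sketch of that queue pattern, in portable C++ with illustrative names (not YDB's actual types):

    #include <cstddef>
    #include <cstdint>
    #include <deque>
    #include <functional>
    #include <unordered_map>

    struct TStatsKey {                       // plays the role of TStatsId: (path, datashard)
        uint64_t PathId;
        uint64_t DatashardId;
        bool operator==(const TStatsKey& o) const {
            return PathId == o.PathId && DatashardId == o.DatashardId;
        }
    };

    struct TStatsKeyHash {
        size_t operator()(const TStatsKey& k) const {
            // combine both ids; the real code uses util's MultiHash
            return std::hash<uint64_t>()(k.PathId)
                 ^ (std::hash<uint64_t>()(k.DatashardId) * 0x9e3779b97f4a7c15ULL);
        }
    };

    struct TQueueItem {
        TStatsKey Key;
        uint64_t Payload;                    // stands in for the TEvPeriodicTableStats event
    };

    class TCoalescingQueue {
        std::deque<TQueueItem> Queue;        // FIFO in order of first arrival
        // std::deque keeps references stable across push_back/pop_front,
        // which is what makes storing raw element pointers in the map safe
        std::unordered_map<TStatsKey, TQueueItem*, TStatsKeyHash> Map;

    public:
        // insert-or-update: a newer report for the same shard overwrites the
        // queued one, so at most one entry exists per (path, datashard)
        void Push(const TStatsKey& key, uint64_t payload) {
            if (auto it = Map.find(key); it != Map.end()) {
                it->second->Payload = payload;
                return;
            }
            Queue.push_back(TQueueItem{key, payload});
            Map[key] = &Queue.back();
        }

        bool Pop(TQueueItem& out) {
            if (Queue.empty())
                return false;
            out = Queue.front();
            Map.erase(out.Key);
            Queue.pop_front();
            return true;
        }

        size_t Size() const { return Queue.size(); }
    };
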
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h
index fa0d5330ae..ad781ba56c 100644
--- a/ydb/core/base/appdata.h
+++ b/ydb/core/base/appdata.h
@@ -157,6 +157,7 @@ struct TAppData {
TFeatureFlags FeatureFlags;
NKikimrConfig::THiveConfig HiveConfig;
NKikimrConfig::TDataShardConfig DataShardConfig;
+ NKikimrConfig::TSchemeShardConfig SchemeShardConfig;
NKikimrConfig::TMeteringConfig MeteringConfig;
NKikimrConfig::TCompactionConfig CompactionConfig;
NKikimrConfig::TDomainsConfig DomainsConfig;
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index b43275e738..61820d1347 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -910,6 +910,10 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
AppData->DataShardConfig = runConfig.AppConfig.GetDataShardConfig();
}
+ if (runConfig.AppConfig.HasSchemeShardConfig()) {
+ AppData->SchemeShardConfig = runConfig.AppConfig.GetSchemeShardConfig();
+ }
+
if (runConfig.AppConfig.HasMeteringConfig()) {
AppData->MeteringConfig = runConfig.AppConfig.GetMeteringConfig();
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 0f0875a8b1..db5d5572e5 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1402,6 +1402,18 @@ message TDataShardConfig {
optional uint64 IdleMemCompactionIntervalSeconds = 15 [default = 60];
}
+message TSchemeShardConfig {
+ // after this amount of time we forcibly write full stats to the local DB
+ // set to 0 to disable
+ optional uint32 StatsBatchTimeoutMs = 1 [default = 100];
+
+ // number of shard stats to batch together
+ // set to 0 to disable
+ optional uint32 StatsMaxBatchSize = 2 [default = 100];
+
+ optional uint32 StatsMaxExecuteMs = 3 [default = 10];
+}
+
message TCompactionConfig {
message TBackgroundCompactionConfig {
optional double MaxRate = 1 [default = 1]; // 1 compaction / s
@@ -1536,6 +1548,7 @@ message TAppConfig {
optional NYq.NConfig.TConfig YandexQueryConfig = 50;
optional TCompactionConfig CompactionConfig = 52;
optional THttpProxyConfig HttpProxyConfig = 53;
+ optional TSchemeShardConfig SchemeShardConfig = 54;
repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
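
Note how the three knobs compose: a batch is flushed as soon as StatsMaxBatchSize reports are queued or the oldest queued report reaches StatsBatchTimeoutMs of age, and a single flush transaction stops early after roughly StatsMaxExecuteMs of CPU time. A sketch of tuning them from a test, using the same setters the new unit tests below rely on (the concrete values here are illustrative only):

    auto& appData = runtime.GetAppData();
    // flush after 50 queued reports, or once the oldest report is 200 ms old,
    // whichever comes first; cap one TTxStorePartitionStats at ~20 ms of CPU time
    appData.SchemeShardConfig.SetStatsMaxBatchSize(50);
    appData.SchemeShardConfig.SetStatsBatchTimeoutMs(200);
    appData.SchemeShardConfig.SetStatsMaxExecuteMs(20);

Setting both StatsMaxBatchSize and StatsBatchTimeoutMs to 0 disables batching entirely; that is exactly what the DisableStatsBatching test option added below does.
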
diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto
index 5ee706f46e..b17d871bd5 100644
--- a/ydb/core/protos/console_config.proto
+++ b/ydb/core/protos/console_config.proto
@@ -113,6 +113,7 @@ message TConfigItem {
PDiskKeyConfigItem = 51;
CompactionConfigItem = 52;
HttpProxyConfigItem = 53;
+ SchemeShardConfigItem = 54;
NamedConfigsItem = 100;
ClusterYamlConfigItem = 101;
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index 570566e293..ae67c9455c 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -158,6 +158,14 @@ enum ESimpleCounters {
COUNTER_BORROWED_COMPACTION_QUEUE_SIZE = 129 [(CounterOpts) = {Name: "BorrowedCompactionQueueSize"}];
COUNTER_BORROWED_COMPACTION_QUEUE_RUNNING = 130 [(CounterOpts) = {Name: "BorrowedCompactionQueueRunning"}];
+
+ COUNTER_BLOB_DEPOT_COUNT = 131 [(CounterOpts) = {Name: "BlobDepots"}];
+ COUNTER_BLOB_DEPOT_SHARD_COUNT = 132 [(CounterOpts) = {Name: "BlobDepotShards"}];
+ COUNTER_IN_FLIGHT_OPS_TxCreateBlobDepot = 133 [(CounterOpts) = {Name: "InFlightOps/CreateBlobDepot"}];
+ COUNTER_IN_FLIGHT_OPS_TxAlterBlobDepot = 134 [(CounterOpts) = {Name: "InFlightOps/AlterBlobDepot"}];
+ COUNTER_IN_FLIGHT_OPS_TxDropBlobDepot = 135 [(CounterOpts) = {Name: "InFlightOps/DropBlobDepot"}];
+
+ COUNTER_STATS_QUEUE_SIZE = 136 [(CounterOpts) = {Name: "StatsQueueSize"}];
}
enum ECumulativeCounters {
@@ -252,6 +260,12 @@ enum ECumulativeCounters {
COUNTER_BORROWED_COMPACTION_OK = 76 [(CounterOpts) = {Name: "BorrowedCompactionOK"}];
COUNTER_BORROWED_COMPACTION_TIMEOUT = 77 [(CounterOpts) = {Name: "BorrowedCompactionTimeout"}];
+
+ COUNTER_FINISHED_OPS_TxCreateBlobDepot = 78 [(CounterOpts) = {Name: "FinishedOps/CreateBlobDepot"}];
+ COUNTER_FINISHED_OPS_TxAlterBlobDepot = 79 [(CounterOpts) = {Name: "FinishedOps/AlterBlobDepot"}];
+ COUNTER_FINISHED_OPS_TxDropBlobDepot = 80 [(CounterOpts) = {Name: "FinishedOps/DropBlobDepot"}];
+
+ COUNTER_STATS_WRITTEN = 81 [(CounterOpts) = {Name: "StatsWritten"}];
}
enum EPercentileCounters {
@@ -343,6 +357,17 @@ enum EPercentileCounters {
Ranges: { Value: 100000000 Name: "100000000" },
Ranges: { Value: 1000000000 Name: "1000000000" },
}];
+
+ COUNTER_STATS_BATCH_LATENCY = 5 [(CounterOpts) = {
+ Name: "StatsBatchLatency",
+ Ranges: { Value: 1000 Name: "1 ms" }
+ Ranges: { Value: 10000 Name: "10 ms" }
+ Ranges: { Value: 50000 Name: "50 ms" }
+ Ranges: { Value: 100000 Name: "100 ms" }
+ Ranges: { Value: 200000 Name: "200 ms" }
+ Ranges: { Value: 500000 Name: "500 ms" }
+ Ranges: { Value: 1000000 Name: "1000 ms" }
+ }];
}
enum ETxTypes {
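
COUNTER_STATS_BATCH_LATENCY is fed microseconds (the stats transaction calls IncrementFor(timeInQueue.MicroSeconds())), so the named ranges above cover queue latencies from 1 ms to 1000 ms. A sketch of the bucket lookup such a percentile counter performs, assuming simple upper-bound buckets (illustrative, not the counters library's implementation):

    #include <array>
    #include <cstddef>
    #include <cstdint>

    // upper bounds in microseconds, matching the ranges declared above
    constexpr std::array<uint64_t, 7> kBucketBounds = {
        1000, 10000, 50000, 100000, 200000, 500000, 1000000};

    size_t BucketFor(uint64_t latencyMicros) {
        for (size_t i = 0; i < kBucketBounds.size(); ++i)
            if (latencyMicros <= kBucketBounds[i])
                return i;              // e.g. 7500 us falls into the "10 ms" bucket
        return kBucketBounds.size();   // overflow bucket beyond 1000 ms
    }
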
diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp
index ff09f2df27..febeb9dbfb 100644
--- a/ydb/core/testlib/actors/test_runtime.cpp
+++ b/ydb/core/testlib/actors/test_runtime.cpp
@@ -136,6 +136,7 @@ namespace NActors {
nodeAppData->FeatureFlags = app0->FeatureFlags;
nodeAppData->CompactionConfig = app0->CompactionConfig;
nodeAppData->HiveConfig = app0->HiveConfig;
+ nodeAppData->SchemeShardConfig = app0->SchemeShardConfig;
nodeAppData->DataShardConfig = app0->DataShardConfig;
nodeAppData->MeteringConfig = app0->MeteringConfig;
nodeAppData->EnableMvccSnapshotWithLegacyDomainRoot = app0->EnableMvccSnapshotWithLegacyDomainRoot;
diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp
index e9e105a138..03a68b7312 100644
--- a/ydb/core/testlib/basics/appdata.cpp
+++ b/ydb/core/testlib/basics/appdata.cpp
@@ -54,6 +54,7 @@ namespace NKikimr {
app->CompactionConfig = CompactionConfig;
app->HiveConfig = HiveConfig;
app->DataShardConfig = DataShardConfig;
+ app->SchemeShardConfig = SchemeShardConfig;
app->MeteringConfig = MeteringConfig;
app->FeatureFlags = FeatureFlags;
diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h
index 0db85b527b..93c7e6a988 100644
--- a/ydb/core/testlib/basics/appdata.h
+++ b/ydb/core/testlib/basics/appdata.h
@@ -88,6 +88,7 @@ namespace NKikimr {
TString NetDataSourceUrl;
NKikimrConfig::THiveConfig HiveConfig;
NKikimrConfig::TDataShardConfig DataShardConfig;
+ NKikimrConfig::TSchemeShardConfig SchemeShardConfig;
NKikimrConfig::TMeteringConfig MeteringConfig;
NKikimrPQ::TPQConfig PQConfig;
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index 66100f7434..bd64a85234 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -1,5 +1,6 @@
#include "schemeshard_impl.h"
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/cputime.h>
#include <ydb/core/protos/sys_view.pb.h>
namespace NKikimr {
@@ -61,18 +62,23 @@ auto TSchemeShard::BuildStatsForCollector(TPathId pathId, TShardIdx shardIdx, TT
}
class TTxStorePartitionStats: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
- TEvDataShard::TEvPeriodicTableStats::TPtr Ev;
+ TSideEffects MergeOpSideEffects;
- THolder<NSysView::TEvSysView::TEvSendPartitionStats> StatsCollectorEv;
- THolder<TEvDataShard::TEvGetTableStats> GetStatsEv;
- THolder<TEvDataShard::TEvCompactBorrowed> CompactEv;
+ struct TMessage {
+ TActorId Actor;
+ THolder<IEventBase> Event;
- TSideEffects MergeOpSideEffects;
+ TMessage(const TActorId& actor, IEventBase* event)
+ : Actor(actor)
+ , Event(event)
+ {}
+ };
+
+ TVector<TMessage> PendingMessages;
public:
- explicit TTxStorePartitionStats(TSelf* self, TEvDataShard::TEvPeriodicTableStats::TPtr& ev)
+ TTxStorePartitionStats(TSelf* self)
: TBase(self)
- , Ev(ev)
{
}
@@ -85,7 +91,9 @@ public:
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
-}; // TTxStorePartitionStats
+ // returns true to continue batching
+ bool PersistSingleStats(TTransactionContext& txc, const TActorContext& ctx);
+};
THolder<TProposeRequest> MergeRequest(
TSchemeShard* ss, TTxId& txId, TPathId& pathId, const TVector<TShardIdx>& shardsToMerge)
@@ -115,26 +123,63 @@ THolder<TProposeRequest> MergeRequest(
}
bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorContext& ctx) {
- const auto& rec = Ev->Get()->Record;
- auto datashardId = TTabletId(rec.GetDatashardId());
- TPathId tableId = InvalidPathId;
- if (rec.HasTableOwnerId()) {
- tableId = TPathId(TOwnerId(rec.GetTableOwnerId()),
- TLocalPathId(rec.GetTableLocalId()));
- } else {
- tableId = Self->MakeLocalId(TLocalPathId(rec.GetTableLocalId()));
+ Self->PersistStatsPending = false;
+
+ if (Self->StatsQueue.empty())
+ return true;
+
+ NCpuTime::TCpuTimer timer;
+
+ const ui32 maxBatchSize = Self->StatsMaxBatchSize ? Self->StatsMaxBatchSize : 1;
+ ui32 batchSize = 0;
+ while (batchSize < maxBatchSize && !Self->StatsQueue.empty()) {
+ ++batchSize;
+ if (!PersistSingleStats(txc, ctx))
+ break;
+
+ if (timer.GetTime() >= Self->StatsMaxExecuteTime)
+ break;
+ }
+
+ Self->TabletCounters->Cumulative()[COUNTER_STATS_WRITTEN].Increment(batchSize);
+
+ bool isBatchingDisabled = Self->StatsMaxBatchSize == 0;
+ if (isBatchingDisabled) {
+ // there will be a transaction per stat; no need to schedule an additional one
+ return true;
}
+ if (!Self->StatsQueue.empty()) {
+ Self->ScheduleStatsBatch(ctx);
+ }
+
+ return true;
+}
+
+bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const TActorContext& ctx) {
+ auto item = Self->StatsQueue.front();
+ Self->StatsQueue.pop_front();
+
+ auto timeInQueue = AppData()->MonotonicTimeProvider->Now() - item.Ts;
+ Self->TabletCounters->Percentile()[COUNTER_STATS_BATCH_LATENCY].IncrementFor(timeInQueue.MicroSeconds());
+
+ const auto& rec = item.Ev->Get()->Record;
+ auto datashardId = TTabletId(rec.GetDatashardId());
+ const TPathId& pathId = item.PathId;
+
+ TSchemeShard::TStatsId statsId(pathId, datashardId);
+ Self->StatsMap.erase(statsId);
+
const auto& tableStats = rec.GetTableStats();
const auto& tabletMetrics = rec.GetTabletMetrics();
ui64 dataSize = tableStats.GetDataSize();
ui64 rowCount = tableStats.GetRowCount();
- if (!Self->Tables.contains(tableId)) {
+ if (!Self->Tables.contains(pathId)) {
return true;
}
- TTableInfo::TPtr table = Self->Tables[tableId];
+ TTableInfo::TPtr table = Self->Tables[pathId];
if (!Self->TabletIdToShardIdx.contains(datashardId)) {
return true;
@@ -207,8 +252,8 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
if (!table->IsBackup && !table->IsShardsStatsDetached()) {
auto newAggrStats = table->GetStats().Aggregated;
- auto subDomainId = Self->ResolveDomainId(tableId);
- auto subDomainInfo = Self->ResolveDomainInfo(tableId);
+ auto subDomainId = Self->ResolveDomainId(pathId);
+ auto subDomainInfo = Self->ResolveDomainInfo(pathId);
subDomainInfo->AggrDiskSpaceUsage(Self, newAggrStats, oldAggrStats);
if (subDomainInfo->CheckDiskSpaceQuotas(Self)) {
Self->PersistSubDomainState(db, subDomainId, *subDomainInfo);
@@ -219,7 +264,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
}
}
- Self->PersistTablePartitionStats(db, tableId, shardIdx, table);
+ Self->PersistTablePartitionStats(db, pathId, shardIdx, table);
if (AppData(ctx)->FeatureFlags.GetEnableSystemViews()) {
TMaybe<ui32> nodeId;
@@ -230,7 +275,10 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
if (rec.HasStartTime()) {
startTime = rec.GetStartTime();
}
- StatsCollectorEv = Self->BuildStatsForCollector(tableId, shardIdx, datashardId, nodeId, startTime, newStats);
+
+ PendingMessages.emplace_back(
+ Self->SysPartitionStatsCollector,
+ Self->BuildStatsForCollector(pathId, shardIdx, datashardId, nodeId, startTime, newStats).Release());
}
const auto& shardToPartition = table->GetShard2PartitionIdx();
@@ -282,7 +330,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
dbChanges.Apply(Self, txc, ctx);
MergeOpSideEffects.ApplyOnExecute(Self, txc, ctx);
- return true;
+ return false;
}
if (rec.GetShardState() != NKikimrTxDataShard::Ready) {
@@ -309,8 +357,8 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
{
constexpr ui64 deltaShards = 2;
- TPathElement::TPtr path = Self->PathsById.at(tableId);
- TSubDomainInfo::TPtr domainInfo = Self->ResolveDomainInfo(tableId);
+ TPathElement::TPtr path = Self->PathsById.at(pathId);
+ TSubDomainInfo::TPtr domainInfo = Self->ResolveDomainInfo(pathId);
if (domainInfo->GetShardsInside() + deltaShards > domainInfo->GetSchemeLimits().MaxShards) {
LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
@@ -343,7 +391,15 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
}
// Request histograms from the datashard
- GetStatsEv.Reset(new TEvDataShard::TEvGetTableStats(tableId.LocalPathId, dataSizeResolution, rowCountResolution, collectKeySample));
+ LOG_DEBUG(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Requesting full stats from datashard %" PRIu64, rec.GetDatashardId());
+ PendingMessages.emplace_back(
+ item.Ev->Sender,
+ new TEvDataShard::TEvGetTableStats(
+ pathId.LocalPathId,
+ dataSizeResolution,
+ rowCountResolution,
+ collectKeySample));
return true;
}
@@ -351,21 +407,12 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
void TTxStorePartitionStats::Complete(const TActorContext& ctx) {
MergeOpSideEffects.ApplyOnComplete(Self, ctx);
- if (StatsCollectorEv) {
- ctx.Send(Self->SysPartitionStatsCollector, StatsCollectorEv.Release());
+ for (auto& m: PendingMessages) {
+ Y_VERIFY(m.Event);
+ ctx.Send(m.Actor, m.Event.Release());
}
- if (CompactEv) {
- LOG_DEBUG(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Requesting borrowed compaction from datasbard %" PRIu64, Ev->Get()->Record.GetDatashardId());
- ctx.Send(Ev->Sender, CompactEv.Release());
- }
-
- if (GetStatsEv) {
- LOG_DEBUG(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Requesting full stats from datashard %" PRIu64, Ev->Get()->Record.GetDatashardId());
- ctx.Send(Ev->Sender, GetStatsEv.Release());
- }
+ Self->TabletCounters->Simple()[COUNTER_STATS_QUEUE_SIZE].Set(Self->StatsQueue.size());
}
void TSchemeShard::Handle(TEvDataShard::TEvPeriodicTableStats::TPtr& ev, const TActorContext& ctx) {
@@ -390,7 +437,82 @@ void TSchemeShard::Handle(TEvDataShard::TEvPeriodicTableStats::TPtr& ev, const T
<< " rowCount " << rowCount
<< " cpuUsage " << tabletMetrics.GetCPU()/10000.0);
- Execute(new TTxStorePartitionStats(this, ev), ctx);
+ TStatsId statsId(pathId, datashardId);
+ TStatsMap::insert_ctx insertCtx;
+ auto it = StatsMap.find(statsId, insertCtx);
+ if (it == StatsMap.end()) {
+ StatsQueue.emplace_back(ev.Release(), pathId);
+ StatsMap.emplace_direct(insertCtx, statsId, &StatsQueue.back());
+ } else {
+ // already in queue, just update
+ it->second->Ev = ev.Release();
+ }
+
+ TabletCounters->Simple()[COUNTER_STATS_QUEUE_SIZE].Set(StatsQueue.size());
+ ScheduleStatsBatch(ctx);
+}
+
+void TSchemeShard::ScheduleStatsBatch(const TActorContext& ctx) {
+ if (StatsQueue.empty())
+ return;
+
+ bool isBatchingDisabled = StatsMaxBatchSize == 0;
+ if (isBatchingDisabled) {
+ PersistStatsPending = true;
+ Execute(new TTxStorePartitionStats(this), ctx);
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Will execute TTxStorePartitionStats without batch");
+ return;
+ }
+
+ if (PersistStatsPending)
+ return;
+
+ if (StatsQueue.size() >= StatsMaxBatchSize || !StatsBatchTimeout) {
+ // note that we don't care if already scheduled
+ PersistStatsPending = true;
+ Execute(new TTxStorePartitionStats(this), ctx);
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Will execute TTxStorePartitionStats, queue# " << StatsQueue.size());
+ return;
+ }
+
+ const auto& oldestItem = StatsQueue.front();
+ auto age = AppData()->MonotonicTimeProvider->Now() - oldestItem.Ts;
+ if (age >= StatsBatchTimeout) {
+ PersistStatsPending = true;
+ Execute(new TTxStorePartitionStats(this), ctx);
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Will execute TTxStorePartitionStats because of age, queue# " << StatsQueue.size());
+ return;
+ }
+
+ if (StatsBatchScheduled)
+ return;
+
+ auto delay = StatsBatchTimeout - age;
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Will delay TTxStorePartitionStats on# " << delay << ", queue# " << StatsQueue.size());
+
+ ctx.Schedule(delay, new TEvPrivate::TEvPersistStats());
+ StatsBatchScheduled = true;
+}
+
+void TSchemeShard::Handle(TEvPrivate::TEvPersistStats::TPtr&, const TActorContext& ctx) {
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Started TEvPersistStats at tablet " << TabletID() << ", queue size# " << StatsQueue.size());
+
+ StatsBatchScheduled = false;
+
+ if (PersistStatsPending) {
+ return;
+ }
+
+ if (StatsQueue.empty()) {
+ return;
+ }
+
+ PersistStatsPending = true;
+ Execute(new TTxStorePartitionStats(this), ctx);
}
}}
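
ScheduleStatsBatch above encodes the whole flush policy: with batching disabled every report gets its own transaction; a full batch or a disabled timeout flushes immediately; an overdue oldest item flushes immediately; anything else arms a one-shot TEvPersistStats timer for the remaining age. A compact restatement of that decision logic as a pure function (portable C++, names illustrative):

    #include <chrono>
    #include <cstddef>

    using TDelay = std::chrono::milliseconds;

    enum class EFlush { Now, Later, Nothing };

    // mirrors the branch order in ScheduleStatsBatch
    EFlush DecideFlush(size_t queueSize, size_t maxBatchSize,
                       TDelay batchTimeout, TDelay oldestAge, TDelay& delayOut) {
        if (queueSize == 0)
            return EFlush::Nothing;
        if (maxBatchSize == 0)                  // batching disabled: one tx per report
            return EFlush::Now;
        if (queueSize >= maxBatchSize || batchTimeout == TDelay::zero())
            return EFlush::Now;                 // batch full, or timeout disabled
        if (oldestAge >= batchTimeout)
            return EFlush::Now;                 // oldest report already overdue
        delayOut = batchTimeout - oldestAge;    // arm TEvPersistStats for this long
        return EFlush::Later;
    }

Keeping the decision pure makes the triggers easy to test in isolation; the real handler additionally consults PersistStatsPending and StatsBatchScheduled so at most one flush transaction and one timer are ever outstanding.
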
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 83c4b64bfc..485e4a7212 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -2156,6 +2156,10 @@ void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId
}
void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, const TShardIdx& shardIdx, const TTableInfo::TPtr tableInfo) {
+ if (!AppData()->FeatureFlags.GetEnablePersistentPartitionStats()) {
+ return;
+ }
+
const auto& shardToPartition = tableInfo->GetShard2PartitionIdx();
if (!shardToPartition.contains(shardIdx)) {
return;
@@ -2172,6 +2176,10 @@ void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId
}
void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, const TTableInfo::TPtr tableInfo) {
+ if (!AppData()->FeatureFlags.GetEnablePersistentPartitionStats()) {
+ return;
+ }
+
const auto& tableStats = tableInfo->GetStats();
for (const auto& [shardIdx, pi] : tableInfo->GetShard2PartitionIdx()) {
@@ -3800,6 +3808,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
EnableBackgroundCompactionServerless = appData->FeatureFlags.GetEnableBackgroundCompactionServerless();
ConfigureCompactionQueues(appData->CompactionConfig, ctx);
+ ConfigureStatsBatching(appData->SchemeShardConfig, ctx);
if (appData->ChannelProfiles) {
ChannelProfiles = appData->ChannelProfiles;
@@ -4048,6 +4057,8 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvPrivate::TEvCleanDroppedSubDomains, Handle);
HFuncTraced(TEvPrivate::TEvSubscribeToShardDeletion, Handle);
+ HFuncTraced(TEvPrivate::TEvPersistStats, Handle);
+
HFuncTraced(TEvSchemeShard::TEvLogin, Handle);
default:
@@ -6002,6 +6013,7 @@ void TSchemeShard::SubscribeConsoleConfigs(const TActorContext &ctx) {
new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({
(ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem,
(ui32)NKikimrConsole::TConfigItem::CompactionConfigItem,
+ (ui32)NKikimrConsole::TConfigItem::SchemeShardConfigItem,
}));
}
@@ -6015,6 +6027,10 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
ConfigureCompactionQueues(compactionConfig, ctx);
}
+ if (appConfig.HasSchemeShardConfig()) {
+ ConfigureStatsBatching(appConfig.GetSchemeShardConfig(), ctx);
+ }
+
if (IsShemeShardConfigured()) {
StartStopCompactionQueues();
}
@@ -6043,6 +6059,16 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu
EnableBackgroundCompactionServerless = featureFlags.GetEnableBackgroundCompactionServerless();
}
+void TSchemeShard::ConfigureStatsBatching(const NKikimrConfig::TSchemeShardConfig& config, const TActorContext& ctx) {
+ StatsBatchTimeout = TDuration::MilliSeconds(config.GetStatsBatchTimeoutMs());
+ StatsMaxBatchSize = config.GetStatsMaxBatchSize();
+ StatsMaxExecuteTime = TDuration::MilliSeconds(config.GetStatsMaxExecuteMs());
+ LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "StatsBatching config: StatsBatchTimeout# " << StatsBatchTimeout
+ << ", StatsMaxBatchSize# " << StatsMaxBatchSize
+ << ", StatsMaxExecuteTime# " << StatsMaxExecuteTime);
+}
+
void TSchemeShard::ConfigureCompactionQueues(
const NKikimrConfig::TCompactionConfig& compactionConfig,
const TActorContext &ctx)
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index e7fcebbeed..8d3f34013b 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -253,6 +253,53 @@ public:
TActorId SysPartitionStatsCollector;
+ TDuration StatsMaxExecuteTime;
+ TDuration StatsBatchTimeout;
+ ui32 StatsMaxBatchSize = 0;
+
+ // time when we opened the batch
+ TMonotonic StatsBatchStartTs;
+ bool StatsBatchScheduled = false;
+ bool PersistStatsPending = false;
+
+ struct TStatsQueueItem {
+ TEvDataShard::TEvPeriodicTableStats::TPtr Ev;
+ TPathId PathId;
+ TMonotonic Ts;
+
+ TStatsQueueItem(TEvDataShard::TEvPeriodicTableStats::TPtr ev, const TPathId& pathId)
+ : Ev(ev)
+ , PathId(pathId)
+ , Ts(AppData()->MonotonicTimeProvider->Now())
+ {}
+ };
+
+ struct TStatsId {
+ TPathId PathId;
+ TTabletId Datashard;
+
+ TStatsId(const TPathId& pathId, const TTabletId& datashard)
+ : PathId(pathId)
+ , Datashard(datashard)
+ {
+ }
+
+ bool operator==(const TStatsId& rhs) const {
+ return PathId == rhs.PathId && Datashard == rhs.Datashard;
+ }
+
+ struct THash {
+ inline size_t operator()(const TStatsId& obj) const {
+ return MultiHash(obj.PathId.Hash(), obj.Datashard);
+ }
+ };
+ };
+
+ using TStatsMap = THashMap<TStatsId, TStatsQueueItem*, TStatsId::THash>;
+
+ TStatsMap StatsMap;
+ TDeque<TStatsQueueItem> StatsQueue;
+
TSet<TPathId> CleanDroppedPathsCandidates;
TSet<TPathId> CleanDroppedSubDomainsCandidates;
bool CleanDroppedPathsInFly = false;
@@ -329,6 +376,10 @@ public:
void ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfig, const TActorContext& ctx);
void ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featureFlags, const TActorContext& ctx);
+ void ConfigureStatsBatching(
+ const NKikimrConfig::TSchemeShardConfig& config,
+ const TActorContext &ctx);
+
void ConfigureCompactionQueues(
const NKikimrConfig::TCompactionConfig& config,
const TActorContext &ctx);
@@ -881,6 +932,8 @@ public:
void Handle(TEvDataShard::TEvSplitAck::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvSplitPartitioningChangedAck::TPtr& ev, const TActorContext& ctx);
+ void ScheduleStatsBatch(const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvPersistStats::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvPeriodicTableStats::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvGetTableStatsResult::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h
index 751de0231e..e6e1a5f2a9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_private.h
+++ b/ydb/core/tx/schemeshard/schemeshard_private.h
@@ -24,6 +24,7 @@ struct TEvPrivate {
EvRunBorrowedCompaction,
EvCompletePublication,
EvCompleteBarrier,
+ EvPersistStats,
EvEnd
};
@@ -156,6 +157,9 @@ struct TEvPrivate {
}
};
+ struct TEvPersistStats: public TEventLocal<TEvPersistStats, EvPersistStats> {
+ TEvPersistStats() = default;
+ };
}; // TEvPrivate
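
TEvPersistStats stays correct only because two flags keep scheduling idempotent: PersistStatsPending marks a flush transaction already enqueued, StatsBatchScheduled marks a wakeup timer already armed, and each is cleared just before the corresponding work re-checks the queue. A minimal sketch of that debounce shape, detached from the actor runtime (names illustrative):

    struct TStatsDebounce {
        bool PersistPending = false;  // a flush tx already sits in the executor queue
        bool TimerArmed = false;      // a TEvPersistStats wakeup is already scheduled

        // a size/age trigger fired: enqueue at most one flush transaction
        bool TryBeginFlush() {
            if (PersistPending)
                return false;
            PersistPending = true;    // cleared at the start of the tx's Execute()
            return true;
        }

        // only the age trigger remains pending: arm at most one timer
        bool TryArmTimer() {
            if (TimerArmed)
                return false;
            TimerArmed = true;        // cleared in the TEvPersistStats handler
            return true;
        }
    };
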
diff --git a/ydb/core/tx/schemeshard/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export.cpp
index 20c5353bef..952063bd8a 100644
--- a/ydb/core/tx/schemeshard/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export.cpp
@@ -702,7 +702,11 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) {
Y_UNIT_TEST(ShouldExcludeBackupTableFromStats) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ TTestEnvOptions opts;
+ opts.DisableStatsBatching(true);
+
+ TTestEnv env(runtime, opts);
+
ui64 txId = 100;
auto writeRow = [&](ui64 tabletId, const TString& key, const TString& value) {
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
index 9cfd8037b7..9b749cb26e 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
@@ -507,6 +507,11 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
app.SetEnableBackgroundCompaction(opts.EnableBackgroundCompaction_);
app.FeatureFlags.SetEnablePublicApiExternalBlobs(true);
+ if (opts.DisableStatsBatching_.value_or(false)) {
+ app.SchemeShardConfig.SetStatsMaxBatchSize(0);
+ app.SchemeShardConfig.SetStatsBatchTimeoutMs(0);
+ }
+
for (const auto& sid : opts.SystemBackupSIDs_) {
app.AddSystemBackupSID(sid);
}
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
index 72b934c570..7e0a6f6d9c 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
@@ -42,6 +42,7 @@ namespace NSchemeShardUT_Private {
OPTION(std::optional<bool>, EnableOlapSchemaOperations, std::nullopt);
OPTION(std::optional<bool>, EnableProtoSourceIdInfo, std::nullopt);
OPTION(std::optional<bool>, EnableBackgroundCompaction, std::nullopt);
+ OPTION(std::optional<bool>, DisableStatsBatching, std::nullopt);
OPTION(THashSet<TString>, SystemBackupSIDs, {});
#undef OPTION
diff --git a/ydb/core/tx/schemeshard/ut_split_merge.cpp b/ydb/core/tx/schemeshard/ut_split_merge.cpp
index 5b7f82f604..45d8d8e0b1 100644
--- a/ydb/core/tx/schemeshard/ut_split_merge.cpp
+++ b/ydb/core/tx/schemeshard/ut_split_merge.cpp
@@ -162,6 +162,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitTest) {
TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);
+ opts.DisableStatsBatching(true);
TTestEnv env(runtime, opts);
diff --git a/ydb/core/tx/schemeshard/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats.cpp
new file mode 100644
index 0000000000..f0c986fa34
--- /dev/null
+++ b/ydb/core/tx/schemeshard/ut_stats.cpp
@@ -0,0 +1,300 @@
+#include <ydb/core/cms/console/console.h>
+#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
+#include <ydb/core/tx/datashard/datashard.h>
+
+using namespace NKikimr;
+using namespace NSchemeShardUT_Private;
+
+namespace {
+
+constexpr ui64 INITIAL_ROWS_COUNT = 100;
+
+void WriteData(
+ TTestActorRuntime &runtime,
+ const char* name,
+ ui64 fromKeyInclusive,
+ ui64 toKey,
+ ui64 tabletId = TTestTxConfig::FakeHiveTablets)
+{
+ auto fnWriteRow = [&] (ui64 tabletId, ui64 key, const char* tableName) {
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('key (Uint64 '%lu)) ) )
+ (let value '('('value (Utf8 'MostMeaninglessValueInTheWorld)) ) )
+ (return (AsList (UpdateRow '__user__%s key value) ))
+ )
+ )", key, tableName);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL(err, "");
+ UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);
+ };
+
+ for (ui64 key = fromKeyInclusive; key < toKey; ++key) {
+ fnWriteRow(tabletId, key, name);
+ }
+}
+
+void CreateTable(
+ TTestActorRuntime &runtime,
+ TTestEnv& env,
+ const char* path,
+ const char* name,
+ ui32 shardsCount,
+ ui64& txId,
+ ui64 schemeshardId = TTestTxConfig::SchemeShard)
+{
+ TestCreateTable(runtime, schemeshardId, ++txId, path,
+ Sprintf(R"____(
+ Name: "%s"
+ Columns { Name: "key" Type: "Uint64"}
+ Columns { Name: "value" Type: "Utf8"}
+ KeyColumnNames: ["key"]
+ UniformPartitionsCount: %d
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: %d
+ MaxPartitionsCount: %d
+ }
+ }
+ )____", name, shardsCount, shardsCount, shardsCount));
+ env.TestWaitNotification(runtime, txId, schemeshardId);
+}
+
+void CreateTableWithData(
+ TTestActorRuntime &runtime,
+ TTestEnv& env,
+ const char* path,
+ const char* name,
+ ui32 shardsCount,
+ ui64& txId,
+ ui64 schemeshardId = TTestTxConfig::SchemeShard)
+{
+ CreateTable(runtime, env, path, name, shardsCount, txId, schemeshardId);
+ WriteData(runtime, name, 0, INITIAL_ROWS_COUNT);
+}
+
+void WaitStat(
+ TTestActorRuntime &runtime,
+ TTestEnv& env,
+ ui64 rowsExpected,
+ ui64& storageStat)
+{
+ while (true) {
+ auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/Simple", true, true);
+ ui64 rowCount = description.GetPathDescription().GetTableStats().GetRowCount();
+ storageStat = description.GetPathDescription().GetTabletMetrics().GetStorage();
+ if (rowCount == rowsExpected)
+ break;
+ env.SimulateSleep(runtime, TDuration::MilliSeconds(100));
+ }
+}
+
+void WaitAndCheckStatPersisted(
+ TTestActorRuntime &runtime,
+ TTestEnv& env,
+ const ui64 rowsExpected,
+ TDuration batchTimeout,
+ TTestActorRuntime::EEventAction& eventAction,
+ bool rowsShouldRestore = true)
+{
+ ui64 storageStatExpected = 0;
+ WaitStat(runtime, env, rowsExpected, storageStatExpected);
+
+ env.SimulateSleep(runtime, batchTimeout + TDuration::Seconds(1));
+
+ // drop any further stat updates and restart SS:
+ // the only way for SS to know the proper stats is to read them from the local DB
+ eventAction = TTestActorRuntime::EEventAction::DROP;
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/Simple", true, true);
+ ui64 rowCount = description.GetPathDescription().GetTableStats().GetRowCount();
+
+ if (rowsShouldRestore)
+ UNIT_ASSERT_VALUES_EQUAL(rowCount, rowsExpected);
+ else
+ UNIT_ASSERT_VALUES_EQUAL(rowCount, 0UL);
+
+ // restore
+ eventAction = TTestActorRuntime::EEventAction::PROCESS;
+}
+
+} // namespace
+
+Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
+ Y_UNIT_TEST(ShouldNotBatchWhenDisabled) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
+
+ auto& appData = runtime.GetAppData();
+
+ appData.FeatureFlags.SetEnablePersistentPartitionStats(true);
+
+ // disable batching
+ appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0);
+ appData.SchemeShardConfig.SetStatsMaxBatchSize(0);
+
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ auto eventAction = TTestActorRuntime::EEventAction::PROCESS;
+
+ // capture the original observer func by installing a dummy one
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now install our observer, which falls back to the original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvPeriodicTableStats: {
+ return eventAction;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+
+ ui64 txId = 1000;
+
+ // note that we create a 1-sharded table to avoid complications
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ WaitAndCheckStatPersisted(runtime, env, INITIAL_ROWS_COUNT, TDuration::Zero(), eventAction);
+ }
+
+ Y_UNIT_TEST(ShouldPersistByBatchSize) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
+ const ui32 batchSize = 2;
+
+ auto& appData = runtime.GetAppData();
+
+ appData.FeatureFlags.SetEnablePersistentPartitionStats(true);
+
+ // set batching in a way it will finish only by batch size
+ appData.SchemeShardConfig.SetStatsBatchTimeoutMs(10000000);
+ appData.SchemeShardConfig.SetStatsMaxBatchSize(batchSize);
+
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ auto eventAction = TTestActorRuntime::EEventAction::PROCESS;
+ ui64 statsCount = 0;
+
+ // capture the original observer func by installing a dummy one
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now install our observer, which falls back to the original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvPeriodicTableStats: {
+ ++statsCount;
+ return eventAction;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+
+ ui64 txId = 1000;
+
+ // note that we create a 1-sharded table to avoid complications
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ auto statsCountBefore = statsCount;
+ eventAction = TTestActorRuntime::EEventAction::PROCESS;
+
+ // now force a split; once SS receives all the stats it will finish its batch
+
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Simple"
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 20
+ MaxPartitionsCount: 20
+ SizeToSplit: 1
+ }
+ })");
+ env.TestWaitNotification(runtime, txId);
+
+ // we need this to fill the batch so that the actual split happens
+ CreateTable(runtime, env, "/MyRoot", "Simple2", 1, txId);
+
+ while (statsCount <= statsCountBefore + batchSize) {
+ env.SimulateSleep(runtime, TDuration::MilliSeconds(100));
+ }
+
+ WaitAndCheckStatPersisted(runtime, env, INITIAL_ROWS_COUNT, TDuration::Zero(), eventAction);
+ }
+
+ Y_UNIT_TEST(ShouldPersistByBatchTimeout) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
+ TDuration dsWakeupInterval = TDuration::Seconds(5); // hardcoded in DS
+ TDuration batchTimeout = dsWakeupInterval;
+
+ auto& appData = runtime.GetAppData();
+
+ appData.FeatureFlags.SetEnablePersistentPartitionStats(true);
+
+ // set batching only by timeout
+ appData.SchemeShardConfig.SetStatsBatchTimeoutMs(batchTimeout.MilliSeconds());
+ appData.SchemeShardConfig.SetStatsMaxBatchSize(10000);
+
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ auto eventAction = TTestActorRuntime::EEventAction::PROCESS;
+
+ // capture the original observer func by installing a dummy one
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now install our observer, which falls back to the original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvPeriodicTableStats: {
+ return eventAction;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+
+ ui64 txId = 1000;
+
+ // note that we create a 1-sharded table to avoid complications
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ WaitAndCheckStatPersisted(runtime, env, INITIAL_ROWS_COUNT, batchTimeout, eventAction);
+
+ // write more and check if timeout happens second time
+ ui64 newRowsCount = INITIAL_ROWS_COUNT + 100;
+ WriteData(runtime, "Simple", INITIAL_ROWS_COUNT, newRowsCount);
+
+ WaitAndCheckStatPersisted(runtime, env, newRowsCount, batchTimeout, eventAction);
+ }
+};
diff --git a/ydb/core/tx/schemeshard/ut_stats/ya.make b/ydb/core/tx/schemeshard/ut_stats/ya.make
new file mode 100644
index 0000000000..4022c50a7c
--- /dev/null
+++ b/ydb/core/tx/schemeshard/ut_stats/ya.make
@@ -0,0 +1,37 @@
+UNITTEST_FOR(ydb/core/tx/schemeshard)
+
+OWNER(
+ eivanov89
+ g:kikimr
+)
+
+FORK_SUBTESTS()
+
+SPLIT_FACTOR(10)
+
+IF (SANITIZER_TYPE OR WITH_VALGRIND)
+ TIMEOUT(3600)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+PEERDIR(
+ library/cpp/getopt
+ library/cpp/regex/pcre
+ ydb/core/cms
+ ydb/core/testlib
+ ydb/core/tx
+ ydb/core/tx/schemeshard/ut_helpers
+ ydb/core/wrappers/ut_helpers
+)
+
+SRCS(
+ ut_stats.cpp
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/schemeshard/ut_subdomain.cpp b/ydb/core/tx/schemeshard/ut_subdomain.cpp
index d0d30bf7da..006607097a 100644
--- a/ydb/core/tx/schemeshard/ut_subdomain.cpp
+++ b/ydb/core/tx/schemeshard/ut_subdomain.cpp
@@ -2809,7 +2809,10 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
Y_UNIT_TEST(DiskSpaceUsage) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().EnablePersistentPartitionStats(true));
+ TTestEnvOptions opts;
+ opts.DisableStatsBatching(true);
+ opts.EnablePersistentPartitionStats(true);
+ TTestEnv env(runtime, opts);
const auto sender = runtime.AllocateEdgeActor();
auto writeRow = [&](ui64 tabletId, ui32 key, const TString& value, const char* table) {
@@ -2898,7 +2901,11 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
Y_UNIT_TEST(DiskSpaceQuotas) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().EnablePersistentPartitionStats(true));
+ TTestEnvOptions opts;
+ opts.DisableStatsBatching(true);
+ opts.EnablePersistentPartitionStats(true);
+
+ TTestEnv env(runtime, opts);
ui64 txId = 100;
auto writeRow = [&](ui64 tabletId, ui32 key, const TString& value, const char* table) {
diff --git a/ydb/core/tx/schemeshard/ut_ttl.cpp b/ydb/core/tx/schemeshard/ut_ttl.cpp
index 82a554c66e..4b54114f3a 100644
--- a/ydb/core/tx/schemeshard/ut_ttl.cpp
+++ b/ydb/core/tx/schemeshard/ut_ttl.cpp
@@ -931,7 +931,10 @@ Y_UNIT_TEST_SUITE(TSchemeShardTTLTests) {
Y_UNIT_TEST(CheckCounters) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ TTestEnvOptions opts;
+ opts.DisableStatsBatching(true);
+
+ TTestEnv env(runtime, opts);
ui64 txId = 100;
runtime.UpdateCurrentTime(TInstant::Now());
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index 4cc983a6d9..e52400b5f2 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -32,6 +32,7 @@ RECURSE_FOR_TESTS(
ut_sequence
ut_sequence_reboots
ut_serverless
+ ut_stats
ut_split_merge
ut_split_merge_reboots
ut_subdomain