diff options
author | tesseract <tesseract@yandex-team.com> | 2023-04-11 10:24:44 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-04-11 10:24:44 +0300 |
commit | a628bac8edf4191cdb5679f39ae4815f2affbbd9 (patch) | |
tree | ed5d91b9b2999d9456fc6cd2d0103004ca9e0367 | |
parent | 975c50fadeade30acd3c68211922d25c45739d46 (diff) | |
download | ydb-a628bac8edf4191cdb5679f39ae4815f2affbbd9.tar.gz |
Поправить ошибку, когда обработка сообщений статистики может обрабатываться без батча
4 files changed, 19 insertions, 3 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp index 90ada5af23..93b2f4cd8e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp @@ -23,6 +23,7 @@ public: // returns true to continue batching bool PersistSingleStats(const TPathId& pathId, const TStatsQueue<TEvPersQueue::TEvPeriodicTopicStats>::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override; + void ScheduleNextBatch(const TActorContext& ctx) override; }; @@ -82,6 +83,10 @@ void TTxStoreTopicStats::Complete(const TActorContext&) { Queue.WriteQueueSizeMetric(); } +void TTxStoreTopicStats::ScheduleNextBatch(const TActorContext& ctx) { + Self->ExecuteTopicStatsBatch(ctx); +} + void TSchemeShard::Handle(TEvPersQueue::TEvPeriodicTopicStats::TPtr& ev, const TActorContext& ctx) { const auto& rec = ev->Get()->Record; @@ -122,7 +127,7 @@ void TSchemeShard::ExecuteTopicStatsBatch(const TActorContext& ctx) { "Will execute TTxStoreStats, queue# " << TopicStatsQueue.Size()); TopicPersistStatsPending = true; - Execute(new TTxStoreTopicStats(this, TopicStatsQueue, TopicPersistStatsPending), ctx); + EnqueueExecute(new TTxStoreTopicStats(this, TopicStatsQueue, TopicPersistStatsPending)); ScheduleTopicStatsBatch(ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard__stats.h b/ydb/core/tx/schemeshard/schemeshard__stats.h index b9abf7ab34..ed9edffe9e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__stats.h +++ b/ydb/core/tx/schemeshard/schemeshard__stats.h @@ -74,6 +74,7 @@ public: TDuration Age() const; TDuration Delay() const; + EStatsQueueStatus Status() const; bool Empty() const; size_t Size() const; @@ -90,7 +91,6 @@ public: const EPercentileCounters LatencyCounter; private: - EStatsQueueStatus Status() const; TSchemeShard* SS; @@ -126,6 +126,8 @@ public: // returns true to continue batching virtual bool PersistSingleStats(const TPathId& pathId, const TItem& item, TTransactionContext& txc, const TActorContext& ctx) = 0; + + virtual void ScheduleNextBatch(const TActorContext& ctx) = 0; }; } // NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__stats_impl.h b/ydb/core/tx/schemeshard/schemeshard__stats_impl.h index 338de84899..25048a35af 100644 --- a/ydb/core/tx/schemeshard/schemeshard__stats_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard__stats_impl.h @@ -135,6 +135,10 @@ bool TTxStoreStats<TEvent>::Execute(NTabletFlatExecutor::TTransactionContext& tx Self->TabletCounters->Cumulative()[Queue.WrittenCounter].Increment(batchSize); + if (READY == Queue.Status()) { + ScheduleNextBatch(ctx); + } + return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index 8e96b09715..43b45c7521 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -90,6 +90,7 @@ public: // returns true to continue batching bool PersistSingleStats(const TPathId& pathId, const TStatsQueue<TEvDataShard::TEvPeriodicTableStats>::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override; + void ScheduleNextBatch(const TActorContext& ctx) override; }; @@ -399,6 +400,10 @@ void TTxStoreTableStats::Complete(const TActorContext& ctx) { Queue.WriteQueueSizeMetric(); } +void TTxStoreTableStats::ScheduleNextBatch(const TActorContext& ctx) { + Self->ExecuteTableStatsBatch(ctx); +} + void TSchemeShard::Handle(TEvDataShard::TEvPeriodicTableStats::TPtr& ev, const TActorContext& ctx) { const auto& rec = ev->Get()->Record; @@ -448,7 +453,7 @@ void TSchemeShard::Handle(TEvPrivate::TEvPersistTableStats::TPtr&, const TActorC void TSchemeShard::ExecuteTableStatsBatch(const TActorContext& ctx) { if (!TablePersistStatsPending && !TableStatsQueue.Empty()) { TablePersistStatsPending = true; - Execute(new TTxStoreTableStats(this, TableStatsQueue, TablePersistStatsPending), ctx); + EnqueueExecute(new TTxStoreTableStats(this, TableStatsQueue, TablePersistStatsPending)); LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Will execute TTxStoreStats, queue# " << TableStatsQueue.Size()); ScheduleTableStatsBatch(ctx); |