aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-04-11 10:24:44 +0300
committertesseract <tesseract@yandex-team.com>2023-04-11 10:24:44 +0300
commita628bac8edf4191cdb5679f39ae4815f2affbbd9 (patch)
treeed5d91b9b2999d9456fc6cd2d0103004ca9e0367
parent975c50fadeade30acd3c68211922d25c45739d46 (diff)
downloadydb-a628bac8edf4191cdb5679f39ae4815f2affbbd9.tar.gz
Поправить ошибку, когда обработка сообщений статистики может обрабатываться без батча
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__stats.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__stats_impl.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__table_stats.cpp7
4 files changed, 19 insertions, 3 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
index 90ada5af23..93b2f4cd8e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
@@ -23,6 +23,7 @@ public:
// returns true to continue batching
bool PersistSingleStats(const TPathId& pathId, const TStatsQueue<TEvPersQueue::TEvPeriodicTopicStats>::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override;
+ void ScheduleNextBatch(const TActorContext& ctx) override;
};
@@ -82,6 +83,10 @@ void TTxStoreTopicStats::Complete(const TActorContext&) {
Queue.WriteQueueSizeMetric();
}
+void TTxStoreTopicStats::ScheduleNextBatch(const TActorContext& ctx) {
+ Self->ExecuteTopicStatsBatch(ctx);
+}
+
void TSchemeShard::Handle(TEvPersQueue::TEvPeriodicTopicStats::TPtr& ev, const TActorContext& ctx) {
const auto& rec = ev->Get()->Record;
@@ -122,7 +127,7 @@ void TSchemeShard::ExecuteTopicStatsBatch(const TActorContext& ctx) {
"Will execute TTxStoreStats, queue# " << TopicStatsQueue.Size());
TopicPersistStatsPending = true;
- Execute(new TTxStoreTopicStats(this, TopicStatsQueue, TopicPersistStatsPending), ctx);
+ EnqueueExecute(new TTxStoreTopicStats(this, TopicStatsQueue, TopicPersistStatsPending));
ScheduleTopicStatsBatch(ctx);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__stats.h b/ydb/core/tx/schemeshard/schemeshard__stats.h
index b9abf7ab34..ed9edffe9e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__stats.h
+++ b/ydb/core/tx/schemeshard/schemeshard__stats.h
@@ -74,6 +74,7 @@ public:
TDuration Age() const;
TDuration Delay() const;
+ EStatsQueueStatus Status() const;
bool Empty() const;
size_t Size() const;
@@ -90,7 +91,6 @@ public:
const EPercentileCounters LatencyCounter;
private:
- EStatsQueueStatus Status() const;
TSchemeShard* SS;
@@ -126,6 +126,8 @@ public:
// returns true to continue batching
virtual bool PersistSingleStats(const TPathId& pathId, const TItem& item, TTransactionContext& txc, const TActorContext& ctx) = 0;
+
+ virtual void ScheduleNextBatch(const TActorContext& ctx) = 0;
};
} // NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard__stats_impl.h b/ydb/core/tx/schemeshard/schemeshard__stats_impl.h
index 338de84899..25048a35af 100644
--- a/ydb/core/tx/schemeshard/schemeshard__stats_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard__stats_impl.h
@@ -135,6 +135,10 @@ bool TTxStoreStats<TEvent>::Execute(NTabletFlatExecutor::TTransactionContext& tx
Self->TabletCounters->Cumulative()[Queue.WrittenCounter].Increment(batchSize);
+ if (READY == Queue.Status()) {
+ ScheduleNextBatch(ctx);
+ }
+
return true;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index 8e96b09715..43b45c7521 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -90,6 +90,7 @@ public:
// returns true to continue batching
bool PersistSingleStats(const TPathId& pathId, const TStatsQueue<TEvDataShard::TEvPeriodicTableStats>::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override;
+ void ScheduleNextBatch(const TActorContext& ctx) override;
};
@@ -399,6 +400,10 @@ void TTxStoreTableStats::Complete(const TActorContext& ctx) {
Queue.WriteQueueSizeMetric();
}
+void TTxStoreTableStats::ScheduleNextBatch(const TActorContext& ctx) {
+ Self->ExecuteTableStatsBatch(ctx);
+}
+
void TSchemeShard::Handle(TEvDataShard::TEvPeriodicTableStats::TPtr& ev, const TActorContext& ctx) {
const auto& rec = ev->Get()->Record;
@@ -448,7 +453,7 @@ void TSchemeShard::Handle(TEvPrivate::TEvPersistTableStats::TPtr&, const TActorC
void TSchemeShard::ExecuteTableStatsBatch(const TActorContext& ctx) {
if (!TablePersistStatsPending && !TableStatsQueue.Empty()) {
TablePersistStatsPending = true;
- Execute(new TTxStoreTableStats(this, TableStatsQueue, TablePersistStatsPending), ctx);
+ EnqueueExecute(new TTxStoreTableStats(this, TableStatsQueue, TablePersistStatsPending));
LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Will execute TTxStoreStats, queue# " << TableStatsQueue.Size());
ScheduleTableStatsBatch(ctx);