diff options
author | nsofya <nsofya@ydb.tech> | 2024-01-29 15:38:43 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-29 15:38:43 +0300 |
commit | a108d4764281fa7b16511bb739171461bb0ab0c4 (patch) | |
tree | 4cd9e1d36dd4d32b30d6f977a2a97ee1a062fd46 | |
parent | 80de5e67c27ad49466db61e140b842f25f0d23d8 (diff) | |
download | ydb-a108d4764281fa7b16511bb739171461bb0ab0c4.tar.gz |
Fix counter for granules normalizer (#1368)
5 files changed, 12 insertions, 24 deletions
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index 904fbe6de1..89e75a67e0 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -75,10 +75,19 @@ namespace NKikimr::NOlap { virtual ~INormalizerComponent() {} + bool WaitResult() const { + return AtomicGet(ActiveTasksCount) > 0; + } + + void OnResultReady() { + AFL_VERIFY(ActiveTasksCount > 0); + AtomicDecrement(ActiveTasksCount); + } + virtual const TString& GetName() const = 0; - virtual bool WaitResult() const = 0; - virtual void OnResultReady() {} virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0; + protected: + TAtomic ActiveTasksCount = 0; }; class TNormalizationController { diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp index dbf2cc3ba5..5eeaba2ae5 100644 --- a/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp @@ -147,6 +147,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TGranulesNormalizer::Init(const for (auto&& c : *changes) { tasks.emplace_back(std::make_shared<TGranulesNormalizerTask>(c)); } + AtomicSet(ActiveTasksCount, tasks.size()); return tasks; } diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.h b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h index eb424eaac1..a3c78e4ba8 100644 --- a/ydb/core/tx/columnshard/normalizer/granule/normalizer.h +++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h @@ -14,10 +14,6 @@ public: return name; } - virtual bool WaitResult() const override { - return false; - } - virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; }; diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.h b/ydb/core/tx/columnshard/normalizer/portion/chunks.h index 70049b81b6..d174b89038 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/chunks.h +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.h @@ -93,18 +93,9 @@ namespace NKikimr::NOlap { return name; } - virtual bool WaitResult() const override { - return AtomicGet(ActiveTasksCount) > 0; - } - - void OnResultReady() override { - AtomicDecrement(ActiveTasksCount); - } - virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; private: NColumnShard::TBlobGroupSelector DsGroupSelector; - TAtomic ActiveTasksCount = 0; }; } diff --git a/ydb/core/tx/columnshard/normalizer/portion/min_max.h b/ydb/core/tx/columnshard/normalizer/portion/min_max.h index 267a379e2b..6b7f8cd1eb 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/min_max.h +++ b/ydb/core/tx/columnshard/normalizer/portion/min_max.h @@ -26,19 +26,10 @@ public: return name; } - virtual bool WaitResult() const override { - return AtomicGet(ActiveTasksCount) > 0; - } - - void OnResultReady() override { - AtomicDecrement(ActiveTasksCount); - } - virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; private: NColumnShard::TBlobGroupSelector DsGroupSelector; - TAtomic ActiveTasksCount = 0; }; } |