aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@ydb.tech>2024-01-29 15:38:43 +0300
committerGitHub <noreply@github.com>2024-01-29 15:38:43 +0300
commita108d4764281fa7b16511bb739171461bb0ab0c4 (patch)
tree4cd9e1d36dd4d32b30d6f977a2a97ee1a062fd46
parent80de5e67c27ad49466db61e140b842f25f0d23d8 (diff)
downloadydb-a108d4764281fa7b16511bb739171461bb0ab0c4.tar.gz
Fix counter for granules normalizer (#1368)
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/abstract.h13
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp1
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/normalizer.h4
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/chunks.h9
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/min_max.h9
5 files changed, 12 insertions, 24 deletions
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
index 904fbe6de1..89e75a67e0 100644
--- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
@@ -75,10 +75,19 @@ namespace NKikimr::NOlap {
virtual ~INormalizerComponent() {}
+ bool WaitResult() const {
+ return AtomicGet(ActiveTasksCount) > 0;
+ }
+
+ void OnResultReady() {
+ AFL_VERIFY(ActiveTasksCount > 0);
+ AtomicDecrement(ActiveTasksCount);
+ }
+
virtual const TString& GetName() const = 0;
- virtual bool WaitResult() const = 0;
- virtual void OnResultReady() {}
virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0;
+ protected:
+ TAtomic ActiveTasksCount = 0;
};
class TNormalizationController {
diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
index dbf2cc3ba5..5eeaba2ae5 100644
--- a/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
+++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
@@ -147,6 +147,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TGranulesNormalizer::Init(const
for (auto&& c : *changes) {
tasks.emplace_back(std::make_shared<TGranulesNormalizerTask>(c));
}
+ AtomicSet(ActiveTasksCount, tasks.size());
return tasks;
}
diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.h b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h
index eb424eaac1..a3c78e4ba8 100644
--- a/ydb/core/tx/columnshard/normalizer/granule/normalizer.h
+++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h
@@ -14,10 +14,6 @@ public:
return name;
}
- virtual bool WaitResult() const override {
- return false;
- }
-
virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};
diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.h b/ydb/core/tx/columnshard/normalizer/portion/chunks.h
index 70049b81b6..d174b89038 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/chunks.h
+++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.h
@@ -93,18 +93,9 @@ namespace NKikimr::NOlap {
return name;
}
- virtual bool WaitResult() const override {
- return AtomicGet(ActiveTasksCount) > 0;
- }
-
- void OnResultReady() override {
- AtomicDecrement(ActiveTasksCount);
- }
-
virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
private:
NColumnShard::TBlobGroupSelector DsGroupSelector;
- TAtomic ActiveTasksCount = 0;
};
}
diff --git a/ydb/core/tx/columnshard/normalizer/portion/min_max.h b/ydb/core/tx/columnshard/normalizer/portion/min_max.h
index 267a379e2b..6b7f8cd1eb 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/min_max.h
+++ b/ydb/core/tx/columnshard/normalizer/portion/min_max.h
@@ -26,19 +26,10 @@ public:
return name;
}
- virtual bool WaitResult() const override {
- return AtomicGet(ActiveTasksCount) > 0;
- }
-
- void OnResultReady() override {
- AtomicDecrement(ActiveTasksCount);
- }
-
virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
private:
NColumnShard::TBlobGroupSelector DsGroupSelector;
- TAtomic ActiveTasksCount = 0;
};
}