aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: zverevgeny <zverevgeny@ydb.tech>, 2024-08-09 08:48:32 +0300
committer: GitHub <noreply@github.com>, 2024-08-09 08:48:32 +0300
commit: b16060eebe0468af8307a0d31644cb6aeef53637 (patch)
tree: 5a1fca95b45dda3290980b5d2f204ff64aa79fb1
parent: fb79e67e23c1fc51639e9135e099ee3bca7d6a4f (diff)
download: ydb-b16060eebe0468af8307a0d31644cb6aeef53637.tar.gz
Delete empty portions normalizer (#7596)
-rw-r--r-- ydb/core/tx/columnshard/normalizer/abstract/abstract.h | 1
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp | 120
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/clean_empty.h | 28
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp | 2
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/ya.make | 1
-rw-r--r-- ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp | 24
6 files changed, 173 insertions, 3 deletions
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
index aaa6b6be1d..55b0da0e6b 100644
--- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
@@ -56,6 +56,7 @@ enum class ENormalizerSequentialId: ui32 {
TablesCleaner,
PortionsMetadata,
CleanGranuleId,
+ EmptyPortionsCleaner,
MAX
};
diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp
new file mode 100644
index 0000000000..56a258e0be
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp
@@ -0,0 +1,120 @@
+#include "clean_empty.h"
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+
+
+namespace NKikimr::NOlap {
+
+namespace {
+// Collects the (PathId, Portion) address of every row stored in the
+// IndexColumns table.
+// Returns std::nullopt when the table cannot be precharged, the rowset is not
+// ready, or advancing the rowset fails.
+std::optional<THashSet<TPortionAddress>> GetColumnPortionAddresses(NTabletFlatExecutor::TTransactionContext& txc) {
+    using namespace NColumnShard;
+    using TColumns = Schema::IndexColumns;
+
+    NIceDb::TNiceDb db(txc.DB);
+    if (!Schema::Precharge<TColumns>(db, txc.DB.GetScheme())) {
+        return std::nullopt;
+    }
+
+    auto rows = db.Table<TColumns>().Select<TColumns::PathId, TColumns::Portion>();
+    if (!rows.IsReady()) {
+        return std::nullopt;
+    }
+
+    THashSet<TPortionAddress> addresses;
+    while (!rows.EndOfSet()) {
+        const auto pathId = rows.GetValue<TColumns::PathId>();
+        const auto portionId = rows.GetValue<TColumns::Portion>();
+        addresses.emplace(pathId, portionId);
+        if (!rows.Next()) {
+            return std::nullopt;
+        }
+    }
+    return addresses;
+}
+
+using TBatch = std::vector<TPortionAddress>;
+
+// Scans the IndexPortions table and gathers the address of every portion that
+// has no row in IndexColumns, split into batches of at most MaxBatchSize
+// addresses each.
+// Returns std::nullopt if any of the table reads could not be completed.
+std::optional<std::vector<TBatch>> GetPortionsToDelete(NTabletFlatExecutor::TTransactionContext& txc) {
+    using namespace NColumnShard;
+    using TPortions = Schema::IndexPortions;
+    constexpr size_t MaxBatchSize = 10000;
+
+    const auto portionsWithColumns = GetColumnPortionAddresses(txc);
+    if (!portionsWithColumns.has_value()) {
+        return std::nullopt;
+    }
+
+    NIceDb::TNiceDb db(txc.DB);
+    if (!Schema::Precharge<TPortions>(db, txc.DB.GetScheme())) {
+        return std::nullopt;
+    }
+
+    auto rows = db.Table<TPortions>().Select<TPortions::PathId, TPortions::PortionId>();
+    if (!rows.IsReady()) {
+        return std::nullopt;
+    }
+
+    std::vector<TBatch> batches;
+    TBatch current;
+    while (!rows.EndOfSet()) {
+        TPortionAddress address(
+            rows.GetValue<TPortions::PathId>(),
+            rows.GetValue<TPortions::PortionId>());
+        const bool hasColumns = portionsWithColumns->contains(address);
+        if (!hasColumns) {
+            ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << address.DebugString() << " marked for deletion");
+            current.emplace_back(std::move(address));
+            if (current.size() == MaxBatchSize) {
+                // The batch is full: hand it over and start a fresh one.
+                batches.emplace_back(std::move(current));
+                current = TBatch{};
+            }
+        }
+        if (!rows.Next()) {
+            return std::nullopt;
+        }
+    }
+
+    // Flush the trailing, partially filled batch (if any).
+    if (!current.empty()) {
+        batches.emplace_back(std::move(current));
+    }
+    return batches;
+}
+
+// Normalizer change set that removes one batch of portion records from the
+// IndexPortions table.
+class TChanges : public INormalizerChanges {
+public:
+    // Takes over the batch of addresses. A named rvalue-reference parameter is
+    // an lvalue, so std::move is required here to move the vector instead of
+    // copying every address.
+    explicit TChanges(TBatch&& addresses)
+        : Addresses(std::move(addresses))
+    {}
+
+    // Issues a Delete for every stored (PathId, PortionId) key in IndexPortions,
+    // logs the batch size and returns true.
+    bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
+        using namespace NColumnShard;
+        NIceDb::TNiceDb db(txc.DB);
+        for (const auto& a : Addresses) {
+            db.Table<Schema::IndexPortions>().Key(
+                a.GetPathId(),
+                a.GetPortionId()
+            ).Delete();
+        }
+        ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << GetSize() << " portions deleted");
+        return true;
+    }
+
+    // Number of portion addresses held by this batch.
+    ui64 GetSize() const override {
+        return Addresses.size();
+    }
+
+private:
+    const TBatch Addresses;
+};
+
+} //namespace
+
+// Builds one trivial normalizer task per batch of portions selected by
+// GetPortionsToDelete. Fails with "Not ready" when that selection could not
+// be completed in this transaction.
+TConclusion<std::vector<INormalizerTask::TPtr>> TCleanEmptyPortionsNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
+    using namespace NColumnShard;
+
+    auto batches = GetPortionsToDelete(txc);
+    if (!batches.has_value()) {
+        return TConclusionStatus::Fail("Not ready");
+    }
+
+    std::vector<INormalizerTask::TPtr> tasks;
+    for (auto& batch : *batches) {
+        auto changes = std::make_shared<TChanges>(std::move(batch));
+        tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::move(changes)));
+    }
+    return tasks;
+}
+
+} //namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h
new file mode 100644
index 0000000000..920b3d8c0f
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
+
+namespace NKikimr::NOlap {
+
+// Normalizer component identified by ENormalizerSequentialId::EmptyPortionsCleaner.
+// DoInit is defined in clean_empty.cpp.
+class TCleanEmptyPortionsNormalizer : public TNormalizationController::INormalizerComponent {
+private:
+    // String form of the sequential id; used as the factory registration key.
+    // Must stay declared before Registrator, whose initializer calls it.
+    static TString ClassName() {
+        return ToString(ENormalizerSequentialId::EmptyPortionsCleaner);
+    }
+
+    // Factory registrator for this class, keyed by ClassName().
+    static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TCleanEmptyPortionsNormalizer>(ClassName());
+
+public:
+    // The init context is accepted but not used.
+    TCleanEmptyPortionsNormalizer(const TNormalizationController::TInitContext&) {
+    }
+
+    TString GetClassName() const override {
+        return ClassName();
+    }
+
+    std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
+        return ENormalizerSequentialId::EmptyPortionsCleaner;
+    }
+
+    TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
+};
+
+} //namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
index b286281c2d..63cea8b199 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
@@ -7,7 +7,7 @@
namespace NKikimr::NOlap {
TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizerBase::DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
- auto initRes = DoInitImpl(controller,txc);
+ auto initRes = DoInitImpl(controller, txc);
if (initRes.IsFail()) {
return initRes;
diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make
index ec31c82f7b..ff813694a9 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/ya.make
+++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make
@@ -5,6 +5,7 @@ SRCS(
GLOBAL portion.cpp
GLOBAL chunks.cpp
GLOBAL clean.cpp
+ GLOBAL clean_empty.cpp
GLOBAL broken_blobs.cpp
)
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
index 50f305bf4f..68eecd9f7b 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+#include <ydb/core/tx/columnshard/engines/portions/constructor.h>
#include <ydb/core/tx/columnshard/operations/write_data.h>
@@ -161,7 +162,7 @@ public:
}
};
-class TPortinosCleaner : public NYDBTest::ILocalDBModifier {
+class TPortionsCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
@@ -185,6 +186,21 @@ public:
}
};
+
+// Local DB modifier used by the CleanEmptyPortionsNormalizer test: writes
+// key-only IndexPortions rows for every pathId in [100, 299) combined with
+// every portionId in [1000, 1199).
+class TEmptyPortionsCleaner : public NYDBTest::ILocalDBModifier {
+public:
+    virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
+        using namespace NColumnShard;
+        constexpr size_t pathIdBegin = 100;
+        constexpr size_t pathIdEnd = 299;
+        constexpr size_t portionIdBegin = 1000;
+        constexpr size_t portionIdEnd = 1199;
+
+        NIceDb::TNiceDb db(txc.DB);
+        for (size_t path = pathIdBegin; path < pathIdEnd; ++path) {
+            for (size_t portion = portionIdBegin; portion < portionIdEnd; ++portion) {
+                db.Table<Schema::IndexPortions>().Key(path, portion).Update();
+            }
+        }
+    }
+};
+
+
class TTablesCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
@@ -317,7 +333,11 @@ Y_UNIT_TEST_SUITE(Normalizers) {
}
Y_UNIT_TEST(PortionsNormalizer) {
- TestNormalizerImpl<TPortinosCleaner>();
+ TestNormalizerImpl<TPortionsCleaner>();
+ }
+
+ Y_UNIT_TEST(CleanEmptyPortionsNormalizer) {
+ TestNormalizerImpl<TEmptyPortionsCleaner>();
}
Y_UNIT_TEST(EmptyTablesNormalizer) {