diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2024-08-09 08:48:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-09 08:48:32 +0300 |
commit | b16060eebe0468af8307a0d31644cb6aeef53637 (patch) | |
tree | 5a1fca95b45dda3290980b5d2f204ff64aa79fb1 | |
parent | fb79e67e23c1fc51639e9135e099ee3bca7d6a4f (diff) | |
download | ydb-b16060eebe0468af8307a0d31644cb6aeef53637.tar.gz |
Delete empty portions normalizer (#7596)
6 files changed, 173 insertions, 3 deletions
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index aaa6b6be1d..55b0da0e6b 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -56,6 +56,7 @@ enum class ENormalizerSequentialId: ui32 { TablesCleaner, PortionsMetadata, CleanGranuleId, + EmptyPortionsCleaner, MAX }; diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp new file mode 100644 index 0000000000..56a258e0be --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp @@ -0,0 +1,120 @@ +#include "clean_empty.h" +#include <ydb/core/tx/columnshard/columnshard_schema.h> + + +namespace NKikimr::NOlap { + +namespace { +std::optional<THashSet<TPortionAddress>> GetColumnPortionAddresses(NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) { + return std::nullopt; + } + THashSet<TPortionAddress> usedPortions; + auto rowset = db.Table<Schema::IndexColumns>().Select< + Schema::IndexColumns::PathId, + Schema::IndexColumns::Portion + >(); + if (!rowset.IsReady()) { + return std::nullopt; + } + while (!rowset.EndOfSet()) { + usedPortions.emplace( + rowset.GetValue<Schema::IndexColumns::PathId>(), + rowset.GetValue<Schema::IndexColumns::Portion>() + ); + if (!rowset.Next()) { + return std::nullopt; + } + } + return usedPortions; +} + +using TBatch = std::vector<TPortionAddress>; + +std::optional<std::vector<TBatch>> GetPortionsToDelete(NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + const auto usedPortions = GetColumnPortionAddresses(txc); + if (!usedPortions) { + return std::nullopt; + } + const size_t MaxBatchSize = 10000; + NIceDb::TNiceDb db(txc.DB); + if (!Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme())) { + return std::nullopt; + } + auto rowset = db.Table<Schema::IndexPortions>().Select< + Schema::IndexPortions::PathId, + Schema::IndexPortions::PortionId + >(); + if (!rowset.IsReady()) { + return std::nullopt; + } + std::vector<TBatch> result; + TBatch portionsToDelete; + while (!rowset.EndOfSet()) { + TPortionAddress addr( + rowset.GetValue<Schema::IndexPortions::PathId>(), + rowset.GetValue<Schema::IndexPortions::PortionId>() + ); + if (!usedPortions->contains(addr)) { + ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << addr.DebugString() << " marked for deletion"); + portionsToDelete.emplace_back(std::move(addr)); + if (portionsToDelete.size() == MaxBatchSize) { + result.emplace_back(std::move(portionsToDelete)); + portionsToDelete = TBatch{}; + } + } + if (!rowset.Next()) { + return std::nullopt; + } + } + if (!portionsToDelete.empty()) { + result.emplace_back(std::move(portionsToDelete)); + } + return result; +} + +class TChanges : public INormalizerChanges { +public: + TChanges(TBatch&& addresses) + : Addresses(addresses) + {} + bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + for(const auto& a: Addresses) { + db.Table<Schema::IndexPortions>().Key( + a.GetPathId(), + a.GetPortionId() + ).Delete(); + } + ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << GetSize() << " portions deleted"); + return true; + } + + ui64 GetSize() const override { + return Addresses.size(); + } +private: + const TBatch Addresses; +}; + +} //namespace + +TConclusion<std::vector<INormalizerTask::TPtr>> TCleanEmptyPortionsNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + auto batchesToDelete = GetPortionsToDelete(txc); + if (!batchesToDelete) { + return TConclusionStatus::Fail("Not ready"); + } + + std::vector<INormalizerTask::TPtr> result; + for (auto&& b: *batchesToDelete) { + result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(b)))); + } + return result; +} + +} //namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h new file mode 100644 index 0000000000..920b3d8c0f --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h @@ -0,0 +1,28 @@ +#pragma once + +#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> + +namespace NKikimr::NOlap { + +class TCleanEmptyPortionsNormalizer : public TNormalizationController::INormalizerComponent { + + static TString ClassName() { + return ToString(ENormalizerSequentialId::EmptyPortionsCleaner); + } + static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TCleanEmptyPortionsNormalizer>(ClassName()); +public: + TCleanEmptyPortionsNormalizer(const TNormalizationController::TInitContext&) + {} + + std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { + return ENormalizerSequentialId::EmptyPortionsCleaner; + } + + TString GetClassName() const override { + return ClassName(); + } + + TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; +}; + +} //namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index b286281c2d..63cea8b199 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -7,7 +7,7 @@ namespace NKikimr::NOlap { TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizerBase::DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { - auto initRes = DoInitImpl(controller,txc); + auto initRes = DoInitImpl(controller, txc); if (initRes.IsFail()) { return initRes; diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make index ec31c82f7b..ff813694a9 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/ya.make +++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make @@ -5,6 +5,7 @@ SRCS( GLOBAL portion.cpp GLOBAL chunks.cpp GLOBAL clean.cpp + GLOBAL clean_empty.cpp GLOBAL broken_blobs.cpp ) diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 50f305bf4f..68eecd9f7b 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -3,6 +3,7 @@ #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/tx/columnshard/hooks/testing/controller.h> +#include <ydb/core/tx/columnshard/engines/portions/constructor.h> #include <ydb/core/tx/columnshard/operations/write_data.h> @@ -161,7 +162,7 @@ public: } }; -class TPortinosCleaner : public NYDBTest::ILocalDBModifier { +class TPortionsCleaner : public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { using namespace NColumnShard; @@ -185,6 +186,21 @@ public: } }; + +class TEmptyPortionsCleaner : public NYDBTest::ILocalDBModifier { +public: + virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + for (size_t pathId = 100; pathId != 299; ++pathId) { + for (size_t portionId = 1000; portionId != 1199; ++portionId) { + db.Table<Schema::IndexPortions>().Key(pathId, portionId).Update(); + } + } + } +}; + + class TTablesCleaner : public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { @@ -317,7 +333,11 @@ Y_UNIT_TEST_SUITE(Normalizers) { } Y_UNIT_TEST(PortionsNormalizer) { - TestNormalizerImpl<TPortinosCleaner>(); + TestNormalizerImpl<TPortionsCleaner>(); + } + + Y_UNIT_TEST(CleanEmptyPortionsNormalizer) { + TestNormalizerImpl<TEmptyPortionsCleaner>(); } Y_UNIT_TEST(EmptyTablesNormalizer) { |