diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2024-09-24 13:26:10 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-24 13:26:10 +0300 |
commit | 25fdf52ca302b14d474afca9da0f13f9fb312c5c (patch) | |
tree | ea02dd9b22d0f04a64089b36e28bafd9617d6e9e | |
parent | af0844f2962ed3e1ad5710c44db08e8c04323720 (diff) | |
download | ydb-25fdf52ca302b14d474afca9da0f13f9fb312c5c.tar.gz |
clean trash on versions switching (#9679)
3 files changed, 197 insertions, 0 deletions
diff --git a/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.cpp b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.cpp new file mode 100644 index 0000000000..2302d13654 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.cpp @@ -0,0 +1,102 @@ +#include "special_cleaner.h" + +#include <ydb/core/tx/columnshard/columnshard_private_events.h> + +namespace NKikimr::NOlap::NNormalizer::NSpecialColumns { + +namespace { + +class TChanges: public INormalizerChanges { +public: + TChanges(TDeleteTrashImpl::TKeyBatch&& keys) + : Keys(keys) { + } + bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normController*/) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + for (const auto& k : Keys) { + db.Table<Schema::IndexColumns>().Key(k.Index, k.Granule, k.ColumnIdx, k.PlanStep, k.TxId, k.Portion, k.Chunk).Delete(); + } + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")("message", TStringBuilder() << GetSize() << " rows deleted"); + return true; + } + + ui64 GetSize() const override { + return Keys.size(); + } + +private: + const TDeleteTrashImpl::TKeyBatch Keys; +}; + +} //namespace + +TConclusion<std::vector<INormalizerTask::TPtr>> TDeleteTrashImpl::DoInit( + const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + const size_t MaxBatchSize = 10000; + auto keysToDelete = KeysToDelete(txc, MaxBatchSize); + if (!keysToDelete) { + return TConclusionStatus::Fail("Not ready"); + } + ui32 removeCount = 0; + for (auto&& i : *keysToDelete) { + removeCount += i.size(); + } + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")( + "message", TStringBuilder() << "found " << removeCount << " rows to delete grouped in " << keysToDelete->size() << " batches"); + + std::vector<INormalizerTask::TPtr> result; + for (auto&& batch : *keysToDelete) { + AFL_VERIFY(!batch.empty()); + result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(batch)))); + } + return result; +} + +std::optional<std::vector<TDeleteTrashImpl::TKeyBatch>> TDeleteTrashImpl::KeysToDelete( + NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize) { + NIceDb::TNiceDb db(txc.DB); + using namespace NColumnShard; + if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) { + return std::nullopt; + } + const std::set<ui64> columnIdsToDelete = GetColumnIdsToDelete(); + std::vector<TKeyBatch> result; + TKeyBatch currentBatch; + auto rowset = + db.Table<Schema::IndexColumns>() + .Select<Schema::IndexColumns::Index, Schema::IndexColumns::Granule, Schema::IndexColumns::ColumnIdx, Schema::IndexColumns::PlanStep, + Schema::IndexColumns::TxId, Schema::IndexColumns::Portion, Schema::IndexColumns::Chunk>(); + if (!rowset.IsReady()) { + return std::nullopt; + } + while (!rowset.EndOfSet()) { + if (columnIdsToDelete.contains(rowset.GetValue<Schema::IndexColumns::ColumnIdx>())) { + auto key = TKey{ + .Index = rowset.GetValue<Schema::IndexColumns::Index>(), + .Granule = rowset.GetValue<Schema::IndexColumns::Granule>(), + .ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(), + .PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(), + .TxId = rowset.GetValue<Schema::IndexColumns::TxId>(), + .Portion = rowset.GetValue<Schema::IndexColumns::Portion>(), + .Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>() }; + currentBatch.emplace_back(std::move(key)); + if (currentBatch.size() == maxBatchSize) { + result.emplace_back(std::move(currentBatch)); + currentBatch = TKeyBatch{}; + } + } + if (!rowset.Next()) { + return std::nullopt; + } + } + if (!currentBatch.empty()) { + result.emplace_back(std::move(currentBatch)); + } + + return result; +} + +} // namespace NKikimr::NOlap::NNormalizer::NSpecialColumns diff --git a/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h new file mode 100644 index 0000000000..bac01c3f3d --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h @@ -0,0 +1,94 @@ +#pragma once + +#include <ydb/core/tx/columnshard/columnshard_schema.h> +#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> + +namespace NKikimr::NOlap::NNormalizer::NSpecialColumns { + +class TDeleteTrashImpl: public TNormalizationController::INormalizerComponent { +public: + struct TKey { + ui32 Index; + ui64 Granule; + ui32 ColumnIdx; + ui64 PlanStep; + ui64 TxId; + ui64 Portion; + ui32 Chunk; + }; + + using TKeyBatch = std::vector<TKey>; + +private: + + std::optional<std::vector<TKeyBatch>> KeysToDelete(NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize); + + virtual std::set<ui64> GetColumnIdsToDelete() const = 0; + +public: + TDeleteTrashImpl(const TNormalizationController::TInitContext&) { + } + + virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; +}; + +class TRemoveDeleteFlag: public TDeleteTrashImpl { +private: + using TBase = TDeleteTrashImpl; +public: + static TString GetClassNameStatic() { + return "RemoveDeleteFlag"; + } + +private: + static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveDeleteFlag>(GetClassNameStatic()); + + virtual std::set<ui64> GetColumnIdsToDelete() const override { + return { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX }; + } + + virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { + return {}; + } + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + +public: + TRemoveDeleteFlag(const TNormalizationController::TInitContext& context) + : TBase(context) { + } +}; + +class TRemoveWriteId: public TDeleteTrashImpl { +private: + using TBase = TDeleteTrashImpl; + +public: + static TString GetClassNameStatic() { + return "RemoveWriteId"; + } + +private: + static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveWriteId>(GetClassNameStatic()); + + virtual std::set<ui64> GetColumnIdsToDelete() const override { + return { NPortion::TSpecialColumns::SPEC_COL_WRITE_ID_INDEX }; + } + + virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { + return {}; + } + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + +public: + TRemoveWriteId(const TNormalizationController::TInitContext& context) + : TBase(context) + { + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make index ff813694a9..e53cf5497b 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/ya.make +++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make @@ -7,6 +7,7 @@ SRCS( GLOBAL clean.cpp GLOBAL clean_empty.cpp GLOBAL broken_blobs.cpp + GLOBAL special_cleaner.cpp ) PEERDIR( |