aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-09-24 13:26:10 +0300
committerGitHub <noreply@github.com>2024-09-24 13:26:10 +0300
commit25fdf52ca302b14d474afca9da0f13f9fb312c5c (patch)
treeea02dd9b22d0f04a64089b36e28bafd9617d6e9e
parentaf0844f2962ed3e1ad5710c44db08e8c04323720 (diff)
downloadydb-25fdf52ca302b14d474afca9da0f13f9fb312c5c.tar.gz
clean trash on versions switching (#9679)
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/special_cleaner.cpp102
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h94
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/ya.make1
3 files changed, 197 insertions, 0 deletions
diff --git a/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.cpp b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.cpp
new file mode 100644
index 0000000000..2302d13654
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.cpp
@@ -0,0 +1,102 @@
+#include "special_cleaner.h"
+
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+
+namespace NKikimr::NOlap::NNormalizer::NSpecialColumns {
+
+namespace {
+
+// Set of IndexColumns row deletions produced by the trash-cleaning normalizer.
+// Stores the keys to erase and removes them from the local database when applied.
+class TChanges: public INormalizerChanges {
+public:
+    // Takes ownership of the key batch. A named rvalue reference is an lvalue, so it has to be
+    // moved explicitly; otherwise the whole batch would be copied into the member.
+    TChanges(TDeleteTrashImpl::TKeyBatch&& keys)
+        : Keys(std::move(keys)) {
+    }
+
+    // Issues a Delete() for every stored key of the IndexColumns table inside the given
+    // transaction and logs how many rows were processed. Always returns true.
+    bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normController*/) const override {
+        using namespace NColumnShard;
+        NIceDb::TNiceDb db(txc.DB);
+        for (const auto& k : Keys) {
+            db.Table<Schema::IndexColumns>().Key(k.Index, k.Granule, k.ColumnIdx, k.PlanStep, k.TxId, k.Portion, k.Chunk).Delete();
+        }
+        AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")("message", TStringBuilder() << GetSize() << " rows deleted");
+        return true;
+    }
+
+    // Number of keys held by this change set.
+    ui64 GetSize() const override {
+        return Keys.size();
+    }
+
+private:
+    const TDeleteTrashImpl::TKeyBatch Keys;
+};
+
+} //namespace
+
+// Collects the IndexColumns keys that have to be removed and wraps every batch of keys
+// into a separate normalizer task.
+// Returns a failed conclusion ("Not ready") when KeysToDelete() could not produce a result,
+// otherwise the list of tasks (one per non-empty batch).
+TConclusion<std::vector<INormalizerTask::TPtr>> TDeleteTrashImpl::DoInit(
+    const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
+    using namespace NColumnShard;
+    const size_t MaxBatchSize = 10000;
+    auto keysToDelete = KeysToDelete(txc, MaxBatchSize);
+    if (!keysToDelete) {
+        return TConclusionStatus::Fail("Not ready");
+    }
+    // Batch sizes are size_t; accumulating them in a narrower counter would silently truncate the total.
+    size_t removeCount = 0;
+    for (const auto& batch : *keysToDelete) {
+        removeCount += batch.size();
+    }
+    AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")(
+        "message", TStringBuilder() << "found " << removeCount << " rows to delete grouped in " << keysToDelete->size() << " batches");
+
+    std::vector<INormalizerTask::TPtr> result;
+    result.reserve(keysToDelete->size());
+    for (auto&& batch : *keysToDelete) {
+        // KeysToDelete() never emits an empty batch; an empty one would indicate a logic error.
+        AFL_VERIFY(!batch.empty());
+        result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(batch))));
+    }
+    return result;
+}
+
+// Scans the IndexColumns table and gathers the keys of all rows whose ColumnIdx is listed by
+// GetColumnIdsToDelete(), split into batches of at most maxBatchSize keys.
+// Returns std::nullopt when the table precharge fails, the rowset is not ready, or advancing
+// the rowset fails; otherwise the (possibly empty) list of non-empty batches.
+std::optional<std::vector<TDeleteTrashImpl::TKeyBatch>> TDeleteTrashImpl::KeysToDelete(
+    NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize) {
+    NIceDb::TNiceDb db(txc.DB);
+    using namespace NColumnShard;
+    using TTable = Schema::IndexColumns;
+
+    if (!Schema::Precharge<TTable>(db, txc.DB.GetScheme())) {
+        return std::nullopt;
+    }
+
+    const std::set<ui64> trashColumnIds = GetColumnIdsToDelete();
+    std::vector<TKeyBatch> batches;
+    TKeyBatch pending;
+
+    auto rowset = db.Table<TTable>()
+                      .Select<TTable::Index, TTable::Granule, TTable::ColumnIdx, TTable::PlanStep, TTable::TxId, TTable::Portion,
+                          TTable::Chunk>();
+    if (!rowset.IsReady()) {
+        return std::nullopt;
+    }
+
+    while (!rowset.EndOfSet()) {
+        const auto columnIdx = rowset.GetValue<TTable::ColumnIdx>();
+        if (trashColumnIds.contains(columnIdx)) {
+            pending.push_back(TKey{
+                .Index = rowset.GetValue<TTable::Index>(),
+                .Granule = rowset.GetValue<TTable::Granule>(),
+                .ColumnIdx = columnIdx,
+                .PlanStep = rowset.GetValue<TTable::PlanStep>(),
+                .TxId = rowset.GetValue<TTable::TxId>(),
+                .Portion = rowset.GetValue<TTable::Portion>(),
+                .Chunk = rowset.GetValue<TTable::Chunk>() });
+            if (pending.size() == maxBatchSize) {
+                // Hand the full batch over and continue with a fresh empty one.
+                batches.emplace_back(std::exchange(pending, {}));
+            }
+        }
+        if (!rowset.Next()) {
+            return std::nullopt;
+        }
+    }
+
+    // Flush the trailing, partially filled batch.
+    if (!pending.empty()) {
+        batches.emplace_back(std::move(pending));
+    }
+    return batches;
+}
+
+} // namespace NKikimr::NOlap::NNormalizer::NSpecialColumns
diff --git a/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h
new file mode 100644
index 0000000000..bac01c3f3d
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h
@@ -0,0 +1,94 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
+
+namespace NKikimr::NOlap::NNormalizer::NSpecialColumns {
+
+// Base normalizer component that removes IndexColumns rows for the column ids reported by
+// the derived class through GetColumnIdsToDelete().
+class TDeleteTrashImpl: public TNormalizationController::INormalizerComponent {
+public:
+    // Key values of a single IndexColumns row scheduled for deletion.
+    struct TKey {
+        ui32 Index;
+        ui64 Granule;
+        ui32 ColumnIdx;
+        ui64 PlanStep;
+        ui64 TxId;
+        ui64 Portion;
+        ui32 Chunk;
+    };
+
+    // Keys that are deleted together as one unit of work.
+    using TKeyBatch = std::vector<TKey>;
+
+    // The init context is accepted for interface uniformity and is not used here.
+    TDeleteTrashImpl(const TNormalizationController::TInitContext&) {
+    }
+
+    // Builds the deletion tasks; defined in special_cleaner.cpp.
+    TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
+        const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
+
+private:
+    // Collects the keys to delete, grouped into batches of at most maxBatchSize keys.
+    // An empty optional means the data could not be read.
+    std::optional<std::vector<TKeyBatch>> KeysToDelete(NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize);
+
+    // Ids of the columns whose IndexColumns rows have to be removed.
+    virtual std::set<ui64> GetColumnIdsToDelete() const = 0;
+};
+
+// Normalizer component that deletes the IndexColumns rows of the
+// SPEC_COL_DELETE_FLAG_INDEX special column.
+class TRemoveDeleteFlag: public TDeleteTrashImpl {
+    using TBase = TDeleteTrashImpl;
+
+public:
+    // Name used to register this component in the normalizer factory.
+    static TString GetClassNameStatic() {
+        return "RemoveDeleteFlag";
+    }
+
+    TRemoveDeleteFlag(const TNormalizationController::TInitContext& context)
+        : TBase(context) {
+    }
+
+private:
+    static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveDeleteFlag>(GetClassNameStatic());
+
+    std::set<ui64> GetColumnIdsToDelete() const override {
+        return { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX };
+    }
+
+    // This component reports no sequential id.
+    std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
+        return std::nullopt;
+    }
+
+    TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+};
+
+// Normalizer component that deletes the IndexColumns rows of the
+// SPEC_COL_WRITE_ID_INDEX special column.
+class TRemoveWriteId: public TDeleteTrashImpl {
+    using TBase = TDeleteTrashImpl;
+
+public:
+    // Name used to register this component in the normalizer factory.
+    static TString GetClassNameStatic() {
+        return "RemoveWriteId";
+    }
+
+    TRemoveWriteId(const TNormalizationController::TInitContext& context)
+        : TBase(context) {
+    }
+
+private:
+    static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveWriteId>(GetClassNameStatic());
+
+    std::set<ui64> GetColumnIdsToDelete() const override {
+        return { NPortion::TSpecialColumns::SPEC_COL_WRITE_ID_INDEX };
+    }
+
+    // This component reports no sequential id.
+    std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
+        return std::nullopt;
+    }
+
+    TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+};
+
+} // namespace NKikimr::NOlap::NNormalizer::NSpecialColumns
diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make
index ff813694a9..e53cf5497b 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/ya.make
+++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make
@@ -7,6 +7,7 @@ SRCS(
GLOBAL clean.cpp
GLOBAL clean_empty.cpp
GLOBAL broken_blobs.cpp
+ GLOBAL special_cleaner.cpp
)
PEERDIR(