diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-23 15:33:58 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-23 15:33:58 +0300 |
commit | 7906a59862a305224d8eb58ec9add4fc04d3c25f (patch) | |
tree | 45b7642a0268a4411532f03ac8c633c7e2447c8f | |
parent | b71a866bb25a4da400b09b0dc754094946a9f67d (diff) | |
download | ydb-7906a59862a305224d8eb58ec9add4fc04d3c25f.tar.gz |
KIKIMR-18796:split logic
17 files changed, 286 insertions, 222 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 990415922de..7ae7309501f 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -1,5 +1,6 @@ #include "columnshard_impl.h" #include "columnshard_schema.h" +#include "engines/changes/ttl.h" #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/tablet/tablet_counters_protobuf.h> #include <ydb/core/tx/tiering/external_data.h> @@ -83,6 +84,19 @@ bool ValidateTablePreset(const NKikimrSchemeOp::TColumnTableSchemaPreset& preset } +void TColumnShard::TBackgroundController::StartTtl(const NOlap::TColumnEngineChanges& changes) { + const NOlap::TTTLColumnEngineChanges* ttlChanges = dynamic_cast<const NOlap::TTTLColumnEngineChanges*>(&changes); + Y_VERIFY(ttlChanges); + Y_VERIFY(ActiveTtlGranules.empty()); + + for (const auto& portionInfo : ttlChanges->PortionsToDrop) { + ActiveTtlGranules.emplace(portionInfo.Granule()); + } + for (const auto& [portionInfo, _] : ttlChanges->PortionsToEvict) { + ActiveTtlGranules.emplace(portionInfo.Granule()); + } +} + bool TColumnShard::TAlterMeta::Validate(const NOlap::ISnapshotSchema::TPtr& schema) const { switch (Body.TxBody_case()) { case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: @@ -747,13 +761,6 @@ void TColumnShard::SetupCompaction() { LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID()); - auto& g = compactionInfo->GetObject<NOlap::TGranuleMeta>(); - if (compactionInfo->InGranule()) { - CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); - } else { - CSCounters.OnSplitCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); - } - auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), limits); if (!indexChanges) { if (!BackgroundController.GetCompactionsCount()) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 79c9f9ccc0f..b5ffb7b7d8b 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -10,9 +10,6 @@ #include "tx_controller.h" #include "inflight_request_tracker.h" #include "counters/columnshard.h" -#include "engines/changes/ttl.h" -#include "engines/changes/compaction.h" -#include "engines/changes/indexation.h" #include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/tablet/tablet_pipe_client_cache.h> @@ -24,6 +21,16 @@ #include <ydb/core/tx/tx_processing.h> #include <ydb/services/metadata/service.h> +namespace NKikimr::NOlap { +class TCleanupColumnEngineChanges; +class TTTLColumnEngineChanges; +class TChangesWithAppend; +class TCompactColumnEngineChanges; +class TInGranuleCompactColumnEngineChanges; +class TSplitCompactColumnEngineChanges; +class TInsertColumnEngineChanges; +} + namespace NKikimr::NColumnShard { extern bool gAllowLogBatchingDefaultValue; @@ -133,6 +140,8 @@ class TColumnShard friend class NOlap::TTTLColumnEngineChanges; friend class NOlap::TChangesWithAppend; friend class NOlap::TCompactColumnEngineChanges; + friend class NOlap::TInGranuleCompactColumnEngineChanges; + friend class NOlap::TSplitCompactColumnEngineChanges; friend class NOlap::TInsertColumnEngineChanges; friend class TTxController; @@ -365,18 +374,7 @@ private: return ActiveCleanup; } - void StartTtl(const NOlap::TColumnEngineChanges& changes) { - const NOlap::TTTLColumnEngineChanges* ttlChanges = dynamic_cast<const NOlap::TTTLColumnEngineChanges*>(&changes); - Y_VERIFY(ttlChanges); - Y_VERIFY(ActiveTtlGranules.empty()); - - for (const auto& portionInfo : ttlChanges->PortionsToDrop) { - ActiveTtlGranules.emplace(portionInfo.Granule()); - } - for (const auto& [portionInfo, _] : ttlChanges->PortionsToEvict) { - ActiveTtlGranules.emplace(portionInfo.Granule()); - } - } + void StartTtl(const NOlap::TColumnEngineChanges& changes); void FinishTtl() { Y_VERIFY(!ActiveTtlGranules.empty()); ActiveTtlGranules.clear(); diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index c771ce04c59..149704ea01b 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -29,7 +29,7 @@ public: TxEvent = std::move(event.TxEvent); auto compactChanges = dynamic_pointer_cast<NOlap::TCompactColumnEngineChanges>(TxEvent->IndexChanges); Y_VERIFY(compactChanges); - IsSplitCurrently = !compactChanges->CompactionInfo->InGranule(); + IsSplitCurrently = compactChanges->IsSplit(); auto& indexChanges = TxEvent->IndexChanges; Y_VERIFY(indexChanges); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 8a1a7fd9515..9c76092b4ec 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -40,6 +40,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) { id = granuleRemap[id]; } + const TPortionMeta::EProduced producedClassResultCompaction = GetResultProducedClass(); for (auto& portionInfo : AppendedPortions) { if (granuleRemap.size()) { auto it = granuleRemap.find(portionInfo.Granule()); @@ -50,7 +51,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) { TPortionMeta::EProduced produced = TPortionMeta::INSERTED; // If it's a split compaction with moves appended portions are INSERTED (could have overlaps with others) if (PortionsToMove.empty()) { - produced = CompactionInfo->InGranule() ? TPortionMeta::COMPACTED : TPortionMeta::SPLIT_COMPACTED; + produced = producedClassResultCompaction; } portionInfo.UpdateRecordsMeta(produced); } @@ -178,16 +179,16 @@ void TCompactColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, THashMap<ui64, ui64> TCompactColumnEngineChanges::TmpToNewGranules(TFinalizationContext& context, THashMap<ui64, std::pair<ui64, TMark>>& newGranules) const { THashMap<ui64, ui64> granuleRemap; for (const auto& [mark, counter] : TmpGranuleIds) { - if (mark == SrcGranule->Mark) { + if (mark == SrcGranule.Mark) { Y_VERIFY(!counter); - granuleRemap[counter] = SrcGranule->Granule; + granuleRemap[counter] = SrcGranule.Granule; } else { Y_VERIFY(counter); auto it = granuleRemap.find(counter); if (it == granuleRemap.end()) { it = granuleRemap.emplace(counter, context.NextGranuleId()).first; } - newGranules.emplace(it->second, std::make_pair(SrcGranule->PathId, mark)); + newGranules.emplace(it->second, std::make_pair(SrcGranule.PathId, mark)); } } return granuleRemap; @@ -203,19 +204,12 @@ bool TCompactColumnEngineChanges::IsMovedPortion(const TPortionInfo& info) { } void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { + TBase::DoStart(self); self.BackgroundController.StartCompaction(CompactionInfo->GetPlanCompaction()); } void TCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { - if (CompactionInfo->InGranule()) { - self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL); - self.IncCounter(NColumnShard::COUNTER_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten); - self.IncCounter(NColumnShard::COUNTER_COMPACTION_BYTES_WRITTEN, context.BytesWritten); - } else { - self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL); - self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten); - self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten); - } + TBase::DoWriteIndexComplete(self, context); self.IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds()); } @@ -224,4 +218,30 @@ void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, T CompactionInfo->GetObject<TGranuleMeta>().AllowedInsertion(); } +ui64 TCompactColumnEngineChanges::SetTmpGranule(ui64 pathId, const TMark& mark) { + Y_VERIFY(pathId == SrcGranule.PathId); + if (!TmpGranuleIds.contains(mark)) { + TmpGranuleIds[mark] = FirstGranuleId; + ++FirstGranuleId; + } + return TmpGranuleIds[mark]; +} + +TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info, const TCompactionSrcGranule& srcGranule) + : Limits(limits) + , CompactionInfo(std::move(info)) + , SrcGranule(srcGranule) { + Y_VERIFY(CompactionInfo); + const auto& granuleInfo = CompactionInfo->GetObject<TGranuleMeta>(); + + SwitchedPortions.reserve(granuleInfo.GetPortions().size()); + for (const auto& [_, portionInfo] : granuleInfo.GetPortions()) { + if (portionInfo.IsActive()) { + SwitchedPortions.push_back(portionInfo); + Y_VERIFY(portionInfo.Granule() == granuleInfo.GetGranuleId()); + } + } + Y_VERIFY(SwitchedPortions.size()); +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h index 0b5e50e350f..01cbc999b3d 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction.h @@ -12,6 +12,10 @@ private: using TBase = TChangesWithAppend; THashMap<TMark, ui32> TmpGranuleIds; // mark -> tmp granule id protected: + const TCompactionLimits Limits; + std::unique_ptr<TCompactionInfo> CompactionInfo; + TCompactionSrcGranule SrcGranule; + virtual void DoStart(NColumnShard::TColumnShard& self) override; virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override; @@ -19,16 +23,9 @@ protected: virtual void DoDebugString(TStringOutput& out) const override; virtual void DoCompile(TFinalizationContext& context) override; virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override; + virtual TPortionMeta::EProduced GetResultProducedClass() const = 0; public: - struct TSrcGranule { - ui64 PathId{ 0 }; - ui64 Granule{ 0 }; - TMark Mark; - - TSrcGranule(ui64 pathId, ui64 granule, const TMark& mark) - : PathId(pathId), Granule(granule), Mark(mark) { - } - }; + virtual bool IsSplit() const = 0; const THashMap<TMark, ui32>& GetTmpGranuleIds() const { return TmpGranuleIds; @@ -36,28 +33,12 @@ public: virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const override; - const TCompactionLimits Limits; - std::unique_ptr<TCompactionInfo> CompactionInfo; std::vector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule} std::vector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones - std::optional<TSrcGranule> SrcGranule; - TMarksGranules MergeBorders; - TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info) - : Limits(limits) - , CompactionInfo(std::move(info)) - { - Y_VERIFY(CompactionInfo); - } + TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info, const TCompactionSrcGranule& srcGranule); - ui64 SetTmpGranule(ui64 pathId, const TMark& mark) { - Y_VERIFY(pathId == SrcGranule->PathId); - if (!TmpGranuleIds.contains(mark)) { - TmpGranuleIds[mark] = FirstGranuleId; - ++FirstGranuleId; - } - return TmpGranuleIds[mark]; - } + ui64 SetTmpGranule(ui64 pathId, const TMark& mark); THashMap<ui64, ui64> TmpToNewGranules(TFinalizationContext& context, THashMap<ui64, std::pair<ui64, TMark>>& newGranules) const; @@ -73,10 +54,6 @@ public: ui32 NumSplitInto(const ui32 srcRows) const; bool IsMovedPortion(const TPortionInfo& info); - - virtual TString TypeString() const override { - return CompactionInfo->InGranule() ? "compaction in granule" : "compaction split granule"; - } }; } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction_info.h b/ydb/core/tx/columnshard/engines/changes/compaction_info.h index 9ad1d657e74..bc69904ce0b 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction_info.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction_info.h @@ -1,4 +1,5 @@ #pragma once +#include "mark.h" #include <util/generic/string.h> #include <util/system/yassert.h> #include <util/stream/output.h> @@ -100,4 +101,14 @@ public: } }; +struct TCompactionSrcGranule { + ui64 PathId = 0; + ui64 Granule = 0; + TMark Mark; + + TCompactionSrcGranule(ui64 pathId, ui64 granule, const TMark& mark) + : PathId(pathId), Granule(granule), Mark(mark) { + } +}; + } diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp index d50e17de65a..7a76ab4aa15 100644 --- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp @@ -1,7 +1,119 @@ #include "in_granule_compaction.h" +#include <ydb/core/tx/columnshard/columnshard_impl.h> +#include <ydb/core/tx/columnshard/engines/storage/granule.h> namespace NKikimr::NOlap { +namespace { + +TConclusionStatus InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& portions, const TCompactionLimits& limits, + TMarksGranules& marksGranules) { + ui32 insertedCount = 0; + + THashSet<ui64> filtered; + THashSet<ui64> goodCompacted; + THashSet<ui64> nextToGood; + { + TMap<NArrow::TReplaceKey, std::vector<const TPortionInfo*>> points; + + for (const auto& portionInfo : portions) { + if (portionInfo.IsInserted()) { + ++insertedCount; + } else if (portionInfo.BlobsSizes().second >= limits.GoodBlobSize) { + goodCompacted.insert(portionInfo.Portion()); + } + + const NArrow::TReplaceKey& start = portionInfo.IndexKeyStart(); + const NArrow::TReplaceKey& end = portionInfo.IndexKeyEnd(); + + points[start].push_back(&portionInfo); + points[end].push_back(nullptr); + } + + ui32 countInBucket = 0; + ui64 bucketStartPortion = 0; + bool isGood = false; + int sum = 0; + for (const auto& [key, vec] : points) { + for (const auto* portionInfo : vec) { + if (portionInfo) { + ++sum; + ui64 currentPortion = portionInfo->Portion(); + if (!bucketStartPortion) { + bucketStartPortion = currentPortion; + } + ++countInBucket; + + ui64 prevIsGood = isGood; + isGood = goodCompacted.contains(currentPortion); + if (prevIsGood && !isGood) { + nextToGood.insert(currentPortion); + } + } else { + --sum; + } + } + + if (!sum) { // count(start) == count(end), start new range + Y_VERIFY(bucketStartPortion); + + if (countInBucket == 1) { + // We do not want to merge big compacted portions with inserted ones if there's no intersections. + if (isGood) { + filtered.insert(bucketStartPortion); + } + } + countInBucket = 0; + bucketStartPortion = 0; + } + } + } + + Y_VERIFY(insertedCount); + + // Nothing to filter. Leave portions as is, no borders needed. + if (filtered.empty() && goodCompacted.empty()) { + return TConclusionStatus::Success(); + } + + // It's a map for SliceIntoGranules(). We use fake granule ids here to slice batch with borders. + // We could merge inserted portions alltogether and slice result with filtered borders to prevent intersections. + std::vector<TMark> borders; + borders.push_back(granuleMark); + + std::vector<TPortionInfo> tmp; + tmp.reserve(portions.size()); + for (auto& portionInfo : portions) { + ui64 curPortion = portionInfo.Portion(); + + // Prevent merge of compacted portions with no intersections + if (filtered.contains(curPortion)) { + const auto& start = portionInfo.IndexKeyStart(); + borders.emplace_back(TMark(start)); + } else { + // nextToGood borders potentially split good compacted portions into 2 parts: + // the first one without intersections and the second with them + if (goodCompacted.contains(curPortion) || nextToGood.contains(curPortion)) { + const auto& start = portionInfo.IndexKeyStart(); + borders.emplace_back(TMark(start)); + } + + tmp.emplace_back(std::move(portionInfo)); + } + } + tmp.swap(portions); + + if (borders.size() == 1) { + Y_VERIFY(borders[0] == granuleMark); + borders.clear(); + } + + marksGranules = TMarksGranules(std::move(borders)); + return TConclusionStatus::Success(); +} + +} // namespace + std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColumnEngineChanges::CompactInOneGranule(ui64 granule, const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, TConstructionContext& context) const { @@ -29,7 +141,7 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColum } TConclusion<std::vector<TString>> TInGranuleCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { - const ui64 pathId = SrcGranule->PathId; + const ui64 pathId = SrcGranule.PathId; std::vector<TString> blobs; auto& switchedPortions = SwitchedPortions; Y_VERIFY(switchedPortions.size()); @@ -65,4 +177,18 @@ TConclusion<std::vector<TString>> TInGranuleCompactColumnEngineChanges::DoConstr return blobs; } +void TInGranuleCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { + TBase::DoWriteIndexComplete(self, context); + self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL); + self.IncCounter(NColumnShard::COUNTER_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten); + self.IncCounter(NColumnShard::COUNTER_COMPACTION_BYTES_WRITTEN, context.BytesWritten); +} + +void TInGranuleCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { + TBase::DoStart(self); + auto& g = CompactionInfo->GetObject<NOlap::TGranuleMeta>(); + self.CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); + Y_VERIFY(InitInGranuleMerge(SrcGranule.Mark, SwitchedPortions, Limits, MergeBorders).Ok()); +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h index 5a1cba91c9f..aecc121a688 100644 --- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h @@ -11,10 +11,25 @@ private: std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> CompactInOneGranule(ui64 granule, const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, TConstructionContext& context) const; + TMarksGranules MergeBorders; + + virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; + protected: virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override; + virtual TPortionMeta::EProduced GetResultProducedClass() const override { + return TPortionMeta::COMPACTED; + } + virtual void DoStart(NColumnShard::TColumnShard& self) override; public: + virtual bool IsSplit() const override { + return false; + } using TBase::TBase; + + virtual TString TypeString() const override { + return "INTERNAL_COMPACTION"; + } }; } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 83b6f562196..fe4c8b7ca79 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -1,4 +1,5 @@ #include "indexation.h" +#include "mark_granules.h" #include <ydb/core/tx/columnshard/blob_cache.h> #include <ydb/core/protos/counters_columnshard.pb.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp index cf19d14241b..6db2a5a988d 100644 --- a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp @@ -1,10 +1,12 @@ #include "split_compaction.h" +#include <ydb/core/tx/columnshard/columnshard_impl.h> +#include <ydb/core/tx/columnshard/engines/storage/granule.h> namespace NKikimr::NOlap { TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { - const ui64 pathId = SrcGranule->PathId; - const TMark ts0 = SrcGranule->Mark; + const ui64 pathId = SrcGranule.PathId; + const TMark ts0 = SrcGranule.Mark; std::vector<TPortionInfo>& portions = SwitchedPortions; std::vector<std::pair<TMark, ui64>> tsIds; @@ -388,4 +390,17 @@ ui64 TSplitCompactColumnEngineChanges::TryMovePortions(const TMark& ts0, std::ve return numRows; } +void TSplitCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { + TBase::DoWriteIndexComplete(self, context); + self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL); + self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten); + self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten); +} + +void TSplitCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { + TBase::DoStart(self); + auto& g = CompactionInfo->GetObject<TGranuleMeta>(); + self.CSCounters.OnSplitCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.h b/ydb/core/tx/columnshard/engines/changes/split_compaction.h index aed71cf5630..1473274e4c5 100644 --- a/ydb/core/tx/columnshard/engines/changes/split_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.h @@ -18,10 +18,22 @@ private: std::vector<std::pair<TMark, ui64>>& tsIds, std::vector<std::pair<TPortionInfo, ui64>>& toMove, TConstructionContext& context); + virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; protected: virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override; + virtual TPortionMeta::EProduced GetResultProducedClass() const override { + return TPortionMeta::SPLIT_COMPACTED; + } + virtual void DoStart(NColumnShard::TColumnShard& self) override; public: + virtual bool IsSplit() const override { + return true; + } using TBase::TBase; + + virtual TString TypeString() const override { + return "SPLIT_COMPACTION"; + } }; } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index f258f9a834e..d526da6f5b7 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -10,6 +10,12 @@ protected: virtual void DoCompile(TFinalizationContext& context) override; virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override; virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; + virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& /*self*/, TWriteIndexCompleteContext& /*context*/) override { + + } + virtual void DoStart(NColumnShard::TColumnShard& /*self*/) override { + + } std::vector<TPortionInfo> MakeAppendedPortions(const ui64 pathId, const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 9175a753612..20ba4fb8852 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -16,123 +16,13 @@ namespace NKikimr::NOlap { -namespace { - -TConclusionStatus InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& portions, const TCompactionLimits& limits, - TMarksGranules& marksGranules) { - ui32 insertedCount = 0; - - THashSet<ui64> filtered; - THashSet<ui64> goodCompacted; - THashSet<ui64> nextToGood; - { - TMap<NArrow::TReplaceKey, std::vector<const TPortionInfo*>> points; - - for (const auto& portionInfo : portions) { - if (portionInfo.IsInserted()) { - ++insertedCount; - } else if (portionInfo.BlobsSizes().second >= limits.GoodBlobSize) { - goodCompacted.insert(portionInfo.Portion()); - } - - const NArrow::TReplaceKey& start = portionInfo.IndexKeyStart(); - const NArrow::TReplaceKey& end = portionInfo.IndexKeyEnd(); - - points[start].push_back(&portionInfo); - points[end].push_back(nullptr); - } - - ui32 countInBucket = 0; - ui64 bucketStartPortion = 0; - bool isGood = false; - int sum = 0; - for (const auto& [key, vec] : points) { - for (const auto* portionInfo : vec) { - if (portionInfo) { - ++sum; - ui64 currentPortion = portionInfo->Portion(); - if (!bucketStartPortion) { - bucketStartPortion = currentPortion; - } - ++countInBucket; - - ui64 prevIsGood = isGood; - isGood = goodCompacted.contains(currentPortion); - if (prevIsGood && !isGood) { - nextToGood.insert(currentPortion); - } - } else { - --sum; - } - } - - if (!sum) { // count(start) == count(end), start new range - Y_VERIFY(bucketStartPortion); - - if (countInBucket == 1) { - // We do not want to merge big compacted portions with inserted ones if there's no intersections. - if (isGood) { - filtered.insert(bucketStartPortion); - } - } - countInBucket = 0; - bucketStartPortion = 0; - } - } - } - - Y_VERIFY(insertedCount); - - // Nothing to filter. Leave portions as is, no borders needed. - if (filtered.empty() && goodCompacted.empty()) { - return TConclusionStatus::Success(); - } - - // It's a map for SliceIntoGranules(). We use fake granule ids here to slice batch with borders. - // We could merge inserted portions alltogether and slice result with filtered borders to prevent intersections. - std::vector<TMark> borders; - borders.push_back(granuleMark); - - std::vector<TPortionInfo> tmp; - tmp.reserve(portions.size()); - for (auto& portionInfo : portions) { - ui64 curPortion = portionInfo.Portion(); - - // Prevent merge of compacted portions with no intersections - if (filtered.contains(curPortion)) { - const auto& start = portionInfo.IndexKeyStart(); - borders.emplace_back(TMark(start)); - } else { - // nextToGood borders potentially split good compacted portions into 2 parts: - // the first one without intersections and the second with them - if (goodCompacted.contains(curPortion) || nextToGood.contains(curPortion)) { - const auto& start = portionInfo.IndexKeyStart(); - borders.emplace_back(TMark(start)); - } - - tmp.emplace_back(std::move(portionInfo)); - } - } - tmp.swap(portions); - - if (borders.size() == 1) { - Y_VERIFY(borders[0] == granuleMark); - borders.clear(); - } - - marksGranules = TMarksGranules(std::move(borders)); - return TConclusionStatus::Success(); -} - -} // namespace - std::shared_ptr<NKikimr::NOlap::TCompactColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildCompactionChanges(std::unique_ptr<TCompactionInfo>&& info, - const TCompactionLimits& limits, const TSnapshot& initSnapshot) { + const TCompactionLimits& limits, const TSnapshot& initSnapshot, const TCompactionSrcGranule& srcGranule) { std::shared_ptr<TCompactColumnEngineChanges> result; if (info->InGranule()) { - result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, std::move(info)); + result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, std::move(info), srcGranule); } else { - result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, std::move(info)); + result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, std::move(info), srcGranule); } result->InitSnapshot = initSnapshot; return result; @@ -417,41 +307,20 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(co std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits) { - auto changes = TChangesConstructor::BuildCompactionChanges(std::move(info), limits, LastSnapshot); - - const auto& granuleInfo = changes->CompactionInfo->GetObject<TGranuleMeta>(); - - changes->SwitchedPortions.reserve(granuleInfo.GetPortions().size()); - // Collect active portions for the granule. - for (const auto& [_, portionInfo] : granuleInfo.GetPortions()) { - if (portionInfo.IsActive()) { - changes->SwitchedPortions.push_back(portionInfo); - Y_VERIFY(portionInfo.Granule() == granuleInfo.GetGranuleId()); - } - } - - const ui64 pathId = granuleInfo.Record.PathId; + const ui64 pathId = info->GetPlanCompaction().GetPathId(); Y_VERIFY(PathGranules.contains(pathId)); - // Locate mark for the granule. - for (const auto& [mark, pathGranule] : PathGranules[pathId]) { - if (pathGranule == granuleInfo.GetGranuleId()) { - changes->SrcGranule = TCompactColumnEngineChanges::TSrcGranule(pathId, granuleInfo.GetGranuleId(), mark); - break; - } - } - Y_VERIFY(changes->SrcGranule); - if (changes->CompactionInfo->InGranule()) { - auto mergeInitResult = InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, limits, changes->MergeBorders); - if (!mergeInitResult) { - // Return granule to Compaction list. This is equal to single compaction worker behavior. - changes->CompactionInfo->CompactionCanceled("cannot init in granule merge: " + mergeInitResult.GetErrorMessage()); - return {}; + auto& g = info->GetObject<TGranuleMeta>(); + for (const auto& [mark, pathGranule] : PathGranules[pathId]) { + if (pathGranule == g.GetGranuleId()) { + TCompactionSrcGranule srcGranule = TCompactionSrcGranule(pathId, g.GetGranuleId(), mark); + auto changes = TChangesConstructor::BuildCompactionChanges(std::move(info), limits, LastSnapshot, srcGranule); + NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes); + return changes; } } - NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes); - Y_VERIFY(!changes->SwitchedPortions.empty()); - return changes; + Y_VERIFY(false); + return nullptr; } std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 54192815f65..0f9ab568bdf 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -6,6 +6,9 @@ #include <ydb/core/tx/columnshard/counters/engine_logs.h> #include "storage/granule.h" #include "storage/storage.h" +#include "changes/indexation.h" +#include "changes/compaction.h" +#include "changes/ttl.h" namespace NKikimr::NArrow { struct TSortDescription; @@ -39,7 +42,7 @@ public: std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot); static std::shared_ptr<TCompactColumnEngineChanges> BuildCompactionChanges(std::unique_ptr<TCompactionInfo>&& info, - const TCompactionLimits& limits, const TSnapshot& initSnapshot); + const TCompactionLimits& limits, const TSnapshot& initSnapshot, const TCompactionSrcGranule& srcGranules); static std::shared_ptr<TCleanupColumnEngineChanges> BuildCleanupChanges(const TSnapshot& initSnapshot); diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 914ef584cb0..39098c91a41 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -291,9 +291,9 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T const TExpected& expected) { auto compactionInfo = engine.Compact(TestLimits(), {}); UNIT_ASSERT(!!compactionInfo); - UNIT_ASSERT(!compactionInfo->InGranule()); std::shared_ptr<TCompactColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TestLimits()); + UNIT_ASSERT(changes->IsSplit()); UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions); changes->SetBlobs(std::move(blobs)); changes->StartEmergency(); diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.cpp b/ydb/core/tx/columnshard/hooks/testing/controller.cpp index 9868ac01378..2557103c73e 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.cpp +++ b/ydb/core/tx/columnshard/hooks/testing/controller.cpp @@ -3,6 +3,8 @@ #include <ydb/core/tx/columnshard/engines/reader/order_control/default.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> #include <ydb/core/tx/columnshard/engines/changes/compaction.h> +#include <ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h> +#include <ydb/core/tx/columnshard/engines/changes/split_compaction.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> namespace NKikimr::NYDBTest::NColumnShard { @@ -30,12 +32,11 @@ bool TController::DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordB } bool TController::DoOnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& changes) { - if (auto compaction = dynamic_pointer_cast<NOlap::TCompactColumnEngineChanges>(changes)) { - if (compaction->CompactionInfo->InGranule()) { - InternalCompactions.Inc(); - } else { - SplitCompactions.Inc(); - } + if (dynamic_pointer_cast<NOlap::TInGranuleCompactColumnEngineChanges>(changes)) { + InternalCompactions.Inc(); + } + if (dynamic_pointer_cast<NOlap::TSplitCompactColumnEngineChanges>(changes)) { + SplitCompactions.Inc(); } return true; } diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index bc2912c0bfe..fb7f456f368 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -5,6 +5,9 @@ #include <arrow/api.h> #include <arrow/ipc/reader.h> #include <ydb/library/yverify_stream/yverify_stream.h> +#include <ydb/core/tx/columnshard/engines/changes/with_appended.h> +#include <ydb/core/tx/columnshard/engines/changes/compaction.h> +#include <ydb/core/tx/columnshard/engines/changes/cleanup.h> namespace NKikimr { |