aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-07-23 15:33:58 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-07-23 15:33:58 +0300
commit7906a59862a305224d8eb58ec9add4fc04d3c25f (patch)
tree45b7642a0268a4411532f03ac8c633c7e2447c8f
parentb71a866bb25a4da400b09b0dc754094946a9f67d (diff)
downloadydb-7906a59862a305224d8eb58ec9add4fc04d3c25f.tar.gz
KIKIMR-18796:split logic
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp21
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h28
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.cpp46
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.h39
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction_info.h11
-rw-r--r--ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp128
-rw-r--r--ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h15
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/split_compaction.cpp19
-rw-r--r--ydb/core/tx/columnshard/engines/changes/split_compaction.h12
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.h6
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp157
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h5
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp2
-rw-r--r--ydb/core/tx/columnshard/hooks/testing/controller.cpp13
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp3
17 files changed, 286 insertions, 222 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 990415922de..7ae7309501f 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -1,5 +1,6 @@
#include "columnshard_impl.h"
#include "columnshard_schema.h"
+#include "engines/changes/ttl.h"
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tablet/tablet_counters_protobuf.h>
#include <ydb/core/tx/tiering/external_data.h>
@@ -83,6 +84,19 @@ bool ValidateTablePreset(const NKikimrSchemeOp::TColumnTableSchemaPreset& preset
}
+void TColumnShard::TBackgroundController::StartTtl(const NOlap::TColumnEngineChanges& changes) {
+ const NOlap::TTTLColumnEngineChanges* ttlChanges = dynamic_cast<const NOlap::TTTLColumnEngineChanges*>(&changes);
+ Y_VERIFY(ttlChanges);
+ Y_VERIFY(ActiveTtlGranules.empty());
+
+ for (const auto& portionInfo : ttlChanges->PortionsToDrop) {
+ ActiveTtlGranules.emplace(portionInfo.Granule());
+ }
+ for (const auto& [portionInfo, _] : ttlChanges->PortionsToEvict) {
+ ActiveTtlGranules.emplace(portionInfo.Granule());
+ }
+}
+
bool TColumnShard::TAlterMeta::Validate(const NOlap::ISnapshotSchema::TPtr& schema) const {
switch (Body.TxBody_case()) {
case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
@@ -747,13 +761,6 @@ void TColumnShard::SetupCompaction() {
LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID());
- auto& g = compactionInfo->GetObject<NOlap::TGranuleMeta>();
- if (compactionInfo->InGranule()) {
- CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount());
- } else {
- CSCounters.OnSplitCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount());
- }
-
auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), limits);
if (!indexChanges) {
if (!BackgroundController.GetCompactionsCount()) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 79c9f9ccc0f..b5ffb7b7d8b 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -10,9 +10,6 @@
#include "tx_controller.h"
#include "inflight_request_tracker.h"
#include "counters/columnshard.h"
-#include "engines/changes/ttl.h"
-#include "engines/changes/compaction.h"
-#include "engines/changes/indexation.h"
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
@@ -24,6 +21,16 @@
#include <ydb/core/tx/tx_processing.h>
#include <ydb/services/metadata/service.h>
+namespace NKikimr::NOlap {
+class TCleanupColumnEngineChanges;
+class TTTLColumnEngineChanges;
+class TChangesWithAppend;
+class TCompactColumnEngineChanges;
+class TInGranuleCompactColumnEngineChanges;
+class TSplitCompactColumnEngineChanges;
+class TInsertColumnEngineChanges;
+}
+
namespace NKikimr::NColumnShard {
extern bool gAllowLogBatchingDefaultValue;
@@ -133,6 +140,8 @@ class TColumnShard
friend class NOlap::TTTLColumnEngineChanges;
friend class NOlap::TChangesWithAppend;
friend class NOlap::TCompactColumnEngineChanges;
+ friend class NOlap::TInGranuleCompactColumnEngineChanges;
+ friend class NOlap::TSplitCompactColumnEngineChanges;
friend class NOlap::TInsertColumnEngineChanges;
friend class TTxController;
@@ -365,18 +374,7 @@ private:
return ActiveCleanup;
}
- void StartTtl(const NOlap::TColumnEngineChanges& changes) {
- const NOlap::TTTLColumnEngineChanges* ttlChanges = dynamic_cast<const NOlap::TTTLColumnEngineChanges*>(&changes);
- Y_VERIFY(ttlChanges);
- Y_VERIFY(ActiveTtlGranules.empty());
-
- for (const auto& portionInfo : ttlChanges->PortionsToDrop) {
- ActiveTtlGranules.emplace(portionInfo.Granule());
- }
- for (const auto& [portionInfo, _] : ttlChanges->PortionsToEvict) {
- ActiveTtlGranules.emplace(portionInfo.Granule());
- }
- }
+ void StartTtl(const NOlap::TColumnEngineChanges& changes);
void FinishTtl() {
Y_VERIFY(!ActiveTtlGranules.empty());
ActiveTtlGranules.clear();
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index c771ce04c59..149704ea01b 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -29,7 +29,7 @@ public:
TxEvent = std::move(event.TxEvent);
auto compactChanges = dynamic_pointer_cast<NOlap::TCompactColumnEngineChanges>(TxEvent->IndexChanges);
Y_VERIFY(compactChanges);
- IsSplitCurrently = !compactChanges->CompactionInfo->InGranule();
+ IsSplitCurrently = compactChanges->IsSplit();
auto& indexChanges = TxEvent->IndexChanges;
Y_VERIFY(indexChanges);
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
index 8a1a7fd9515..9c76092b4ec 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
@@ -40,6 +40,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) {
id = granuleRemap[id];
}
+ const TPortionMeta::EProduced producedClassResultCompaction = GetResultProducedClass();
for (auto& portionInfo : AppendedPortions) {
if (granuleRemap.size()) {
auto it = granuleRemap.find(portionInfo.Granule());
@@ -50,7 +51,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) {
TPortionMeta::EProduced produced = TPortionMeta::INSERTED;
// If it's a split compaction with moves appended portions are INSERTED (could have overlaps with others)
if (PortionsToMove.empty()) {
- produced = CompactionInfo->InGranule() ? TPortionMeta::COMPACTED : TPortionMeta::SPLIT_COMPACTED;
+ produced = producedClassResultCompaction;
}
portionInfo.UpdateRecordsMeta(produced);
}
@@ -178,16 +179,16 @@ void TCompactColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self,
THashMap<ui64, ui64> TCompactColumnEngineChanges::TmpToNewGranules(TFinalizationContext& context, THashMap<ui64, std::pair<ui64, TMark>>& newGranules) const {
THashMap<ui64, ui64> granuleRemap;
for (const auto& [mark, counter] : TmpGranuleIds) {
- if (mark == SrcGranule->Mark) {
+ if (mark == SrcGranule.Mark) {
Y_VERIFY(!counter);
- granuleRemap[counter] = SrcGranule->Granule;
+ granuleRemap[counter] = SrcGranule.Granule;
} else {
Y_VERIFY(counter);
auto it = granuleRemap.find(counter);
if (it == granuleRemap.end()) {
it = granuleRemap.emplace(counter, context.NextGranuleId()).first;
}
- newGranules.emplace(it->second, std::make_pair(SrcGranule->PathId, mark));
+ newGranules.emplace(it->second, std::make_pair(SrcGranule.PathId, mark));
}
}
return granuleRemap;
@@ -203,19 +204,12 @@ bool TCompactColumnEngineChanges::IsMovedPortion(const TPortionInfo& info) {
}
void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
+ TBase::DoStart(self);
self.BackgroundController.StartCompaction(CompactionInfo->GetPlanCompaction());
}
void TCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
- if (CompactionInfo->InGranule()) {
- self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL);
- self.IncCounter(NColumnShard::COUNTER_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
- self.IncCounter(NColumnShard::COUNTER_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
- } else {
- self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL);
- self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
- self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
- }
+ TBase::DoWriteIndexComplete(self, context);
self.IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds());
}
@@ -224,4 +218,30 @@ void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, T
CompactionInfo->GetObject<TGranuleMeta>().AllowedInsertion();
}
+ui64 TCompactColumnEngineChanges::SetTmpGranule(ui64 pathId, const TMark& mark) {
+ Y_VERIFY(pathId == SrcGranule.PathId);
+ if (!TmpGranuleIds.contains(mark)) {
+ TmpGranuleIds[mark] = FirstGranuleId;
+ ++FirstGranuleId;
+ }
+ return TmpGranuleIds[mark];
+}
+
+TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info, const TCompactionSrcGranule& srcGranule)
+ : Limits(limits)
+ , CompactionInfo(std::move(info))
+ , SrcGranule(srcGranule) {
+ Y_VERIFY(CompactionInfo);
+ const auto& granuleInfo = CompactionInfo->GetObject<TGranuleMeta>();
+
+ SwitchedPortions.reserve(granuleInfo.GetPortions().size());
+ for (const auto& [_, portionInfo] : granuleInfo.GetPortions()) {
+ if (portionInfo.IsActive()) {
+ SwitchedPortions.push_back(portionInfo);
+ Y_VERIFY(portionInfo.Granule() == granuleInfo.GetGranuleId());
+ }
+ }
+ Y_VERIFY(SwitchedPortions.size());
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h
index 0b5e50e350f..01cbc999b3d 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.h
@@ -12,6 +12,10 @@ private:
using TBase = TChangesWithAppend;
THashMap<TMark, ui32> TmpGranuleIds; // mark -> tmp granule id
protected:
+ const TCompactionLimits Limits;
+ std::unique_ptr<TCompactionInfo> CompactionInfo;
+ TCompactionSrcGranule SrcGranule;
+
virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;
@@ -19,16 +23,9 @@ protected:
virtual void DoDebugString(TStringOutput& out) const override;
virtual void DoCompile(TFinalizationContext& context) override;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override;
+ virtual TPortionMeta::EProduced GetResultProducedClass() const = 0;
public:
- struct TSrcGranule {
- ui64 PathId{ 0 };
- ui64 Granule{ 0 };
- TMark Mark;
-
- TSrcGranule(ui64 pathId, ui64 granule, const TMark& mark)
- : PathId(pathId), Granule(granule), Mark(mark) {
- }
- };
+ virtual bool IsSplit() const = 0;
const THashMap<TMark, ui32>& GetTmpGranuleIds() const {
return TmpGranuleIds;
@@ -36,28 +33,12 @@ public:
virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const override;
- const TCompactionLimits Limits;
- std::unique_ptr<TCompactionInfo> CompactionInfo;
std::vector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule}
std::vector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones
- std::optional<TSrcGranule> SrcGranule;
- TMarksGranules MergeBorders;
- TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info)
- : Limits(limits)
- , CompactionInfo(std::move(info))
- {
- Y_VERIFY(CompactionInfo);
- }
+ TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info, const TCompactionSrcGranule& srcGranule);
- ui64 SetTmpGranule(ui64 pathId, const TMark& mark) {
- Y_VERIFY(pathId == SrcGranule->PathId);
- if (!TmpGranuleIds.contains(mark)) {
- TmpGranuleIds[mark] = FirstGranuleId;
- ++FirstGranuleId;
- }
- return TmpGranuleIds[mark];
- }
+ ui64 SetTmpGranule(ui64 pathId, const TMark& mark);
THashMap<ui64, ui64> TmpToNewGranules(TFinalizationContext& context, THashMap<ui64, std::pair<ui64, TMark>>& newGranules) const;
@@ -73,10 +54,6 @@ public:
ui32 NumSplitInto(const ui32 srcRows) const;
bool IsMovedPortion(const TPortionInfo& info);
-
- virtual TString TypeString() const override {
- return CompactionInfo->InGranule() ? "compaction in granule" : "compaction split granule";
- }
};
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction_info.h b/ydb/core/tx/columnshard/engines/changes/compaction_info.h
index 9ad1d657e74..bc69904ce0b 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction_info.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction_info.h
@@ -1,4 +1,5 @@
#pragma once
+#include "mark.h"
#include <util/generic/string.h>
#include <util/system/yassert.h>
#include <util/stream/output.h>
@@ -100,4 +101,14 @@ public:
}
};
+struct TCompactionSrcGranule {
+ ui64 PathId = 0;
+ ui64 Granule = 0;
+ TMark Mark;
+
+ TCompactionSrcGranule(ui64 pathId, ui64 granule, const TMark& mark)
+ : PathId(pathId), Granule(granule), Mark(mark) {
+ }
+};
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
index d50e17de65a..7a76ab4aa15 100644
--- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
@@ -1,7 +1,119 @@
#include "in_granule_compaction.h"
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/engines/storage/granule.h>
namespace NKikimr::NOlap {
+namespace {
+
+TConclusionStatus InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& portions, const TCompactionLimits& limits,
+ TMarksGranules& marksGranules) {
+ ui32 insertedCount = 0;
+
+ THashSet<ui64> filtered;
+ THashSet<ui64> goodCompacted;
+ THashSet<ui64> nextToGood;
+ {
+ TMap<NArrow::TReplaceKey, std::vector<const TPortionInfo*>> points;
+
+ for (const auto& portionInfo : portions) {
+ if (portionInfo.IsInserted()) {
+ ++insertedCount;
+ } else if (portionInfo.BlobsSizes().second >= limits.GoodBlobSize) {
+ goodCompacted.insert(portionInfo.Portion());
+ }
+
+ const NArrow::TReplaceKey& start = portionInfo.IndexKeyStart();
+ const NArrow::TReplaceKey& end = portionInfo.IndexKeyEnd();
+
+ points[start].push_back(&portionInfo);
+ points[end].push_back(nullptr);
+ }
+
+ ui32 countInBucket = 0;
+ ui64 bucketStartPortion = 0;
+ bool isGood = false;
+ int sum = 0;
+ for (const auto& [key, vec] : points) {
+ for (const auto* portionInfo : vec) {
+ if (portionInfo) {
+ ++sum;
+ ui64 currentPortion = portionInfo->Portion();
+ if (!bucketStartPortion) {
+ bucketStartPortion = currentPortion;
+ }
+ ++countInBucket;
+
+ ui64 prevIsGood = isGood;
+ isGood = goodCompacted.contains(currentPortion);
+ if (prevIsGood && !isGood) {
+ nextToGood.insert(currentPortion);
+ }
+ } else {
+ --sum;
+ }
+ }
+
+ if (!sum) { // count(start) == count(end), start new range
+ Y_VERIFY(bucketStartPortion);
+
+ if (countInBucket == 1) {
+ // We do not want to merge big compacted portions with inserted ones if there's no intersections.
+ if (isGood) {
+ filtered.insert(bucketStartPortion);
+ }
+ }
+ countInBucket = 0;
+ bucketStartPortion = 0;
+ }
+ }
+ }
+
+ Y_VERIFY(insertedCount);
+
+ // Nothing to filter. Leave portions as is, no borders needed.
+ if (filtered.empty() && goodCompacted.empty()) {
+ return TConclusionStatus::Success();
+ }
+
+ // It's a map for SliceIntoGranules(). We use fake granule ids here to slice batch with borders.
+ // We could merge inserted portions alltogether and slice result with filtered borders to prevent intersections.
+ std::vector<TMark> borders;
+ borders.push_back(granuleMark);
+
+ std::vector<TPortionInfo> tmp;
+ tmp.reserve(portions.size());
+ for (auto& portionInfo : portions) {
+ ui64 curPortion = portionInfo.Portion();
+
+ // Prevent merge of compacted portions with no intersections
+ if (filtered.contains(curPortion)) {
+ const auto& start = portionInfo.IndexKeyStart();
+ borders.emplace_back(TMark(start));
+ } else {
+ // nextToGood borders potentially split good compacted portions into 2 parts:
+ // the first one without intersections and the second with them
+ if (goodCompacted.contains(curPortion) || nextToGood.contains(curPortion)) {
+ const auto& start = portionInfo.IndexKeyStart();
+ borders.emplace_back(TMark(start));
+ }
+
+ tmp.emplace_back(std::move(portionInfo));
+ }
+ }
+ tmp.swap(portions);
+
+ if (borders.size() == 1) {
+ Y_VERIFY(borders[0] == granuleMark);
+ borders.clear();
+ }
+
+ marksGranules = TMarksGranules(std::move(borders));
+ return TConclusionStatus::Success();
+}
+
+} // namespace
+
std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColumnEngineChanges::CompactInOneGranule(ui64 granule,
const std::vector<TPortionInfo>& portions,
const THashMap<TBlobRange, TString>& blobs, TConstructionContext& context) const {
@@ -29,7 +141,7 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColum
}
TConclusion<std::vector<TString>> TInGranuleCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
- const ui64 pathId = SrcGranule->PathId;
+ const ui64 pathId = SrcGranule.PathId;
std::vector<TString> blobs;
auto& switchedPortions = SwitchedPortions;
Y_VERIFY(switchedPortions.size());
@@ -65,4 +177,18 @@ TConclusion<std::vector<TString>> TInGranuleCompactColumnEngineChanges::DoConstr
return blobs;
}
+void TInGranuleCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
+ TBase::DoWriteIndexComplete(self, context);
+ self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL);
+ self.IncCounter(NColumnShard::COUNTER_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
+ self.IncCounter(NColumnShard::COUNTER_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
+}
+
+void TInGranuleCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
+ TBase::DoStart(self);
+ auto& g = CompactionInfo->GetObject<NOlap::TGranuleMeta>();
+ self.CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount());
+ Y_VERIFY(InitInGranuleMerge(SrcGranule.Mark, SwitchedPortions, Limits, MergeBorders).Ok());
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h
index 5a1cba91c9f..aecc121a688 100644
--- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h
@@ -11,10 +11,25 @@ private:
std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> CompactInOneGranule(ui64 granule,
const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs,
TConstructionContext& context) const;
+ TMarksGranules MergeBorders;
+
+ virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
+
protected:
virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override;
+ virtual TPortionMeta::EProduced GetResultProducedClass() const override {
+ return TPortionMeta::COMPACTED;
+ }
+ virtual void DoStart(NColumnShard::TColumnShard& self) override;
public:
+ virtual bool IsSplit() const override {
+ return false;
+ }
using TBase::TBase;
+
+ virtual TString TypeString() const override {
+ return "INTERNAL_COMPACTION";
+ }
};
}
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index 83b6f562196..fe4c8b7ca79 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -1,4 +1,5 @@
#include "indexation.h"
+#include "mark_granules.h"
#include <ydb/core/tx/columnshard/blob_cache.h>
#include <ydb/core/protos/counters_columnshard.pb.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
index cf19d14241b..6db2a5a988d 100644
--- a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
@@ -1,10 +1,12 @@
#include "split_compaction.h"
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/engines/storage/granule.h>
namespace NKikimr::NOlap {
TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
- const ui64 pathId = SrcGranule->PathId;
- const TMark ts0 = SrcGranule->Mark;
+ const ui64 pathId = SrcGranule.PathId;
+ const TMark ts0 = SrcGranule.Mark;
std::vector<TPortionInfo>& portions = SwitchedPortions;
std::vector<std::pair<TMark, ui64>> tsIds;
@@ -388,4 +390,17 @@ ui64 TSplitCompactColumnEngineChanges::TryMovePortions(const TMark& ts0, std::ve
return numRows;
}
+void TSplitCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
+ TBase::DoWriteIndexComplete(self, context);
+ self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL);
+ self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
+ self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
+}
+
+void TSplitCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
+ TBase::DoStart(self);
+ auto& g = CompactionInfo->GetObject<TGranuleMeta>();
+ self.CSCounters.OnSplitCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount());
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.h b/ydb/core/tx/columnshard/engines/changes/split_compaction.h
index aed71cf5630..1473274e4c5 100644
--- a/ydb/core/tx/columnshard/engines/changes/split_compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.h
@@ -18,10 +18,22 @@ private:
std::vector<std::pair<TMark, ui64>>& tsIds,
std::vector<std::pair<TPortionInfo, ui64>>& toMove, TConstructionContext& context);
+ virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
protected:
virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override;
+ virtual TPortionMeta::EProduced GetResultProducedClass() const override {
+ return TPortionMeta::SPLIT_COMPACTED;
+ }
+ virtual void DoStart(NColumnShard::TColumnShard& self) override;
public:
+ virtual bool IsSplit() const override {
+ return true;
+ }
using TBase::TBase;
+
+ virtual TString TypeString() const override {
+ return "SPLIT_COMPACTION";
+ }
};
}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index f258f9a834e..d526da6f5b7 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -10,6 +10,12 @@ protected:
virtual void DoCompile(TFinalizationContext& context) override;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override;
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
+ virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& /*self*/, TWriteIndexCompleteContext& /*context*/) override {
+
+ }
+ virtual void DoStart(NColumnShard::TColumnShard& /*self*/) override {
+
+ }
std::vector<TPortionInfo> MakeAppendedPortions(const ui64 pathId,
const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 granule,
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 9175a753612..20ba4fb8852 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -16,123 +16,13 @@
namespace NKikimr::NOlap {
-namespace {
-
-TConclusionStatus InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& portions, const TCompactionLimits& limits,
- TMarksGranules& marksGranules) {
- ui32 insertedCount = 0;
-
- THashSet<ui64> filtered;
- THashSet<ui64> goodCompacted;
- THashSet<ui64> nextToGood;
- {
- TMap<NArrow::TReplaceKey, std::vector<const TPortionInfo*>> points;
-
- for (const auto& portionInfo : portions) {
- if (portionInfo.IsInserted()) {
- ++insertedCount;
- } else if (portionInfo.BlobsSizes().second >= limits.GoodBlobSize) {
- goodCompacted.insert(portionInfo.Portion());
- }
-
- const NArrow::TReplaceKey& start = portionInfo.IndexKeyStart();
- const NArrow::TReplaceKey& end = portionInfo.IndexKeyEnd();
-
- points[start].push_back(&portionInfo);
- points[end].push_back(nullptr);
- }
-
- ui32 countInBucket = 0;
- ui64 bucketStartPortion = 0;
- bool isGood = false;
- int sum = 0;
- for (const auto& [key, vec] : points) {
- for (const auto* portionInfo : vec) {
- if (portionInfo) {
- ++sum;
- ui64 currentPortion = portionInfo->Portion();
- if (!bucketStartPortion) {
- bucketStartPortion = currentPortion;
- }
- ++countInBucket;
-
- ui64 prevIsGood = isGood;
- isGood = goodCompacted.contains(currentPortion);
- if (prevIsGood && !isGood) {
- nextToGood.insert(currentPortion);
- }
- } else {
- --sum;
- }
- }
-
- if (!sum) { // count(start) == count(end), start new range
- Y_VERIFY(bucketStartPortion);
-
- if (countInBucket == 1) {
- // We do not want to merge big compacted portions with inserted ones if there's no intersections.
- if (isGood) {
- filtered.insert(bucketStartPortion);
- }
- }
- countInBucket = 0;
- bucketStartPortion = 0;
- }
- }
- }
-
- Y_VERIFY(insertedCount);
-
- // Nothing to filter. Leave portions as is, no borders needed.
- if (filtered.empty() && goodCompacted.empty()) {
- return TConclusionStatus::Success();
- }
-
- // It's a map for SliceIntoGranules(). We use fake granule ids here to slice batch with borders.
- // We could merge inserted portions alltogether and slice result with filtered borders to prevent intersections.
- std::vector<TMark> borders;
- borders.push_back(granuleMark);
-
- std::vector<TPortionInfo> tmp;
- tmp.reserve(portions.size());
- for (auto& portionInfo : portions) {
- ui64 curPortion = portionInfo.Portion();
-
- // Prevent merge of compacted portions with no intersections
- if (filtered.contains(curPortion)) {
- const auto& start = portionInfo.IndexKeyStart();
- borders.emplace_back(TMark(start));
- } else {
- // nextToGood borders potentially split good compacted portions into 2 parts:
- // the first one without intersections and the second with them
- if (goodCompacted.contains(curPortion) || nextToGood.contains(curPortion)) {
- const auto& start = portionInfo.IndexKeyStart();
- borders.emplace_back(TMark(start));
- }
-
- tmp.emplace_back(std::move(portionInfo));
- }
- }
- tmp.swap(portions);
-
- if (borders.size() == 1) {
- Y_VERIFY(borders[0] == granuleMark);
- borders.clear();
- }
-
- marksGranules = TMarksGranules(std::move(borders));
- return TConclusionStatus::Success();
-}
-
-} // namespace
-
std::shared_ptr<NKikimr::NOlap::TCompactColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildCompactionChanges(std::unique_ptr<TCompactionInfo>&& info,
- const TCompactionLimits& limits, const TSnapshot& initSnapshot) {
+ const TCompactionLimits& limits, const TSnapshot& initSnapshot, const TCompactionSrcGranule& srcGranule) {
std::shared_ptr<TCompactColumnEngineChanges> result;
if (info->InGranule()) {
- result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, std::move(info));
+ result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, std::move(info), srcGranule);
} else {
- result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, std::move(info));
+ result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, std::move(info), srcGranule);
}
result->InitSnapshot = initSnapshot;
return result;
@@ -417,41 +307,20 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(co
std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std::unique_ptr<TCompactionInfo>&& info,
const TCompactionLimits& limits) {
- auto changes = TChangesConstructor::BuildCompactionChanges(std::move(info), limits, LastSnapshot);
-
- const auto& granuleInfo = changes->CompactionInfo->GetObject<TGranuleMeta>();
-
- changes->SwitchedPortions.reserve(granuleInfo.GetPortions().size());
- // Collect active portions for the granule.
- for (const auto& [_, portionInfo] : granuleInfo.GetPortions()) {
- if (portionInfo.IsActive()) {
- changes->SwitchedPortions.push_back(portionInfo);
- Y_VERIFY(portionInfo.Granule() == granuleInfo.GetGranuleId());
- }
- }
-
- const ui64 pathId = granuleInfo.Record.PathId;
+ const ui64 pathId = info->GetPlanCompaction().GetPathId();
Y_VERIFY(PathGranules.contains(pathId));
- // Locate mark for the granule.
- for (const auto& [mark, pathGranule] : PathGranules[pathId]) {
- if (pathGranule == granuleInfo.GetGranuleId()) {
- changes->SrcGranule = TCompactColumnEngineChanges::TSrcGranule(pathId, granuleInfo.GetGranuleId(), mark);
- break;
- }
- }
- Y_VERIFY(changes->SrcGranule);
- if (changes->CompactionInfo->InGranule()) {
- auto mergeInitResult = InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, limits, changes->MergeBorders);
- if (!mergeInitResult) {
- // Return granule to Compaction list. This is equal to single compaction worker behavior.
- changes->CompactionInfo->CompactionCanceled("cannot init in granule merge: " + mergeInitResult.GetErrorMessage());
- return {};
+ auto& g = info->GetObject<TGranuleMeta>();
+ for (const auto& [mark, pathGranule] : PathGranules[pathId]) {
+ if (pathGranule == g.GetGranuleId()) {
+ TCompactionSrcGranule srcGranule = TCompactionSrcGranule(pathId, g.GetGranuleId(), mark);
+ auto changes = TChangesConstructor::BuildCompactionChanges(std::move(info), limits, LastSnapshot, srcGranule);
+ NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes);
+ return changes;
}
}
- NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes);
- Y_VERIFY(!changes->SwitchedPortions.empty());
- return changes;
+ Y_VERIFY(false);
+ return nullptr;
}
std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot,
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 54192815f65..0f9ab568bdf 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -6,6 +6,9 @@
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
#include "storage/granule.h"
#include "storage/storage.h"
+#include "changes/indexation.h"
+#include "changes/compaction.h"
+#include "changes/ttl.h"
namespace NKikimr::NArrow {
struct TSortDescription;
@@ -39,7 +42,7 @@ public:
std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot);
static std::shared_ptr<TCompactColumnEngineChanges> BuildCompactionChanges(std::unique_ptr<TCompactionInfo>&& info,
- const TCompactionLimits& limits, const TSnapshot& initSnapshot);
+ const TCompactionLimits& limits, const TSnapshot& initSnapshot, const TCompactionSrcGranule& srcGranules);
static std::shared_ptr<TCleanupColumnEngineChanges> BuildCleanupChanges(const TSnapshot& initSnapshot);
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 914ef584cb0..39098c91a41 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -291,9 +291,9 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
const TExpected& expected) {
auto compactionInfo = engine.Compact(TestLimits(), {});
UNIT_ASSERT(!!compactionInfo);
- UNIT_ASSERT(!compactionInfo->InGranule());
std::shared_ptr<TCompactColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TestLimits());
+ UNIT_ASSERT(changes->IsSplit());
UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions);
changes->SetBlobs(std::move(blobs));
changes->StartEmergency();
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.cpp b/ydb/core/tx/columnshard/hooks/testing/controller.cpp
index 9868ac01378..2557103c73e 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.cpp
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.cpp
@@ -3,6 +3,8 @@
#include <ydb/core/tx/columnshard/engines/reader/order_control/default.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
+#include <ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h>
+#include <ydb/core/tx/columnshard/engines/changes/split_compaction.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
namespace NKikimr::NYDBTest::NColumnShard {
@@ -30,12 +32,11 @@ bool TController::DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordB
}
bool TController::DoOnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& changes) {
- if (auto compaction = dynamic_pointer_cast<NOlap::TCompactColumnEngineChanges>(changes)) {
- if (compaction->CompactionInfo->InGranule()) {
- InternalCompactions.Inc();
- } else {
- SplitCompactions.Inc();
- }
+ if (dynamic_pointer_cast<NOlap::TInGranuleCompactColumnEngineChanges>(changes)) {
+ InternalCompactions.Inc();
+ }
+ if (dynamic_pointer_cast<NOlap::TSplitCompactColumnEngineChanges>(changes)) {
+ SplitCompactions.Inc();
}
return true;
}
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index bc2912c0bfe..fb7f456f368 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -5,6 +5,9 @@
#include <arrow/api.h>
#include <arrow/ipc/reader.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
+#include <ydb/core/tx/columnshard/engines/changes/with_appended.h>
+#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
+#include <ydb/core/tx/columnshard/engines/changes/cleanup.h>
namespace NKikimr {