aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-07 13:05:02 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-07 13:05:02 +0300
commitaa3ba3163b1c51da3a1c01a4205f3c1b1f387928 (patch)
tree88d895b00645bdc9cfce59acb4e0d7b4237a570c
parentef854dbcf2e5f1dd1d75c96b2ea4fba819f7ae3f (diff)
downloadydb-aa3ba3163b1c51da3a1c01a4205f3c1b1f387928.tar.gz
speedup compaction
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp19
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp108
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h76
-rw-r--r--ydb/core/tx/columnshard/defs.cpp8
-rw-r--r--ydb/core/tx/columnshard/defs.h6
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp67
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h12
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.cpp13
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h35
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp103
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h114
12 files changed, 357 insertions, 208 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index c7b59eb4bd5..5559af7dc0e 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -169,9 +169,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
// Eviction to S3. TTxExportFinish will delete src blob when dst blob get EEvictState::EXTERN state.
} else if (!protectedBlobs.contains(blobId)) {
// We could drop the blob immediately
- if (!blobsToDrop.contains(blobId)) {
+ if (blobsToDrop.emplace(blobId).second) {
LOG_S_TRACE("Delete evicted blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
- blobsToDrop.insert(blobId);
}
}
@@ -181,9 +180,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
for (const auto& portionInfo : changes->PortionsToDrop) {
for (const auto& rec : portionInfo.Records) {
const auto& blobId = rec.BlobRange.BlobId;
- if (!blobsToDrop.contains(blobId)) {
+ if (blobsToDrop.emplace(blobId).second) {
LOG_S_TRACE("Delete blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
- blobsToDrop.insert(blobId);
}
}
Self->IncCounter(COUNTER_RAW_BYTES_ERASED, portionInfo.RawBytesSum());
@@ -258,6 +256,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
}
Self->FinishWriteIndex(ctx, Ev, ok, blobsWritten, bytesWritten);
+ Self->EnqueueProgressTx(ctx);
return true;
}
@@ -291,19 +290,17 @@ void TColumnShard::FinishWriteIndex(const TActorContext& ctx, TEvPrivate::TEvWri
TablesManager.MutablePrimaryIndex().FreeLocks(changes);
if (changes->IsInsert()) {
- ActiveIndexing = false;
+ BackgroundController.FinishIndexing();
IncCounter(ok ? COUNTER_INDEXING_SUCCESS : COUNTER_INDEXING_FAIL);
IncCounter(COUNTER_INDEXING_BLOBS_WRITTEN, blobsWritten);
IncCounter(COUNTER_INDEXING_BYTES_WRITTEN, bytesWritten);
IncCounter(COUNTER_INDEXING_TIME, ev->Get()->Duration.MilliSeconds());
} else if (changes->IsCompaction()) {
- ActiveCompaction--;
-
Y_VERIFY(changes->CompactionInfo);
- bool inGranule = changes->CompactionInfo->InGranule();
+ BackgroundController.FinishCompaction(changes->CompactionInfo->GetPlanCompaction());
- if (inGranule) {
+ if (changes->CompactionInfo->InGranule()) {
IncCounter(ok ? COUNTER_COMPACTION_SUCCESS : COUNTER_COMPACTION_FAIL);
IncCounter(COUNTER_COMPACTION_BLOBS_WRITTEN, blobsWritten);
IncCounter(COUNTER_COMPACTION_BYTES_WRITTEN, bytesWritten);
@@ -314,11 +311,11 @@ void TColumnShard::FinishWriteIndex(const TActorContext& ctx, TEvPrivate::TEvWri
}
IncCounter(COUNTER_COMPACTION_TIME, ev->Get()->Duration.MilliSeconds());
} else if (changes->IsCleanup()) {
- ActiveCleanup = false;
+ BackgroundController.FinishCleanup();
IncCounter(ok ? COUNTER_CLEANUP_SUCCESS : COUNTER_CLEANUP_FAIL);
} else if (changes->IsTtl()) {
- ActiveTtl = false;
+ BackgroundController.FinishTtl();
IncCounter(ok ? COUNTER_TTL_SUCCESS : COUNTER_TTL_FAIL);
IncCounter(COUNTER_EVICTION_BLOBS_WRITTEN, blobsWritten);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 75c04412999..34fe9d09952 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -589,17 +589,22 @@ void TColumnShard::ScheduleNextGC(const TActorContext& ctx, bool cleanupOnly) {
UpdateBlobMangerCounters();
if (BlobManager->CanCollectGarbage(cleanupOnly)) {
+ BlobManager->GetCounters().StartCollection->Add(1);
Execute(CreateTxRunGc(), ctx);
+ } else {
+ BlobManager->GetCounters().SkipCollection->Add(1);
}
}
void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivity activity) {
if (periodic) {
if (LastPeriodicBackActivation > TInstant::Now() - ActivationPeriod) {
+ CSCounters.OnTooEarly();
return;
}
LastPeriodicBackActivation = TInstant::Now();
}
+ CSCounters.OnStartBackground();
SendPeriodicStats();
@@ -612,39 +617,14 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivit
// Schedule either indexing or compaction.
if (activity.HasIndexation() || activity.HasCompaction()) {
[&] {
- if (ActiveIndexing || ActiveCompaction > 0) {
- LOG_S_DEBUG("Indexing or compaction already in progress at tablet " << TabletID());
+ if (BackgroundController.IsIndexingActive()) {
return;
}
- // Preventing conflicts between indexing and compaction leads to election between them.
- // Indexing vs compaction probability depends on index and insert table overload status.
- // Prefer compaction: 25% by default; 50% if IndexOverloaded(); 6.25% if InsertTableOverloaded().
- const bool preferIndexing = RandomNumber<ui32>(1000) < 750;
-
- if (preferIndexing) {
- if (activity.HasIndexation()) {
- if (SetupIndexation()) {
- BackgroundActivation++;
- return;
- }
- }
- if (activity.HasCompaction()) {
- if (SetupCompaction()) {
- return;
- }
- }
- } else {
- if (activity.HasCompaction()) {
- if (SetupCompaction()) {
- BackgroundActivation++;
- return;
- }
- }
- if (activity.HasIndexation()) {
- if (SetupIndexation()) {
- return;
- }
- }
+ if (activity.HasCompaction()) {
+ SetupCompaction();
+ }
+ if (activity.HasIndexation()) {
+ SetupIndexation();
}
}();
}
@@ -671,16 +651,17 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivit
}
}
-bool TColumnShard::SetupIndexation() {
+void TColumnShard::SetupIndexation() {
+ CSCounters.OnSetupIndexation();
ui32 blobs = 0;
ui32 ignored = 0;
ui64 size = 0;
ui64 bytesToIndex = 0;
std::vector<const NOlap::TInsertedData*> dataToIndex;
dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT);
- THashMap<ui64, ui64> overloadedPathGranules;
for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) {
for (auto* pathInfo : it->second) {
+ const bool hasSplitCompaction = BackgroundController.HasSplitCompaction(pathInfo->GetPathId());
const bool granulesOverloaded = TablesManager.GetPrimaryIndex()->HasOverloadedGranules(pathInfo->GetPathId());
for (auto& data : pathInfo->GetCommitted()) {
ui32 dataSize = data.BlobSize();
@@ -695,6 +676,11 @@ bool TColumnShard::SetupIndexation() {
CSCounters.SkipIndexationInputDueToGranuleOverload(dataSize);
continue;
}
+ if (hasSplitCompaction) {
+ ++ignored;
+ CSCounters.SkipIndexationInputDueToSplitCompaction(dataSize);
+ continue;
+ }
++blobs;
bytesToIndex += dataSize;
dataToIndex.push_back(&data);
@@ -702,12 +688,6 @@ bool TColumnShard::SetupIndexation() {
}
}
- for (auto& [p, cnt] : overloadedPathGranules) {
- ui64 pathId(p);
- ui64 count(cnt);
- LOG_S_INFO("Overloaded granules (" << count << ") for pathId " << pathId << " at tablet " << TabletID());
- }
-
if (bytesToIndex < (ui64)Limits.MinInsertBytes && blobs < TLimits::MIN_SMALL_BLOBS_TO_INSERT) {
LOG_S_DEBUG("Few data for indexation (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored "
<< ignored << ") at tablet " << TabletID());
@@ -715,9 +695,10 @@ bool TColumnShard::SetupIndexation() {
// Force small indexations sometimes to keep BatchCache smaller
if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) {
++SkippedIndexations;
- return false;
+ return;
}
}
+ CSCounters.IndexationInput(bytesToIndex);
SkippedIndexations = 0;
LOG_S_DEBUG("Prepare indexing " << bytesToIndex << " bytes in " << dataToIndex.size() << " batches of committed "
@@ -739,11 +720,11 @@ bool TColumnShard::SetupIndexation() {
auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(CompactionLimits.Get(), std::move(data));
if (!indexChanges) {
LOG_S_NOTICE("Cannot prepare indexing at tablet " << TabletID());
- return false;
+ return;
}
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
- ActiveIndexing = true;
+ BackgroundController.StartIndexing();
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterIndexing, std::move(cachedBlobs));
if (Tiers) {
@@ -751,17 +732,16 @@ bool TColumnShard::SetupIndexation() {
}
ActorContext().Send(IndexingActor, std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev)));
- return true;
}
-bool TColumnShard::SetupCompaction() {
- std::vector<std::unique_ptr<TEvPrivate::TEvCompaction>> events;
+void TColumnShard::SetupCompaction() {
+ CSCounters.OnSetupCompaction();
- while (ActiveCompaction < TSettings::MAX_ACTIVE_COMPACTIONS) {
+ while (BackgroundController.GetCompactionsCount() < TSettings::MAX_ACTIVE_COMPACTIONS) {
auto limits = CompactionLimits.Get();
auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(limits);
if (!compactionInfo) {
- if (events.empty()) {
+ if (!BackgroundController.GetCompactionsCount()) {
LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID());
}
break;
@@ -769,38 +749,41 @@ bool TColumnShard::SetupCompaction() {
LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID());
+ auto& g = compactionInfo->GetObject<NOlap::TGranuleMeta>();
+ if (compactionInfo->InGranule()) {
+ CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount());
+ } else {
+ CSCounters.OnSplitCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount());
+ }
+
ui64 outdatedStep = GetOutdatedStep();
+ const NOlap::TPlanCompactionInfo planInfo = compactionInfo->GetPlanCompaction();
auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), NOlap::TSnapshot(outdatedStep, 0), limits);
if (!indexChanges) {
- if (events.empty()) {
+ if (!BackgroundController.GetCompactionsCount()) {
LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
}
break;
}
+ BackgroundController.StartCompaction(planInfo);
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
- ActiveCompaction++;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterCompaction);
if (Tiers) {
ev->SetTiering(Tiers->GetTiering());
}
- events.push_back(std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager));
- }
-
- LOG_S_DEBUG("Compaction events " << events.size() << " ActiveCompaction " << ActiveCompaction << " at tablet " << TabletID());
-
- for (auto& ev : events) {
- ActorContext().Send(CompactionActor, std::move(ev));
+ ActorContext().Send(CompactionActor, std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager));
}
- return events.size() != 0;
+ LOG_S_DEBUG("ActiveCompactions: " << BackgroundController.GetCompactionsCount() << " at tablet " << TabletID());
}
std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls,
bool force) {
- if (ActiveTtl) {
+ CSCounters.OnSetupTtl();
+ if (BackgroundController.IsTtlActive()) {
LOG_S_DEBUG("TTL already in progress at tablet " << TabletID());
return {};
}
@@ -814,7 +797,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
if (Tiers) {
eviction = Tiers->GetTiering(); // TODO: pathIds
}
- TablesManager.AddTtls(eviction, TInstant::Now(), force);
+ TablesManager.AddTtls(eviction, AppData()->TimeProvider->Now(), force);
}
if (eviction.empty()) {
@@ -842,14 +825,15 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
bool needWrites = !indexChanges->PortionsToEvict.empty();
- ActiveTtl = true;
+ BackgroundController.StartTtl();
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
ev->SetTiering(eviction);
return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites);
}
std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
- if (ActiveCleanup) {
+ CSCounters.OnSetupCleanup();
+ if (BackgroundController.IsCleanupActive()) {
LOG_S_DEBUG("Cleanup already in progress at tablet " << TabletID());
return {};
}
@@ -899,7 +883,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false);
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write
- ActiveCleanup = true;
+ BackgroundController.StartCleanup();
return ev;
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 226c99d0e4c..f2302d96466 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -331,6 +331,72 @@ private:
}
};
+ class TBackgroundController {
+ private:
+ using TCurrentCompaction = THashMap<ui64, NOlap::TPlanCompactionInfo>;
+ bool ActiveIndexing = false;
+ TCurrentCompaction ActiveCompactionInfo;
+ bool ActiveCleanup = false;
+ bool ActiveTtl = false;
+ public:
+ void StartCompaction(const NOlap::TPlanCompactionInfo& info) {
+ Y_VERIFY(ActiveCompactionInfo.emplace(info.GetPathId(), info).second);
+ }
+ void FinishCompaction(const NOlap::TPlanCompactionInfo& info) {
+ Y_VERIFY(ActiveCompactionInfo.erase(info.GetPathId()));
+ }
+ const TCurrentCompaction& GetActiveCompaction() const {
+ return ActiveCompactionInfo;
+ }
+ ui32 GetCompactionsCount() const {
+ return ActiveCompactionInfo.size();
+ }
+
+ void StartIndexing() {
+ Y_VERIFY(!ActiveIndexing);
+ ActiveIndexing = true;
+ }
+ void FinishIndexing() {
+ Y_VERIFY(ActiveIndexing);
+ ActiveIndexing = false;
+ }
+ bool IsIndexingActive() const {
+ return ActiveIndexing;
+ }
+
+ void StartCleanup() {
+ Y_VERIFY(!ActiveCleanup);
+ ActiveCleanup = true;
+ }
+ void FinishCleanup() {
+ Y_VERIFY(ActiveCleanup);
+ ActiveCleanup = false;
+ }
+ bool IsCleanupActive() const {
+ return ActiveCleanup;
+ }
+
+ void StartTtl() {
+ Y_VERIFY(!ActiveTtl);
+ ActiveTtl = true;
+ }
+ void FinishTtl() {
+ Y_VERIFY(ActiveTtl);
+ ActiveTtl = false;
+ }
+ bool IsTtlActive() const {
+ return ActiveTtl;
+ }
+
+ bool HasSplitCompaction(const ui64 pathId) const {
+ auto it = ActiveCompactionInfo.find(pathId);
+ if (it == ActiveCompactionInfo.end()) {
+ return false;
+ }
+ return !it->second.IsInternal();
+ }
+ };
+
using TSchemaPreset = TSchemaPreset;
using TTableInfo = TTableInfo;
@@ -353,7 +419,6 @@ private:
ui64 WritesInFly = 0;
ui64 OwnerPathId = 0;
ui64 StatsReportRound = 0;
- ui64 BackgroundActivation = 0;
ui32 SkippedIndexations = TSettings::MAX_INDEXATIONS_TO_SKIP; // Force indexation on tablet init
TString OwnerPath;
@@ -401,11 +466,8 @@ private:
THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId;
TMultiMap<TRowVersion, TEvColumnShard::TEvRead::TPtr> WaitingReads;
TMultiMap<TRowVersion, TEvColumnShard::TEvScan::TPtr> WaitingScans;
- bool ActiveIndexing = false;
- ui32 ActiveCompaction = 0;
- bool ActiveCleanup = false;
- bool ActiveTtl = false;
ui32 ActiveEvictions = 0;
+ TBackgroundController BackgroundController;
std::unique_ptr<TBlobManager> BlobManager;
TInFlightReadsTracker InFlightReadsTracker;
TSettings Settings;
@@ -465,8 +527,8 @@ private:
void ScheduleNextGC(const TActorContext& ctx, bool cleanupOnly = false);
- bool SetupIndexation();
- bool SetupCompaction();
+ void SetupIndexation();
+ void SetupCompaction();
std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {},
bool force = false);
std::unique_ptr<TEvPrivate::TEvWriteIndex> SetupCleanup();
diff --git a/ydb/core/tx/columnshard/defs.cpp b/ydb/core/tx/columnshard/defs.cpp
index 1f061db77c4..5728645fab8 100644
--- a/ydb/core/tx/columnshard/defs.cpp
+++ b/ydb/core/tx/columnshard/defs.cpp
@@ -20,4 +20,12 @@ void TLimits::SetMaxBlobSize(const ui64 value) {
MaxBlobSize = value;
}
+TLimits::TLimits()
+ : MinInsertBytes(MIN_BYTES_TO_INSERT, 1, 2 * MAX_BYTES_TO_INSERT)
+ , MaxInsertBytes(15 * MAX_BYTES_TO_INSERT, 0, 30 * MAX_BYTES_TO_INSERT)
+ , InsertTableSize(MIN_SMALL_BLOBS_TO_INSERT, 0, 1000)
+{
+
+}
+
}
diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h
index 7717ea64a70..49f59feae87 100644
--- a/ydb/core/tx/columnshard/defs.h
+++ b/ydb/core/tx/columnshard/defs.h
@@ -47,11 +47,7 @@ struct TLimits {
TControlWrapper MaxInsertBytes;
TControlWrapper InsertTableSize;
- TLimits()
- : MinInsertBytes(MIN_BYTES_TO_INSERT, 1, 2 * MAX_BYTES_TO_INSERT)
- , MaxInsertBytes(MAX_BYTES_TO_INSERT, 0, 2 * MAX_BYTES_TO_INSERT)
- , InsertTableSize(MIN_SMALL_BLOBS_TO_INSERT, 0, 1000)
- {}
+ TLimits();
void RegisterControls(TControlBoard& icb) {
icb.RegisterSharedControl(MinInsertBytes, "ColumnShardControls.MinBytesToIndex");
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 299d75dea0a..fbc893ae5a1 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -567,7 +567,7 @@ public:
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return GetIndexInfo().GetReplaceKey(); }
virtual const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return GetIndexInfo().GetSortingKey(); }
virtual const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return GetIndexInfo().GetIndexKey(); }
- virtual const THashSet<ui64>* GetOverloadedGranules(ui64 /*pathId*/) const { return nullptr; }
+ virtual const THashSet<ui64>* GetOverloadedGranules(const ui64 pathId) const = 0;
bool HasOverloadedGranules(const ui64 pathId) const {
return GetOverloadedGranules(pathId) != nullptr;
}
@@ -588,7 +588,7 @@ public:
ui32 maxRecords) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema,
ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0;
- virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0;
+ virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) = 0;
virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
//virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 076100fe535..f838be03282 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -312,11 +312,12 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
if (spg->Empty()) {
EmptyGranules.insert(granule);
emptyGranulePaths.insert(spg->PathId());
- } else {
- CleanupGranules.insert(granule);
}
for (const auto& [_, portionInfo] : spg->GetPortions()) {
UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD);
+ if (portionInfo.CheckForCleanup()) {
+ CleanupPortions.emplace(portionInfo.GetAddress());
+ }
}
}
@@ -472,7 +473,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
const TCompactionLimits& limits,
THashSet<ui64>& pathsToDrop,
ui32 maxRecords) {
- auto changes = TChanges::BuildClenupChanges(DefaultMark(), snapshot, limits);
+ auto changes = TChanges::BuildCleanupChanges(DefaultMark(), snapshot, limits);
ui32 affectedRecords = 0;
// Add all portions from dropped paths
@@ -514,39 +515,31 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
// Add stale portions of alive paths
THashSet<ui64> cleanGranules;
- for (ui64 granule : CleanupGranules) {
- auto spg = Granules.find(granule)->second;
- Y_VERIFY(spg);
-
- bool isClean = true;
- for (auto& [portion, info] : spg->GetPortions()) {
- if (info.IsActive() || dropPortions.contains(portion)) {
+ std::shared_ptr<TGranuleMeta> granuleMeta;
+ for (auto it = CleanupPortions.begin(); it != CleanupPortions.end();) {
+ if (!granuleMeta || granuleMeta->GetGranuleId() != it->GetGranuleId()) {
+ auto itGranule = Granules.find(it->GetGranuleId());
+ if (itGranule == Granules.end()) {
+ it = CleanupPortions.erase(it);
continue;
}
-
- isClean = false;
- if (info.GetXSnapshot() < snapshot) {
- affectedRecords += info.NumRecords();
- changes->PortionsToDrop.push_back(info);
- }
-
+ granuleMeta = itGranule->second;
+ }
+ Y_VERIFY(granuleMeta);
+ auto* portionInfo = granuleMeta->GetPortionPointer(it->GetPortionId());
+ if (!portionInfo) {
+ it = CleanupPortions.erase(it);
+ } else if (portionInfo->CheckForCleanup(snapshot)) {
+ affectedRecords += portionInfo->NumRecords();
+ changes->PortionsToDrop.push_back(*portionInfo);
+ it = CleanupPortions.erase(it);
if (affectedRecords > maxRecords) {
+ changes->NeedRepeat = true;
break;
}
+ } else if (portionInfo->CheckForCleanup()) {
+ ++it;
}
-
- if (isClean) {
- cleanGranules.insert(granule);
- }
-
- if (affectedRecords > maxRecords) {
- changes->NeedRepeat = true;
- break;
- }
- }
-
- for (ui64 granule : cleanGranules) {
- CleanupGranules.erase(granule);
}
return changes;
@@ -593,12 +586,13 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
if (auto max = info.MaxValue(ttlColumnId)) {
bool keep = NArrow::ScalarLess(expireTimestamp, max);
-
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", allowDrop);
if (keep && tryEvictPortion) {
TString tierName;
for (auto& tierRef : ttl.GetOrderedTiers()) { // TODO: lower/upper_bound + move into TEviction
auto& tierInfo = tierRef.Get();
if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) {
+ SignalCounters.OnPortionNoTtlColumn(info.BlobsBytes());
continue; // Ignore tiers with bad ttl column
}
if (NArrow::ScalarLess(tierInfo.EvictScalar(schema), max)) {
@@ -612,12 +606,17 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
bool needExport = ttl.NeedExport(tierName);
changes->PortionsToEvict.emplace_back(
info, TPortionEvictionFeatures(tierName, pathId, needExport));
+ SignalCounters.OnPortionToEvict(info.BlobsBytes());
}
}
if (!keep && allowDrop) {
dropBlobs += info.NumRecords();
changes->PortionsToDrop.push_back(info);
+ SignalCounters.OnPortionToDrop(info.BlobsBytes());
}
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max");
+ SignalCounters.OnPortionNoBorder(info.BlobsBytes());
}
}
}
@@ -660,7 +659,7 @@ std::vector<std::vector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGran
}
bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
- const TSnapshot& snapshot) {
+ const TSnapshot& snapshot) noexcept {
auto changes = std::static_pointer_cast<TChanges>(indexChanges);
// Update tmp granules with real ids
@@ -713,13 +712,13 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
if (changes->IsCompaction()) {
Y_VERIFY(changes->CompactionInfo);
for (auto& portionInfo : changes->SwitchedPortions) {
- CleanupGranules.insert(portionInfo.Granule());
+ CleanupPortions.insert(portionInfo.GetAddress());
}
}
return true;
}
-bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply) {
+bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply) noexcept {
const std::vector<TPortionInfo>* switchedPortions = nullptr;
if (changes.IsCompaction()) {
Y_VERIFY(changes.CompactionInfo);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 4a225e60dfe..5e8e1b53a9e 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -102,7 +102,7 @@ public:
return changes;
}
- static std::shared_ptr<TChanges> BuildClenupChanges(const TMark& defaultMark, const TSnapshot& initSnapshot, const TCompactionLimits& limits) {
+ static std::shared_ptr<TChanges> BuildCleanupChanges(const TMark& defaultMark, const TSnapshot& initSnapshot, const TCompactionLimits& limits) {
auto changes = std::make_shared<TChanges>(TColumnEngineChanges::CLEANUP, defaultMark, limits, TPrivateTag());
changes->InitSnapshot = initSnapshot;
return changes;
@@ -197,7 +197,7 @@ public:
return VersionedIndex;
}
- const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override {
+ const THashSet<ui64>* GetOverloadedGranules(const ui64 pathId) const override {
return GranulesStorage->GetOverloaded(pathId);
}
@@ -239,7 +239,7 @@ public:
ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override;
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
- const TSnapshot& snapshot) override;
+ const TSnapshot& snapshot) noexcept override;
void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) override;
@@ -286,7 +286,7 @@ private:
/// Set of empty granules.
/// Just for providing count of empty granules.
THashSet<ui64> EmptyGranules;
- THashSet<ui64> CleanupGranules;
+ TSet<TPortionAddress> CleanupPortions;
TColumnEngineStats Counters;
ui64 LastPortion;
ui64 LastGranule;
@@ -314,7 +314,7 @@ private:
PathGranules.clear();
PathStats.clear();
EmptyGranules.clear();
- CleanupGranules.clear();
+ CleanupPortions.clear();
Counters.Clear();
LastPortion = 0;
@@ -325,7 +325,7 @@ private:
bool LoadGranules(IDbWrapper& db);
bool LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs);
bool LoadCounters(IDbWrapper& db);
- bool ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply);
+ bool ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply) noexcept;
void EraseGranule(ui64 pathId, ui64 granule, const TMark& mark);
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index ef6a6649ff2..e4e693f1205 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -92,11 +92,11 @@ public:
, Batch(batch)
, Schema(schema)
{
- if (granuleMeta && granuleMeta->GetSummary().GetOther().GetRecordsCount()) {
- SortedColumnIds = granuleMeta->GetSummary().GetColumnIdsSortedBySizeDescending();
+ if (granuleMeta && granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount()) {
+ SortedColumnIds = granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending();
const auto biggestColumn = SortedColumnIds.front();
Y_VERIFY(biggestColumn.GetPackedBlobsSize());
- const double expectedPackedRecordSize = 1.0 * biggestColumn.GetPackedBlobsSize() / granuleMeta->GetSummary().GetOther().GetRecordsCount();
+ const double expectedPackedRecordSize = 1.0 * biggestColumn.GetPackedBlobsSize() / granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount();
BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize;
for (ui32 i = 1; i < SortedColumnIds.size(); ++i) {
Y_VERIFY(SortedColumnIds[i - 1].GetPackedBlobsSize() >= SortedColumnIds[i].GetPackedBlobsSize());
@@ -360,17 +360,10 @@ TConclusion<std::vector<TString>> TIndexationLogic::DoApply(std::shared_ptr<TCol
changes->AddPathIfNotExists(pathId);
// We could merge data here cause tablet limits indexing data portions
-#if 0
- auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortDescription()); // insert: no replace
- Y_VERIFY(merged);
- Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey()));
-#else
auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription());
Y_VERIFY(merged);
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, resultSchema->GetIndexInfo().GetReplaceKey()));
-#endif
-
auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], resultSchema->GetIndexInfo());
for (auto& [granule, batch] : granuleBatches) {
auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta());
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index bd96e230781..e943c29f710 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -221,6 +221,23 @@ struct TPortionMeta {
}
};
+class TPortionAddress {
+private:
+ YDB_READONLY(ui64, GranuleId, 0);
+ YDB_READONLY(ui64, PortionId, 0);
+public:
+ TPortionAddress(const ui64 granuleId, const ui64 portionId)
+ : GranuleId(granuleId)
+ , PortionId(portionId)
+ {
+
+ }
+
+ bool operator<(const TPortionAddress& item) const {
+ return std::tie(GranuleId, PortionId) < std::tie(item.GranuleId, item.PortionId);
+ }
+};
+
struct TPortionInfo {
static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024;
@@ -237,6 +254,18 @@ struct TPortionInfo {
bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); }
size_t NumRecords() const { return Records.size(); }
+ bool CheckForCleanup(const TSnapshot& snapshot) const {
+ if (!CheckForCleanup()) {
+ return false;
+ }
+
+ return GetXSnapshot() < snapshot;
+ }
+
+ bool CheckForCleanup() const {
+ return !IsActive();
+ }
+
bool AllowEarlyFilter() const {
return Meta.Produced == TPortionMeta::COMPACTED
|| Meta.Produced == TPortionMeta::SPLIT_COMPACTED;
@@ -261,6 +290,12 @@ struct TPortionInfo {
return rec.Granule;
}
+ TPortionAddress GetAddress() const {
+ Y_VERIFY(!Empty());
+ auto& rec = Records[0];
+ return TPortionAddress(rec.Granule, rec.Portion);
+ }
+
void SetGranule(ui64 granule) {
for (auto& rec : Records) {
rec.Granule = granule;
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index 43f973f9761..f41202c8cfc 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -4,22 +4,30 @@
namespace NKikimr::NOlap {
bool TGranuleMeta::NeedSplit(const TCompactionLimits& limits, bool& inserted) const {
- inserted = GetSummary().GetInserted().GetPortionsCount();
- bool differentBorders = GetSummary().GetDifferentBorders();
- if (GetSummary().GetActivePortionsCount() < 2) {
+ inserted = GetAdditiveSummary().GetInserted().GetPortionsCount();
+ bool differentBorders = GetHardSummary().GetDifferentBorders();
+ if (GetAdditiveSummary().GetActivePortionsCount() < 2) {
inserted = false;
return false;
}
- return differentBorders && (GetSummary().GetMaxColumnsSize() >= limits.GranuleBlobSplitSize || GetSummary().GetGranuleSize() >= limits.GranuleOverloadSize);
+ return differentBorders && (GetAdditiveSummary().GetMaxColumnsSize() >= limits.GranuleBlobSplitSize ||
+ GetAdditiveSummary().GetGranuleSize() >= limits.GranuleOverloadSize);
}
ui64 TGranuleMeta::Size() const {
- return GetSummary().GetGranuleSize();
+ return GetAdditiveSummary().GetGranuleSize();
}
void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
- Portions[info.Portion()] = info;
- OnAfterChangePortion(info.Portion());
+ auto it = Portions.find(info.Portion());
+ if (it == Portions.end()) {
+ OnBeforeChangePortion(nullptr, &info);
+ Portions.emplace(info.Portion(), info);
+ } else {
+ OnBeforeChangePortion(&it->second, &info);
+ it->second = info;
+ }
+ OnAfterChangePortion();
}
bool TGranuleMeta::ErasePortion(const ui64 portion) {
@@ -28,21 +36,37 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) {
return false;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second)("pathId", Record.PathId);
+ OnBeforeChangePortion(&it->second, nullptr);
Portions.erase(it);
- OnAfterChangePortion(portion);
+ OnAfterChangePortion();
return true;
}
void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) {
- Portions[rec.Portion].AddRecord(indexInfo, rec);
- OnAfterChangePortion(rec.Portion);
+ auto& portion = Portions[rec.Portion];
+ auto portionNew = portion;
+ portionNew.AddRecord(indexInfo, rec);
+ OnBeforeChangePortion(&portion, &portionNew);
+ portion = std::move(portionNew);
+ OnAfterChangePortion();
}
-void TGranuleMeta::OnAfterChangePortion(const ui64 /*portion*/) {
- ResetCaches();
+void TGranuleMeta::OnAfterChangePortion() {
Owner->UpdateGranuleInfo(*this);
}
+void TGranuleMeta::OnBeforeChangePortion(const TPortionInfo* portionBefore, const TPortionInfo* portionAfter) {
+ HardSummaryCache = {};
+ if (!!AdditiveSummaryCache) {
+ if (portionBefore && portionBefore->IsActive()) {
+ AdditiveSummaryCache->RemovePortion(*portionBefore);
+ }
+ if (portionAfter && portionAfter->IsActive()) {
+ AdditiveSummaryCache->AddPortion(*portionAfter);
+ }
+ }
+}
+
void TGranuleMeta::OnCompactionFinished() {
AllowInsertionFlag = false;
Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction));
@@ -77,38 +101,27 @@ void TGranuleMeta::OnCompactionStarted(const bool inGranule) {
}
}
-void TGranuleMeta::RebuildMetrics() const {
- TGranuleSummary result;
+void TGranuleMeta::RebuildHardMetrics() const {
+ TGranuleHardSummary result;
std::map<ui32, TColumnSummary> packedSizeByColumns;
bool differentBorders = false;
THashSet<NArrow::TReplaceKey> borders;
for (auto&& i : Portions) {
- if (i.second.IsActive()) {
- if (!differentBorders) {
- borders.insert(i.second.IndexKeyStart());
- borders.insert(i.second.IndexKeyEnd());
- differentBorders = (borders.size() > 1);
- }
- auto sizes = i.second.BlobsSizes();
- for (auto&& c : i.second.Records) {
- auto it = packedSizeByColumns.find(c.ColumnId);
- if (it == packedSizeByColumns.end()) {
- it = packedSizeByColumns.emplace(c.ColumnId, TColumnSummary(c.ColumnId)).first;
- }
- it->second.AddData(i.second.IsInserted(), c.BlobRange.Size, i.second.NumRows());
- }
- if (i.second.IsInserted()) {
- result.Inserted.PortionsSize += sizes.first;
- result.Inserted.MaxColumnsSize += sizes.second;
- result.Inserted.RecordsCount += i.second.NumRows();
- ++result.Inserted.PortionsCount;
- } else {
- result.Other.PortionsSize += sizes.first;
- result.Other.MaxColumnsSize += sizes.second;
- result.Other.RecordsCount += i.second.NumRows();
- ++result.Other.PortionsCount;
+ if (!i.second.IsActive()) {
+ continue;
+ }
+ if (!differentBorders) {
+ borders.insert(i.second.IndexKeyStart());
+ borders.insert(i.second.IndexKeyEnd());
+ differentBorders = (borders.size() > 1);
+ }
+ for (auto&& c : i.second.Records) {
+ auto it = packedSizeByColumns.find(c.ColumnId);
+ if (it == packedSizeByColumns.end()) {
+ it = packedSizeByColumns.emplace(c.ColumnId, TColumnSummary(c.ColumnId)).first;
}
+ it->second.AddData(i.second.IsInserted(), c.BlobRange.Size, i.second.NumRows());
}
}
{
@@ -124,7 +137,19 @@ void TGranuleMeta::RebuildMetrics() const {
std::swap(result.ColumnIdsSortedBySizeDescending, transpSorted);
}
result.DifferentBorders = differentBorders;
- SummaryCache = result;
+ HardSummaryCache = result;
+}
+
+void TGranuleMeta::RebuildAdditiveMetrics() const {
+ TGranuleAdditiveSummary result;
+
+ for (auto&& i : Portions) {
+ if (!i.second.IsActive()) {
+ continue;
+ }
+ result.AddPortion(i.second);
+ }
+ AdditiveSummaryCache = result;
}
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 1446af52ffb..054f73de995 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -32,25 +32,45 @@ public:
class TDataClassSummary {
private:
- ui64 PortionsSize = 0;
- ui64 MaxColumnsSize = 0;
- ui64 PortionsCount = 0;
- ui64 RecordsCount = 0;
+ i64 PortionsSize = 0;
+ i64 MaxColumnsSize = 0;
+ i64 PortionsCount = 0;
+ i64 RecordsCount = 0;
friend class TGranuleMeta;
public:
- ui64 GetPortionsSize() const {
+ i64 GetPortionsSize() const {
return PortionsSize;
}
- ui64 GetRecordsCount() const {
+ i64 GetRecordsCount() const {
return RecordsCount;
}
- ui64 GetMaxColumnsSize() const {
+ i64 GetMaxColumnsSize() const {
return MaxColumnsSize;
}
- ui64 GetPortionsCount() const {
+ i64 GetPortionsCount() const {
return PortionsCount;
}
+ void AddPortion(const TPortionInfo& info) {
+ const auto sizes = info.BlobsSizes();
+ PortionsSize += sizes.first;
+ MaxColumnsSize += sizes.second;
+ RecordsCount += info.NumRows();
+ ++PortionsCount;
+ }
+
+ void RemovePortion(const TPortionInfo& info) {
+ const auto sizes = info.BlobsSizes();
+ PortionsSize -= sizes.first;
+ Y_VERIFY(PortionsSize >= 0);
+ MaxColumnsSize -= sizes.second;
+ Y_VERIFY(MaxColumnsSize >= 0);
+ RecordsCount -= info.NumRows();
+ Y_VERIFY(RecordsCount >= 0);
+ --PortionsCount;
+ Y_VERIFY(PortionsCount >= 0);
+ }
+
TDataClassSummary operator+(const TDataClassSummary& item) const {
TDataClassSummary result;
result.PortionsSize = PortionsSize + item.PortionsSize;
@@ -106,26 +126,32 @@ public:
}
};
-class TGranuleSummary {
+class TGranuleHardSummary {
private:
- TDataClassSummary Inserted;
- TDataClassSummary Other;
- friend class TGranuleMeta;
std::vector<TColumnSummary> ColumnIdsSortedBySizeDescending;
bool DifferentBorders = false;
+ friend class TGranuleMeta;
public:
+ bool GetDifferentBorders() const {
+ return DifferentBorders;
+ }
const std::vector<TColumnSummary>& GetColumnIdsSortedBySizeDescending() const {
return ColumnIdsSortedBySizeDescending;
}
+};
+
+class TGranuleAdditiveSummary {
+private:
+ TDataClassSummary Inserted;
+ TDataClassSummary Other;
+ friend class TGranuleMeta;
+public:
const TDataClassSummary& GetInserted() const {
return Inserted;
}
const TDataClassSummary& GetOther() const {
return Other;
}
- bool GetDifferentBorders() const {
- return DifferentBorders;
- }
ui64 GetGranuleSize() const {
return (Inserted + Other).GetPortionsSize();
}
@@ -135,23 +161,34 @@ public:
ui64 GetMaxColumnsSize() const {
return (Inserted + Other).GetMaxColumnsSize();
}
+ void AddPortion(const TPortionInfo& info) {
+ if (info.IsInserted()) {
+ Inserted.AddPortion(info);
+ } else {
+ Other.AddPortion(info);
+ }
+ }
+ void RemovePortion(const TPortionInfo& info) {
+ if (info.IsInserted()) {
+ Inserted.RemovePortion(info);
+ } else {
+ Other.RemovePortion(info);
+ }
+ }
};
class TCompactionPriority: public TCompactionPriorityInfo {
private:
using TBase = TCompactionPriorityInfo;
- TGranuleSummary GranuleSummary;
+ TGranuleAdditiveSummary GranuleSummary;
ui64 GetWeightCorrected() const {
- if (!GranuleSummary.GetDifferentBorders()) {
- return 0;
- }
if (GranuleSummary.GetActivePortionsCount() <= 1) {
return 0;
}
return GranuleSummary.GetGranuleSize() * GranuleSummary.GetActivePortionsCount() * GranuleSummary.GetActivePortionsCount();
}
public:
- TCompactionPriority(const TCompactionPriorityInfo& data, const TGranuleSummary& granuleSummary)
+ TCompactionPriority(const TCompactionPriorityInfo& data, const TGranuleAdditiveSummary& granuleSummary)
: TBase(data)
, GranuleSummary(granuleSummary)
{
@@ -167,13 +204,11 @@ public:
class TGranuleMeta: public ICompactionObjectCallback, TNonCopyable {
private:
THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo
- mutable std::optional<TGranuleSummary> SummaryCache;
+ mutable std::optional<TGranuleAdditiveSummary> AdditiveSummaryCache;
+ mutable std::optional<TGranuleHardSummary> HardSummaryCache;
bool NeedSplit(const TCompactionLimits& limits, bool& inserted) const;
- void RebuildMetrics() const;
-
- void ResetCaches() {
- SummaryCache = {};
- }
+ void RebuildHardMetrics() const;
+ void RebuildAdditiveMetrics() const;
enum class EActivity {
SplitCompaction,
@@ -185,16 +220,23 @@ private:
mutable bool AllowInsertionFlag = false;
std::shared_ptr<TGranulesStorage> Owner;
- void OnAfterChangePortion(const ui64 portion);
+ void OnBeforeChangePortion(const TPortionInfo* portionBefore, const TPortionInfo* portionAfter);
+ void OnAfterChangePortion();
public:
- const TGranuleSummary& GetSummary() const {
- if (!SummaryCache) {
- RebuildMetrics();
+ const TGranuleHardSummary& GetHardSummary() const {
+ if (!HardSummaryCache) {
+ RebuildHardMetrics();
+ }
+ return *HardSummaryCache;
+ }
+ const TGranuleAdditiveSummary& GetAdditiveSummary() const {
+ if (!AdditiveSummaryCache) {
+ RebuildAdditiveMetrics();
}
- return *SummaryCache;
+ return *AdditiveSummaryCache;
}
TCompactionPriority GetCompactionPriority() const {
- return TCompactionPriority(CompactionPriorityInfo, GetSummary());
+ return TCompactionPriority(CompactionPriorityInfo, GetAdditiveSummary());
}
bool NeedCompaction(const TCompactionLimits& limits) const {
@@ -252,6 +294,14 @@ public:
return it->second;
}
+ const TPortionInfo* GetPortionPointer(const ui64 portion) const {
+ auto it = Portions.find(portion);
+ if (it == Portions.end()) {
+ return nullptr;
+ }
+ return &it->second;
+ }
+
bool ErasePortion(const ui64 portion);
explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner)