author     ivanmorozov <ivanmorozov@yandex-team.com>  2023-10-05 19:04:21 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>  2023-10-05 19:50:21 +0300
commit     4b6ad89d8b3d7e73e4286ee2a5bc63fc22f4a578 (patch)
tree       15049f86ba068f9dc3a68ce2c1e1f28d16ee9db1
parent     d88f74d6fb31cb53b756b784efd505222c7f4b5f (diff)
download   ydb-4b6ad89d8b3d7e73e4286ee2a5bc63fc22f4a578.tar.gz
KIKIMR-19211: special wave-optimizer (level in future)
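
For orientation only: the sketch below is a simplified illustration (hypothetical names and signatures, not the actual YDB interfaces) of the change this commit makes in ydb/core/tx/columnshard/engines/storage/granule.cpp/.h — TGranuleMeta now constructs the new levels-based planner instead of the intervals-based one, and portion add/remove notifications are funnelled through an IOptimizerPlanner modification guard so that bulk portion loads can be applied under a single guard.

// Hypothetical, simplified stand-ins -- not the real YDB types or signatures.
#include <memory>
#include <vector>

struct TPortionInfo {};  // placeholder for the real portion metadata

// Placeholder for NStorageOptimizer::IOptimizerPlanner from the diff below.
class IOptimizerPlanner {
public:
    virtual ~IOptimizerPlanner() = default;

    // Guard through which portion changes are funnelled; in the real code it
    // lets callers apply several changes under one guard. Here it simply
    // forwards each call to the owning planner.
    class TModificationGuard {
    public:
        explicit TModificationGuard(IOptimizerPlanner& owner)
            : Owner(owner) {
        }
        void AddPortion(const std::shared_ptr<TPortionInfo>& portion) {
            Owner.DoAddPortion(portion);
        }
        void RemovePortion(const std::shared_ptr<TPortionInfo>& portion) {
            Owner.DoRemovePortion(portion);
        }
    private:
        IOptimizerPlanner& Owner;
    };

    TModificationGuard StartModificationGuard() {
        return TModificationGuard(*this);
    }

protected:
    virtual void DoAddPortion(const std::shared_ptr<TPortionInfo>& portion) = 0;
    virtual void DoRemovePortion(const std::shared_ptr<TPortionInfo>& portion) = 0;
};

// Stand-in for the levels planner this commit introduces
// (NStorageOptimizer::NLevels::TLevelsOptimizerPlanner).
class TLevelsOptimizerPlanner final: public IOptimizerPlanner {
protected:
    void DoAddPortion(const std::shared_ptr<TPortionInfo>& /*portion*/) override {}
    void DoRemovePortion(const std::shared_ptr<TPortionInfo>& /*portion*/) override {}
};

int main() {
    // TGranuleMeta previously built TIntervalsOptimizerPlanner in its constructor;
    // after this commit it builds the levels planner and feeds loaded portions
    // through a single guard, as OnAfterPortionsLoad does in granule.h below.
    std::shared_ptr<IOptimizerPlanner> planner = std::make_shared<TLevelsOptimizerPlanner>();
    std::vector<std::shared_ptr<TPortionInfo>> loadedPortions(3, std::make_shared<TPortionInfo>());

    auto guard = planner->StartModificationGuard();
    for (auto&& portion : loadedPortions) {
        guard.AddPortion(portion);
    }
    return 0;
}
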
-rw-r--r--  .mapping.json | 15
-rw-r--r--  ydb/core/formats/arrow/replace_key.h | 14
-rw-r--r--  ydb/core/tx/columnshard/blob_manager.cpp | 44
-rw-r--r--  ydb/core/tx/columnshard/blobs_action/counters/write.cpp | 6
-rw-r--r--  ydb/core/tx/columnshard/blobs_action/counters/write.h | 12
-rw-r--r--  ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/compaction.cpp | 6
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/compaction.h | 2
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/general_compaction.cpp | 149
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/general_compaction.h | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/indexation.cpp | 25
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/ttl.cpp | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/ttl.h | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/with_appended.cpp | 13
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/with_appended.h | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 30
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp | 15
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp | 24
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_filter_merger.h | 2
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/granule.cpp | 36
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/granule.h | 22
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt | 18
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt | 18
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt | 18
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt | 18
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt | 21
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt | 22
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt | 22
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt | 17
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt | 21
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp) | 0
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h | 113
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make | 14
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt | 23
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt | 24
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt | 24
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt | 17
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt | 23
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp | 43
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.h | 145
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp | 5
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h | 98
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp | 212
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h) | 52
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make | 16
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp | 342
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt | 22
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt | 23
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt | 23
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt | 17
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt | 22
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp | 5
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h | 93
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp | 5
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h | 517
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make | 15
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h | 59
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp | 2
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/optimizer/ya.make | 12
-rw-r--r--  ydb/core/tx/columnshard/engines/storage/storage.h | 3
-rw-r--r--  ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 16
62 files changed, 1929 insertions, 667 deletions
diff --git a/.mapping.json b/.mapping.json
index 8c7d862863b..7d001aaae85 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -5366,6 +5366,21 @@
"ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt":"",
"ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.txt":"",
"ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt":"",
"ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt":"",
"ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/formats/arrow/replace_key.h b/ydb/core/formats/arrow/replace_key.h
index ede577eb68f..9894195ab60 100644
--- a/ydb/core/formats/arrow/replace_key.h
+++ b/ydb/core/formats/arrow/replace_key.h
@@ -53,7 +53,7 @@ public:
: Columns(columns)
, Position(position)
{
- Y_VERIFY_DEBUG(Size() > 0 && Position < (ui64)Column(0).length());
+ Y_VERIFY(Size() > 0 && Position < (ui64)Column(0).length());
}
template<typename T = TArrayVecPtr> requires IsOwning
@@ -61,7 +61,7 @@ public:
: Columns(std::make_shared<TArrayVec>(std::move(columns)))
, Position(position)
{
- Y_VERIFY_DEBUG(Size() > 0 && Position < (ui64)Column(0).length());
+ Y_VERIFY(Size() > 0 && Position < (ui64)Column(0).length());
}
size_t Hash() const {
@@ -70,7 +70,7 @@ public:
template<typename T>
bool operator == (const TReplaceKeyTemplate<T>& key) const {
- Y_VERIFY_DEBUG(Size() == key.Size());
+ Y_VERIFY(Size() == key.Size());
for (int i = 0; i < Size(); ++i) {
auto cmp = CompareColumnValue(i, key, i);
@@ -83,7 +83,7 @@ public:
template<typename T>
std::partial_ordering operator <=> (const TReplaceKeyTemplate<T>& key) const {
- Y_VERIFY_DEBUG(Size() == key.Size());
+ Y_VERIFY(Size() == key.Size());
for (int i = 0; i < Size(); ++i) {
auto cmp = CompareColumnValue(i, key, i);
@@ -96,7 +96,7 @@ public:
template<typename T>
std::partial_ordering CompareNotNull(const TReplaceKeyTemplate<T>& key) const {
- Y_VERIFY_DEBUG(Size() == key.Size());
+ Y_VERIFY(Size() == key.Size());
for (int i = 0; i < Size(); ++i) {
auto cmp = CompareColumnValueNotNull(i, key, i);
@@ -109,8 +109,8 @@ public:
template<typename T>
std::partial_ordering ComparePartNotNull(const TReplaceKeyTemplate<T>& key, int size) const {
- Y_VERIFY_DEBUG(size <= key.Size());
- Y_VERIFY_DEBUG(size <= Size());
+ Y_VERIFY(size <= key.Size());
+ Y_VERIFY(size <= Size());
for (int i = 0; i < size; ++i) {
auto cmp = CompareColumnValueNotNull(i, key, i);
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index c6ad1b82fa8..35745d5b376 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -19,6 +19,13 @@ TLogoBlobID ParseLogoBlobId(TString blobId) {
}
struct TBlobBatch::TBatchInfo : TNonCopyable {
+private:
+ std::vector<TUnifiedBlobId> BlobIds;
+public:
+ const std::vector<TUnifiedBlobId>& GetBlobIds() const {
+ return BlobIds;
+ }
+
TIntrusivePtr<TTabletStorageInfo> TabletInfo;
TAllocatedGenStepConstPtr GenStepRef;
const TBlobsManagerCounters Counters;
@@ -26,7 +33,6 @@ struct TBlobBatch::TBatchInfo : TNonCopyable {
const ui32 Step;
const ui32 Channel;
- std::vector<ui32> BlobSizes;
std::vector<bool> InFlight;
i32 InFlightCount;
ui64 TotalSizeBytes;
@@ -42,18 +48,15 @@ struct TBlobBatch::TBatchInfo : TNonCopyable {
, TotalSizeBytes(0) {
}
- TUnifiedBlobId NextBlobId(ui32 blobSize) {
- BlobSizes.push_back(blobSize);
+ TUnifiedBlobId NextBlobId(const ui32 blobSize) {
InFlight.push_back(true);
++InFlightCount;
TotalSizeBytes += blobSize;
- return MakeBlobId(BlobSizes.size() - 1);
- }
- TUnifiedBlobId MakeBlobId(ui32 i) const {
- Y_VERIFY(i < BlobSizes.size());
const ui32 dsGroup = TabletInfo->GroupFor(Channel, Gen);
- return TUnifiedBlobId(dsGroup, TLogoBlobID(TabletInfo->TabletID, Gen, Step, Channel, BlobSizes[i], i));
+ TUnifiedBlobId nextBlobId(dsGroup, TLogoBlobID(TabletInfo->TabletID, Gen, Step, Channel, blobSize, BlobIds.size()));
+ BlobIds.emplace_back(std::move(nextBlobId));
+ return BlobIds.back();
}
};
@@ -90,6 +93,7 @@ void TBlobBatch::OnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto
BatchInfo->Counters.OnPutResult(blobId.BlobSize());
Y_VERIFY(status == NKikimrProto::OK, "The caller must handle unsuccessful status");
Y_VERIFY(BatchInfo);
+ Y_VERIFY(blobId.Cookie() < BatchInfo->InFlight.size());
Y_VERIFY(BatchInfo->InFlight[blobId.Cookie()], "Blob %s is already acked!", blobId.ToString().c_str());
BatchInfo->InFlight[blobId.Cookie()] = false;
@@ -104,7 +108,7 @@ bool TBlobBatch::AllBlobWritesCompleted() const {
ui64 TBlobBatch::GetBlobCount() const {
if (BatchInfo) {
- return BatchInfo->BlobSizes.size();
+ return BatchInfo->GetBlobIds().size();
}
return 0;
}
@@ -217,7 +221,7 @@ TGenStep TBlobManager::FindNewGCBarrier() {
std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTask(const TString& storageId, const std::shared_ptr<TBlobManager>& manager) {
if (BlobsToKeep.empty() && BlobsToDelete.empty() && LastCollectedGenStep == TGenStep{CurrentGen, CurrentStep}) {
- ACFL_DEBUG("event", "TBlobManager::NeedStorageGC skip");
+ ACFL_DEBUG("event", "TBlobManager::BuildGCTask skip")("current_gen", CurrentGen)("current_step", CurrentStep);
return nullptr;
}
@@ -231,7 +235,7 @@ std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::Bui
NOlap::NBlobOperations::NBlobStorage::TGCTask::TGCListsByGroup perGroupGCListsInFlight;
- // Clear all possibly not keeped trash in channel's groups: create an event for each group
+ // Clear all possibly not kept trash in channel's groups: create an event for each group
if (FirstGC) {
FirstGC = false;
@@ -256,6 +260,7 @@ std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::Bui
}
ui32 blobGroup = TabletInfo->GroupFor(keepBlobIt->Channel(), keepBlobIt->Generation());
perGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc", *keepBlobIt);
}
BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt);
BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);
@@ -267,10 +272,12 @@ std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::Bui
if (genStep > newCollectGenStep) {
break;
}
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", *blobIt);
ui32 blobGroup = TabletInfo->GroupFor(blobIt->Channel(), blobIt->Generation());
NOlap::NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[blobGroup];
bool skipDontKeep = false;
if (gl.KeepList.erase(*blobIt)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc_remove", *blobIt);
// Skipped blobs still need to be deleted from BlobsToKeep table
keepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *blobIt));
@@ -317,21 +324,21 @@ void TBlobManager::DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) {
LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID
<< " Save Batch GenStep: " << blobBatch.BatchInfo->Gen << ":" << blobBatch.BatchInfo->Step
- << " Blob count: " << blobBatch.BatchInfo->BlobSizes.size());
+ << " Blob count: " << blobBatch.BatchInfo->GetBlobIds().size());
// Add this batch to KeepQueue
TGenStep edgeGenStep = EdgeGenStep();
- for (ui32 i = 0; i < blobBatch.BatchInfo->BlobSizes.size(); ++i) {
- const TUnifiedBlobId blobId = blobBatch.BatchInfo->MakeBlobId(i);
+ for (auto&& blobId: blobBatch.BatchInfo->GetBlobIds()) {
Y_VERIFY_DEBUG(blobId.IsDsBlob(), "Not a DS blob id: %s", blobId.ToStringNew().c_str());
- auto logoblobId = blobId.GetLogoBlobId();
- TGenStep genStep{logoblobId.Generation(), logoblobId.Step()};
+ auto logoBlobId = blobId.GetLogoBlobId();
+ TGenStep genStep{logoBlobId.Generation(), logoBlobId.Step()};
AFL_VERIFY(genStep > edgeGenStep)("gen_step", genStep)("edge_gen_step", edgeGenStep)("blob_id", blobId.ToStringNew());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep", logoBlobId.ToString());
- BlobsManagerCounters.OnKeepMarker(logoblobId.BlobSize());
- BlobsToKeep.insert(std::move(logoblobId));
+ BlobsManagerCounters.OnKeepMarker(logoBlobId.BlobSize());
+ BlobsToKeep.insert(std::move(logoBlobId));
db.AddBlobToKeep(blobId);
}
BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);
@@ -344,6 +351,7 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db)
// Persist deletion intent
db.AddBlobToDelete(blobId);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete", blobId);
// Check if the deletion needs to be delayed until the blob is no longer
// used by in-flight requests
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp
index 301ca21deac..5604d9d98b5 100644
--- a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp
@@ -11,11 +11,13 @@ TWriteCounters::TWriteCounters(const TConsumerCounters& owner)
RepliesCount = TBase::GetDeriviative("Replies/Count");
ReplyBytes = TBase::GetDeriviative("Replies/Bytes");
- ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000));
+ ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1));
+ ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1));
FailsCount = TBase::GetDeriviative("Fails/Count");
FailBytes = TBase::GetDeriviative("Fails/Bytes");
- FailDuration = TBase::GetHistogram("Fails/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000));
+ FailDurationBySize = TBase::GetHistogram("Fails/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 2));
+ FailDurationByCount = TBase::GetHistogram("Fails/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 2));
}
}
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.h b/ydb/core/tx/columnshard/blobs_action/counters/write.h
index 88c474a3fe1..591937f0de2 100644
--- a/ydb/core/tx/columnshard/blobs_action/counters/write.h
+++ b/ydb/core/tx/columnshard/blobs_action/counters/write.h
@@ -14,11 +14,13 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr RepliesCount;
NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes;
- NMonitoring::THistogramPtr ReplyDuration;
+ NMonitoring::THistogramPtr ReplyDurationByCount;
+ NMonitoring::THistogramPtr ReplyDurationBySize;
NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
NMonitoring::TDynamicCounters::TCounterPtr FailBytes;
- NMonitoring::THistogramPtr FailDuration;
+ NMonitoring::THistogramPtr FailDurationByCount;
+ NMonitoring::THistogramPtr FailDurationBySize;
public:
TWriteCounters(const TConsumerCounters& owner);
@@ -30,13 +32,15 @@ public:
void OnReply(const ui64 bytes, const TDuration d) const {
RepliesCount->Add(1);
ReplyBytes->Add(bytes);
- ReplyDuration->Collect(d.MilliSeconds());
+ ReplyDurationByCount->Collect((i64)d.MilliSeconds());
+ ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes);
}
void OnFail(const ui64 bytes, const TDuration d) const {
FailsCount->Add(1);
FailBytes->Add(bytes);
- FailDuration->Collect(d.MilliSeconds());
+ FailDurationByCount->Collect((i64)d.MilliSeconds());
+ FailDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes);
}
};
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp
index 8ab7fedda1b..35907ba3e4d 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp
@@ -5,12 +5,12 @@
namespace NKikimr::NColumnShard {
bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) {
- TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID()));
+ auto changes = Ev->Get()->IndexChanges;
+ TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("external_task_id", changes->GetTaskIdentifier());
Y_VERIFY(Self->InsertTable);
Y_VERIFY(Self->TablesManager.HasPrimaryIndex());
txc.DB.NoMoreReadsForTx();
- auto changes = Ev->Get()->IndexChanges;
ACFL_DEBUG("event", "TTxWriteIndex::Execute")("change_type", changes->TypeString())("details", *changes);
if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) {
NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
index b7d50f84cf4..c539d179533 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
@@ -75,7 +75,7 @@ void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, T
NeedGranuleStatusProvide = false;
}
-TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext)
+TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext)
: TBase(limits.GetSplitSettings(), saverContext, StaticTypeName())
, Limits(limits)
, GranuleMeta(granule)
@@ -83,10 +83,10 @@ TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits
Y_VERIFY(GranuleMeta);
SwitchedPortions.reserve(portions.size());
- for (const auto& [_, portionInfo] : portions) {
+ for (const auto& portionInfo : portions) {
Y_VERIFY(portionInfo->IsActive());
SwitchedPortions.emplace_back(*portionInfo);
- PortionsToRemove.emplace_back(*portionInfo);
+ AFL_VERIFY(PortionsToRemove.emplace(portionInfo->GetAddress(), *portionInfo).second);
Y_VERIFY(portionInfo->GetGranule() == GranuleMeta->GetGranuleId());
}
Y_VERIFY(SwitchedPortions.size());
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h
index fcba1e97cf0..f22c85e124e 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.h
@@ -32,7 +32,7 @@ public:
virtual THashSet<TPortionAddress> GetTouchedPortions() const override;
- TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext);
+ TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext);
~TCompactColumnEngineChanges();
static TString StaticTypeName() {
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index 6a137f7748f..17da977f5e9 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -37,7 +37,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
pkFieldNamesSet.emplace(i);
}
- std::shared_ptr<arrow::RecordBatch> batchResult;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults;
{
arrow::FieldVector indexFields;
indexFields.emplace_back(portionIdField);
@@ -62,51 +62,53 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey()));
mergeStream.AddPoolSource({}, batch, nullptr);
}
-
- NIndexedReader::TRecordBatchBuilder indexesBuilder(indexFields);
- mergeStream.DrainAll(indexesBuilder);
- batchResult = indexesBuilder.Finalize();
+ batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields, true);
}
-
- auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
- auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
- auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
- auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
- Y_VERIFY(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
- Y_VERIFY(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
- Y_VERIFY(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
- Y_VERIFY(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
- Y_VERIFY(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
- const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
- const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);
-
- const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / 10000 + 1) + 1;
+ Y_VERIFY(batchResults.size());
TSerializationStats stats;
for (auto&& i : SwitchedPortions) {
stats.Merge(i.GetSerializationStat(*resultSchema));
}
- std::map<std::string, std::vector<TColumnPortionResult>> columnChunks;
-
+ std::vector<std::map<std::string, std::vector<TColumnPortionResult>>> chunkGroups;
+ chunkGroups.resize(batchResults.size());
for (auto&& f : resultSchema->GetSchema()->fields()) {
const ui32 columnId = resultSchema->GetColumnId(f->name());
auto columnInfo = stats.GetColumnInfo(columnId);
Y_VERIFY(columnInfo);
- TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext);
- TMergedColumn mColumn(context);
- {
-// auto c = batchResult->GetColumnByName(f->name());
-// AFL_VERIFY(!c);
+
+ std::vector<TPortionColumnCursor> cursors;
+ auto loader = resultSchema->GetColumnLoader(f->name());
+ for (auto&& p : portions) {
+ std::vector<const TColumnRecord*> records;
+ std::vector<IPortionColumnChunk::TPtr> chunks;
+ p.ExtractColumnChunks(columnId, records, chunks);
+ cursors.emplace_back(TPortionColumnCursor(chunks, records, loader));
+ }
+
+ ui32 batchesRecordsCount = 0;
+ ui32 columnRecordsCount = 0;
+ std::map<std::string, std::vector<TColumnPortionResult>> columnChunks;
+ ui32 batchIdx = 0;
+ for (auto&& batchResult : batchResults) {
+ const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / 10000 + 1) + 1;
+ TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext);
+ TMergedColumn mColumn(context);
+
+ auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
+ auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
+ auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
+ auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
+ Y_VERIFY(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
+ Y_VERIFY(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
+ Y_VERIFY(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
+ Y_VERIFY(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
+ Y_VERIFY(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
+ const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
+ const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);
+
AFL_VERIFY(batchResult->num_rows() == pIdxArray.length());
- std::vector<TPortionColumnCursor> cursors;
- auto loader = resultSchema->GetColumnLoader(f->name());
- for (auto&& p : portions) {
- std::vector<const TColumnRecord*> records;
- std::vector<IPortionColumnChunk::TPtr> chunks;
- p.ExtractColumnChunks(columnId, records, chunks);
- cursors.emplace_back(TPortionColumnCursor(chunks, records, loader));
- }
std::optional<ui16> predPortionIdx;
for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) {
const ui16 portionIdx = pIdxArray.Value(idx);
@@ -121,46 +123,54 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
}
predPortionIdx = portionIdx;
}
+ chunkGroups[batchIdx][f->name()] = mColumn.BuildResult();
+ batchesRecordsCount += batchResult->num_rows();
+ columnRecordsCount += mColumn.GetRecordsCount();
+ ++batchIdx;
}
- AFL_VERIFY(mColumn.GetRecordsCount() == batchResult->num_rows())("f_name", f->name())("mCount", mColumn.GetRecordsCount())("bCount", batchResult->num_rows());
- columnChunks[f->name()] = mColumn.BuildResult();
- }
-
- Y_VERIFY(columnChunks.size());
+ AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("f_name", f->name())("mCount", columnRecordsCount)("bCount", batchesRecordsCount);
- for (auto&& i : columnChunks) {
- if (i.second.size() != columnChunks.begin()->second.size()) {
- for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())("p", i.second[p].DebugString());
+ }
+ ui32 batchIdx = 0;
+ for (auto&& columnChunks : chunkGroups) {
+ auto batchResult = batchResults[batchIdx];
+ ++batchIdx;
+ Y_VERIFY(columnChunks.size());
+
+ for (auto&& i : columnChunks) {
+ if (i.second.size() != columnChunks.begin()->second.size()) {
+ for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())("p", i.second[p].DebugString());
+ }
}
+ AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())("current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
}
- AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())("current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
- }
- std::vector<TGeneralSerializedSlice> batchSlices;
- std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultSchema, SaverContext, std::move(stats)));
+ std::vector<TGeneralSerializedSlice> batchSlices;
+ std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultSchema, SaverContext, std::move(stats)));
- for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) {
- std::map<ui32, std::vector<IPortionColumnChunk::TPtr>> portionColumns;
- for (auto&& p : columnChunks) {
- portionColumns.emplace(resultSchema->GetColumnId(p.first), p.second[i].GetChunks());
+ for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) {
+ std::map<ui32, std::vector<IPortionColumnChunk::TPtr>> portionColumns;
+ for (auto&& p : columnChunks) {
+ portionColumns.emplace(resultSchema->GetColumnId(p.first), p.second[i].GetChunks());
+ }
+ batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings());
}
- batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings());
- }
- TSimilarSlicer slicer(4 * 1024 * 1024);
- auto packs = slicer.Split(batchSlices);
-
- ui32 recordIdx = 0;
- for (auto&& i : packs) {
- TGeneralSerializedSlice slice(std::move(i));
- auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
- std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs();
- AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator()));
- NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
- NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
- AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName());
- recordIdx += slice.GetRecordsCount();
+ TSimilarSlicer slicer(4 * 1024 * 1024);
+ auto packs = slicer.Split(batchSlices);
+
+ ui32 recordIdx = 0;
+ for (auto&& i : packs) {
+ TGeneralSerializedSlice slice(std::move(i));
+ auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
+ std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs();
+ AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator()));
+ NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
+ NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
+ AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName());
+ recordIdx += slice.GetRecordsCount();
+ }
}
if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
TStringBuilder sbSwitched;
@@ -198,4 +208,11 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter
return isSuccess ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL;
}
+void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position) {
+ if (CheckPoints.size()) {
+ AFL_VERIFY(CheckPoints.back().Compare(position) == std::partial_ordering::less);
+ }
+ CheckPoints.emplace_back(position);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
index c246efe9ff4..2a5309646c0 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
@@ -1,5 +1,6 @@
#pragma once
#include "compaction.h"
+#include <ydb/core/formats/arrow/reader/read_filter_merger.h>
namespace NKikimr::NOlap::NCompaction {
@@ -7,6 +8,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
private:
using TBase = TCompactColumnEngineChanges;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
+ std::vector<NIndexedReader::TSortableBatchPosition> CheckPoints;
protected:
virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override;
virtual TPortionMeta::EProduced GetResultProducedClass() const override {
@@ -17,6 +19,8 @@ protected:
public:
using TBase::TBase;
+ void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position);
+
virtual TString TypeString() const override {
return StaticTypeName();
}
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index 2d3be5d8839..b280e7a1b5c 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -47,7 +47,7 @@ void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
for (size_t i = 0; i < DataToIndex.size(); ++i) {
const auto& insertedData = DataToIndex[i];
Y_VERIFY(insertedData.GetBlobRange().IsFullBlob());
- reading->AddRange(insertedData.GetBlobRange());
+ reading->AddRange(insertedData.GetBlobRange(), insertedData.GetBlobData().value_or(""));
removing->DeclareRemove(insertedData.GetBlobRange().GetBlobId());
}
@@ -90,26 +90,25 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
Y_VERIFY(indexInfo.IsSorted());
std::shared_ptr<arrow::RecordBatch> batch;
- if (auto* blobData = Blobs.FindPtr(blobRange)) {
- Y_VERIFY(!blobData->empty(), "Blob data not present");
+ {
+ auto itBlobData = Blobs.find(blobRange);
+ Y_VERIFY(itBlobData != Blobs.end(), "Data for range %s has not been read", blobRange.ToString().c_str());
+ Y_VERIFY(!itBlobData->second.empty(), "Blob data not present");
// Prepare batch
- batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema());
- if (!batch) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)
- ("event", "cannot_parse")
- ("data_snapshot", TStringBuilder() << inserted.GetSnapshot())
- ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot());
- }
- } else {
- Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str());
+ batch = NArrow::DeserializeBatch(itBlobData->second, indexInfo.ArrowSchema());
+ Blobs.erase(itBlobData);
+ AFL_VERIFY(batch)("event", "cannot_parse")
+ ("data_snapshot", TStringBuilder() << inserted.GetSnapshot())
+ ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot());
+ ;
}
- Y_VERIFY(batch);
batch = AddSpecials(batch, blobSchema->GetIndexInfo(), inserted);
batch = resultSchema->NormalizeBatch(*blobSchema, batch);
pathBatches[inserted.PathId].push_back(batch);
Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey()));
}
+ Y_VERIFY(Blobs.empty());
for (auto& [pathId, batches] : pathBatches) {
AddPathIfNotExists(pathId);
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index 33dd6d78245..f847bf75faa 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -22,7 +22,6 @@ void TTTLColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
Y_VERIFY(PortionsToEvict.size() || PortionsToRemove.size());
for (const auto& p : PortionsToEvict) {
Y_VERIFY(!p.GetPortionInfo().Empty());
- PortionsToRemove.emplace_back(p.GetPortionInfo());
auto agent = BlobsAction.GetReading(p.GetPortionInfo());
for (const auto& rec : p.GetPortionInfo().Records) {
@@ -73,7 +72,8 @@ NKikimr::TConclusionStatus TTTLColumnEngineChanges::DoConstructBlobs(TConstructi
for (auto&& info : PortionsToEvict) {
if (auto pwb = UpdateEvictedPortion(info, Blobs, context)) {
- PortionsToRemove.emplace_back(info.GetPortionInfo());
+ info.MutablePortionInfo().SetRemoveSnapshot(info.MutablePortionInfo().GetMinSnapshot());
+ AFL_VERIFY(PortionsToRemove.emplace(info.GetPortionInfo().GetAddress(), info.GetPortionInfo()).second);
AppendedPortions.emplace_back(std::move(*pwb));
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h
index 6e7208b8512..2c93c4d1a80 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.h
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.h
@@ -33,6 +33,10 @@ private:
const TPortionInfo& GetPortionInfo() const {
return PortionInfo;
}
+
+ TPortionInfo& MutablePortionInfo() {
+ return PortionInfo;
+ }
};
std::optional<TPortionInfoWithBlobs> UpdateEvictedPortion(TPortionForEviction& info, const THashMap<TBlobRange, TString>& srcBlobs,
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index e60288c7fdc..d69469db795 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -41,7 +41,7 @@ void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIn
self.IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size());
THashSet<TUnifiedBlobId> blobsDeactivated;
- for (auto& portionInfo : PortionsToRemove) {
+ for (auto& [_, portionInfo] : PortionsToRemove) {
for (auto& rec : portionInfo.Records) {
blobsDeactivated.insert(rec.BlobRange.BlobId);
}
@@ -84,7 +84,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
}
auto g = self.GranulesStorage->StartPackModification();
- for (auto& portionInfo : PortionsToRemove) {
+ for (auto& [_, portionInfo] : PortionsToRemove) {
Y_VERIFY(!portionInfo.Empty());
Y_VERIFY(!portionInfo.IsActive());
@@ -100,7 +100,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
}
}
- for (auto& portionInfo : PortionsToRemove) {
+ for (auto& [_, portionInfo] : PortionsToRemove) {
self.CleanupPortions[portionInfo.GetRemoveSnapshot()].emplace_back(portionInfo);
}
@@ -112,9 +112,10 @@ void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
i.GetPortionInfo().SetPortion(context.NextPortionId());
i.GetPortionInfo().UpdateRecordsMeta(TPortionMeta::EProduced::INSERTED);
}
- for (auto& portionInfo : PortionsToRemove) {
- Y_VERIFY(portionInfo.IsActive());
- portionInfo.SetRemoveSnapshot(context.GetSnapshot());
+ for (auto& [_, portionInfo] : PortionsToRemove) {
+ if (portionInfo.IsActive()) {
+ portionInfo.SetRemoveSnapshot(context.GetSnapshot());
+ }
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index f107eb637e8..8a454a3d5f1 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -39,12 +39,12 @@ public:
virtual THashSet<TPortionAddress> GetTouchedPortions() const override {
THashSet<TPortionAddress> result;
for (auto&& i : PortionsToRemove) {
- result.emplace(i.GetAddress());
+ result.emplace(i.first);
}
return result;
}
- std::vector<TPortionInfo> PortionsToRemove;
+ THashMap<TPortionAddress, TPortionInfo> PortionsToRemove;
std::vector<TPortionInfoWithBlobs> AppendedPortions;
THashMap<ui64, std::pair<ui64, TMark>> NewGranules;
ui64 FirstGranuleId = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index fd358b9232c..2f70988033d 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -449,7 +449,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
if (!keep && context.AllowDrop) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString());
dropBlobs += info->NumBlobs();
- context.Changes->PortionsToRemove.emplace_back(*info);
+ AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second);
SignalCounters.OnPortionToDrop(info->BlobsBytes());
}
} else {
@@ -573,7 +573,7 @@ void TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec) {
const TMark mark(rec.Mark);
AFL_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second)("event", "marker_duplication")("granule_id", rec.Granule)("old_granule_id", PathGranules[rec.PathId][mark]);
- AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters())).second)("event", "granule_duplication")
+ AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second)("event", "granule_duplication")
("rec_path_id", rec.PathId)("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString());
}
@@ -583,7 +583,7 @@ std::optional<ui64> TColumnEngineForLogs::NewGranule(const TGranuleRecord& rec)
auto insertInfo = PathGranules[rec.PathId].emplace(mark, rec.Granule);
if (insertInfo.second) {
- AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters())).second)("event", "granule_duplication")
+ AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second)("event", "granule_duplication")
("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString());
return {};
} else {
@@ -689,17 +689,21 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
Y_VERIFY(spg);
bool granuleHasDataForSnaphsot = false;
- std::vector<std::shared_ptr<TPortionInfo>> orderedPortions = spg->GroupOrderedPortionsByPK(snapshot);
- for (const auto& portionInfo : orderedPortions) {
- Y_VERIFY(portionInfo->Produced());
- const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo());
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")
- ("granule", granule)("portion", portionInfo->DebugString());
- if (skipPortion) {
- continue;
+ for (const auto& [_, keyPortions] : spg->GroupOrderedPortionsByPK()) {
+ for (auto&& [_, portionInfo] : keyPortions) {
+ if (!portionInfo->IsVisible(snapshot)) {
+ continue;
+ }
+ Y_VERIFY(portionInfo->Produced());
+ const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo());
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")
+ ("granule", granule)("portion", portionInfo->DebugString());
+ if (skipPortion) {
+ continue;
+ }
+ out->PortionsOrderedPK.emplace_back(portionInfo);
+ granuleHasDataForSnaphsot = true;
}
- out->PortionsOrderedPK.emplace_back(portionInfo);
- granuleHasDataForSnaphsot = true;
}
if (granuleHasDataForSnaphsot) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
index 91d4574890e..b15ce2015b8 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
@@ -27,7 +27,6 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, TPlainR
bool TScanHead::BuildNextInterval() {
while (BorderPoints.size()) {
-// Y_VERIFY(FrontEnds.size());
auto position = BorderPoints.begin()->first;
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
const bool isIncludeStart = CurrentSegments.empty();
@@ -49,17 +48,6 @@ bool TScanHead::BuildNextInterval() {
AFL_VERIFY(CurrentSegments.erase(i->GetSourceIdx()))("idx", i->GetSourceIdx());
}
-// const bool isFirstFinished = (position == *FrontEnds.begin());
-// if (firstBorderPointInfo.GetFinishSources().size()) {
-// Y_VERIFY(isFirstFinished);
-// Y_VERIFY(FrontEnds.erase(position));
-// } else {
-// Y_VERIFY(!FrontEnds.erase(position));
-// }
-
-// if (isFirstFinished) {
-// DrainSources();
-// }
CurrentStart = BorderPoints.begin()->first;
BorderPoints.erase(BorderPoints.begin());
if (CurrentSegments.size()) {
@@ -87,9 +75,8 @@ void TScanHead::DrainResults() {
}
void TScanHead::DrainSources() {
- while (Sources.size()/* && (FrontEnds.empty() || Sources.front()->GetStart().Compare(*FrontEnds.begin()) != std::partial_ordering::greater)*/) {
+ while (Sources.size()) {
auto source = Sources.front();
-// FrontEnds.emplace(source->GetFinish());
BorderPoints[source->GetStart()].AddStart(source);
BorderPoints[source->GetFinish()].AddFinish(source);
Sources.pop_front();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
index 9c0525252ab..7e739988b33 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
@@ -26,7 +26,6 @@ private:
std::deque<std::shared_ptr<IDataSource>> Sources;
std::vector<std::shared_ptr<arrow::Field>> ResultFields;
THashMap<ui32, std::shared_ptr<IDataSource>> SourceByIdx;
-// std::set<NIndexedReader::TSortableBatchPosition> FrontEnds;
std::map<NIndexedReader::TSortableBatchPosition, TDataSourceEndpoint> BorderPoints;
std::map<ui32, std::shared_ptr<IDataSource>> CurrentSegments;
std::optional<NIndexedReader::TSortableBatchPosition> CurrentStart;
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
index ef71c3366ab..cd30ee433e0 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -111,7 +111,8 @@ std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition(
while (SortHeap.size() && (isFirst || result.Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) {
auto& anotherIterator = SortHeap.front();
if (!isFirst) {
- Y_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater);
+ AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
+ ("key", result.DebugJson());
}
NextInHeap(true);
isFirst = false;
@@ -122,6 +123,27 @@ std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition(
return result;
}
+std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::vector<TSortableBatchPosition>& positions,
+ const std::vector<std::shared_ptr<arrow::Field>>& resultFields, const bool includePositions)
+{
+ std::vector<std::shared_ptr<arrow::RecordBatch>> result;
+ for (auto&& i : positions) {
+ NIndexedReader::TRecordBatchBuilder indexesBuilder(resultFields);
+ DrainCurrentTo(indexesBuilder, i, includePositions);
+ result.emplace_back(indexesBuilder.Finalize());
+ if (result.back()->num_rows() == 0) {
+ result.pop_back();
+ }
+ }
+ NIndexedReader::TRecordBatchBuilder indexesBuilder(resultFields);
+ DrainAll(indexesBuilder);
+ result.emplace_back(indexesBuilder.Finalize());
+ if (result.back()->num_rows() == 0) {
+ result.pop_back();
+ }
+ return result;
+}
+
NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const {
NJson::TJsonValue result;
result["is_cp"] = IsControlPoint();
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index ce2bebf5b0c..9fa3888e203 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -220,6 +220,8 @@ public:
bool DrainAll(TRecordBatchBuilder& builder);
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish);
+ std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const std::vector<TSortableBatchPosition>& positions,
+ const std::vector<std::shared_ptr<arrow::Field>>& resultFields, const bool includePositions);
};
class TRecordBatchBuilder {
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index 84bd52f6622..37649297d59 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -1,7 +1,8 @@
#include "granule.h"
#include "storage.h"
#include <library/cpp/actors/core/log.h>
-#include "optimizer/intervals_optimizer.h"
+#include "optimizer/intervals/optimizer.h"
+#include "optimizer/levels/optimizer.h"
namespace NKikimr::NOlap {
@@ -33,7 +34,7 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
OnBeforeChangePortion(it->second);
it->second = std::make_shared<TPortionInfo>(info);
}
- OnAfterChangePortion(it->second);
+ OnAfterChangePortion(it->second, nullptr);
}
bool TGranuleMeta::ErasePortion(const ui64 portion) {
@@ -46,7 +47,7 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) {
}
OnBeforeChangePortion(it->second);
Portions.erase(it);
- OnAfterChangePortion(nullptr);
+ OnAfterChangePortion(nullptr, nullptr);
return true;
}
@@ -59,7 +60,7 @@ void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionIn
portionNew->AddRecord(indexInfo, rec, portionMeta);
it = Portions.emplace(portion.GetPortion(), portionNew).first;
} else {
- Y_VERIFY(it->second->IsEqualWithSnapshots(portion));
+ AFL_VERIFY(it->second->IsEqualWithSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString());
it->second->AddRecord(indexInfo, rec, portionMeta);
}
if (portionMeta) {
@@ -67,8 +68,10 @@ void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionIn
}
}
-void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter) {
+void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter, NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard) {
if (portionAfter) {
+ AFL_VERIFY(PortionsByPK[portionAfter->IndexKeyStart()].emplace(portionAfter->GetPortion(), portionAfter).second);
+
THashMap<TUnifiedBlobId, ui64> blobIdSize;
for (auto&& i : portionAfter->Records) {
blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size;
@@ -77,7 +80,11 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port
PortionInfoGuard.OnNewBlob(portionAfter->IsActive() ? portionAfter->GetMeta().Produced : NPortion::EProduced::INACTIVE, i.second);
}
if (portionAfter->IsActive()) {
- OptimizerPlanner->AddPortion(portionAfter);
+ if (modificationGuard) {
+ modificationGuard->AddPortion(portionAfter);
+ } else {
+ OptimizerPlanner->StartModificationGuard().AddPortion(portionAfter);
+ }
}
}
if (!!AdditiveSummaryCache) {
@@ -93,6 +100,17 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port
void TGranuleMeta::OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> portionBefore) {
if (portionBefore) {
+ {
+ auto itByKey = PortionsByPK.find(portionBefore->IndexKeyStart());
+ Y_VERIFY(itByKey != PortionsByPK.end());
+ auto itPortion = itByKey->second.find(portionBefore->GetPortion());
+ Y_VERIFY(itPortion != itByKey->second.end());
+ itByKey->second.erase(itPortion);
+ if (itByKey->second.empty()) {
+ PortionsByPK.erase(itByKey);
+ }
+ }
+
THashMap<TUnifiedBlobId, ui64> blobIdSize;
for (auto&& i : portionBefore->Records) {
blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size;
@@ -101,7 +119,7 @@ void TGranuleMeta::OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> por
PortionInfoGuard.OnDropBlob(portionBefore->IsActive() ? portionBefore->GetMeta().Produced : NPortion::EProduced::INACTIVE, i.second);
}
if (portionBefore->IsActive()) {
- OptimizerPlanner->RemovePortion(portionBefore);
+ OptimizerPlanner->StartModificationGuard().RemovePortion(portionBefore);
}
}
if (!!AdditiveSummaryCache) {
@@ -153,14 +171,14 @@ const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary(
return *AdditiveSummaryCache;
}
-TGranuleMeta::TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters)
+TGranuleMeta::TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex)
: Owner(owner)
, Counters(counters)
, PortionInfoGuard(Owner->GetCounters().BuildPortionBlobsGuard())
, Record(rec)
{
Y_VERIFY(Owner);
- OptimizerPlanner = std::make_shared<NStorageOptimizer::TIntervalsOptimizerPlanner>(rec.Granule, owner->GetStoragesManager());
+ OptimizerPlanner = std::make_shared<NStorageOptimizer::NLevels::TLevelsOptimizerPlanner>(rec.Granule, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey());
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 955c2a83dd0..c9381575629 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -3,7 +3,7 @@
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/engines/portion_info.h>
-#include "optimizer/optimizer.h"
+#include "optimizer/abstract/optimizer.h"
namespace NKikimr::NOlap {
@@ -150,10 +150,10 @@ public:
class TCompactionPriority {
private:
- i64 Weight = 0;
+ NStorageOptimizer::TOptimizationPriority Weight;
TMonotonic ConstructionInstant = TMonotonic::Now();
public:
- i64 GetWeight() const {
+ const NStorageOptimizer::TOptimizationPriority& GetWeight() const {
return Weight;
}
@@ -163,11 +163,11 @@ public:
}
bool operator<(const TCompactionPriority& item) const {
- return std::tie(Weight) < std::tie(item.Weight);
+ return Weight < item.Weight;
}
TString DebugString() const {
- return TStringBuilder() << "summary:(" << Weight << ");";
+ return TStringBuilder() << "summary:(" << Weight.DebugString() << ");";
}
};
@@ -192,22 +192,24 @@ private:
const NColumnShard::TGranuleDataCounters Counters;
NColumnShard::TEngineLogsCounters::TPortionsInfoGuard PortionInfoGuard;
std::shared_ptr<NStorageOptimizer::IOptimizerPlanner> OptimizerPlanner;
+ std::map<NArrow::TReplaceKey, THashMap<ui64, std::shared_ptr<TPortionInfo>>> PortionsByPK;
void OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> portionBefore);
- void OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter);
+ void OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter, NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard);
void OnAdditiveSummaryChange() const;
public:
std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> self, const THashSet<TPortionAddress>& busyPortions) const {
return OptimizerPlanner->GetOptimizationTask(limits, self, busyPortions);
}
- std::vector<std::shared_ptr<TPortionInfo>> GroupOrderedPortionsByPK(const TSnapshot& snapshot) const {
- return OptimizerPlanner->GetPortionsOrderedByPK(snapshot);
+ const std::map<NArrow::TReplaceKey, THashMap<ui64, std::shared_ptr<TPortionInfo>>>& GroupOrderedPortionsByPK() const {
+ return PortionsByPK;
}
void OnAfterPortionsLoad() {
+ auto g = OptimizerPlanner->StartModificationGuard();
for (auto&& i : Portions) {
- OnAfterChangePortion(i.second);
+ OnAfterChangePortion(i.second, &g);
}
}
@@ -287,7 +289,7 @@ public:
bool ErasePortion(const ui64 portion);
- explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters);
+ explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex);
ui64 GetGranuleId() const {
return Record.Granule;
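The signature changes above replace per-portion planner updates with batched ones: OnAfterChangePortion now takes an optional modification guard pointer, and OnAfterPortionsLoad funnels every loaded portion through one guard so the optimizer planner receives a single ModifyPortions call instead of one per portion. A minimal sketch of that pattern, assuming only the guard API from abstract/optimizer.h below; the function name LoadPortionsIntoPlanner is hypothetical and not part of the patch:

#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>

// Sketch only: the guard accumulates portions and flushes them to the planner
// exactly once, from its destructor.
void LoadPortionsIntoPlanner(NKikimr::NOlap::NStorageOptimizer::IOptimizerPlanner& planner,
                             const std::vector<std::shared_ptr<NKikimr::NOlap::TPortionInfo>>& loaded) {
    auto guard = planner.StartModificationGuard();
    for (auto&& portion : loaded) {
        guard.AddPortion(portion);
    }
}   // guard destructor -> planner.ModifyPortions(loaded, {})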
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt
index 7a3a568f8c9..41da0631d39 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt
@@ -6,17 +6,15 @@
# original buildsystem will not be accepted.
+add_subdirectory(abstract)
+add_subdirectory(intervals)
+add_subdirectory(levels)
-add_library(engines-storage-optimizer)
-target_link_libraries(engines-storage-optimizer PUBLIC
+add_library(engines-storage-optimizer INTERFACE)
+target_link_libraries(engines-storage-optimizer INTERFACE
contrib-libs-cxxsupp
yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
- engines-changes-abstract
-)
-target_sources(engines-storage-optimizer PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
+ storage-optimizer-abstract
+ storage-optimizer-intervals
+ storage-optimizer-levels
)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt
index ef0e77c9397..a71e16d03bf 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt
@@ -6,18 +6,16 @@
# original buildsystem will not be accepted.
+add_subdirectory(abstract)
+add_subdirectory(intervals)
+add_subdirectory(levels)
-add_library(engines-storage-optimizer)
-target_link_libraries(engines-storage-optimizer PUBLIC
+add_library(engines-storage-optimizer INTERFACE)
+target_link_libraries(engines-storage-optimizer INTERFACE
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
- engines-changes-abstract
-)
-target_sources(engines-storage-optimizer PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
+ storage-optimizer-abstract
+ storage-optimizer-intervals
+ storage-optimizer-levels
)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt
index ef0e77c9397..a71e16d03bf 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt
@@ -6,18 +6,16 @@
# original buildsystem will not be accepted.
+add_subdirectory(abstract)
+add_subdirectory(intervals)
+add_subdirectory(levels)
-add_library(engines-storage-optimizer)
-target_link_libraries(engines-storage-optimizer PUBLIC
+add_library(engines-storage-optimizer INTERFACE)
+target_link_libraries(engines-storage-optimizer INTERFACE
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
- engines-changes-abstract
-)
-target_sources(engines-storage-optimizer PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
+ storage-optimizer-abstract
+ storage-optimizer-intervals
+ storage-optimizer-levels
)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt
index 7a3a568f8c9..41da0631d39 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt
@@ -6,17 +6,15 @@
# original buildsystem will not be accepted.
+add_subdirectory(abstract)
+add_subdirectory(intervals)
+add_subdirectory(levels)
-add_library(engines-storage-optimizer)
-target_link_libraries(engines-storage-optimizer PUBLIC
+add_library(engines-storage-optimizer INTERFACE)
+target_link_libraries(engines-storage-optimizer INTERFACE
contrib-libs-cxxsupp
yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
- engines-changes-abstract
-)
-target_sources(engines-storage-optimizer PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
+ storage-optimizer-abstract
+ storage-optimizer-intervals
+ storage-optimizer-levels
)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..2827f60579d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-abstract)
+target_link_libraries(storage-optimizer-abstract PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..dc5b2282ada
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-abstract)
+target_link_libraries(storage-optimizer-abstract PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..dc5b2282ada
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-abstract)
+target_link_libraries(storage-optimizer-abstract PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..2827f60579d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-abstract)
+target_link_libraries(storage-optimizer-abstract PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp
index d3e82f8b210..d3e82f8b210 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
new file mode 100644
index 00000000000..63842757b6a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
@@ -0,0 +1,113 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <library/cpp/object_factory/object_factory.h>
+
+namespace NKikimr::NOlap {
+struct TCompactionLimits;
+class TGranuleMeta;
+class TColumnEngineChanges;
+}
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+class TOptimizationPriority {
+private:
+ YDB_READONLY(i64, Level, 0);
+ YDB_READONLY(i64, InternalLevelWeight, 0);
+ TOptimizationPriority(const i64 level, const i64 levelWeight)
+ : Level(level)
+ , InternalLevelWeight(levelWeight) {
+
+ }
+
+public:
+ bool operator<(const TOptimizationPriority& item) const {
+ return std::tie(Level, InternalLevelWeight) < std::tie(item.Level, item.InternalLevelWeight);
+ }
+
+ bool IsZero() const {
+ return !Level && !InternalLevelWeight;
+ }
+
+ TString DebugString() const {
+ return TStringBuilder() << "(" << Level << "," << InternalLevelWeight << ")";
+ }
+
+ static TOptimizationPriority Critical(const i64 weight) {
+ return TOptimizationPriority(10, weight);
+ }
+
+ static TOptimizationPriority Optimization(const i64 weight) {
+ return TOptimizationPriority(0, weight);
+ }
+
+ static TOptimizationPriority Zero() {
+ return TOptimizationPriority(0, 0);
+ }
+
+};
+
+class IOptimizerPlanner {
+private:
+ const ui64 GranuleId;
+protected:
+ virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) = 0;
+ virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const = 0;
+ virtual TOptimizationPriority DoGetUsefulMetric() const = 0;
+ virtual TString DoDebugString() const {
+ return "";
+ }
+public:
+ using TFactory = NObjectFactory::TObjectFactory<IOptimizerPlanner, TString>;
+ IOptimizerPlanner(const ui64 granuleId)
+ : GranuleId(granuleId)
+ {
+
+ }
+
+ class TModificationGuard: TNonCopyable {
+ private:
+ IOptimizerPlanner& Owner;
+ std::vector<std::shared_ptr<TPortionInfo>> AddPortions;
+ std::vector<std::shared_ptr<TPortionInfo>> RemovePortions;
+ public:
+ TModificationGuard& AddPortion(const std::shared_ptr<TPortionInfo>& portion) {
+ AddPortions.emplace_back(portion);
+ return *this;
+ }
+
+ TModificationGuard& RemovePortion(const std::shared_ptr<TPortionInfo>& portion) {
+ RemovePortions.emplace_back(portion);
+ return *this;
+ }
+
+ TModificationGuard(IOptimizerPlanner& owner)
+ : Owner(owner)
+ {
+ }
+ ~TModificationGuard() {
+ Owner.ModifyPortions(AddPortions, RemovePortions);
+ }
+ };
+
+ TModificationGuard StartModificationGuard() {
+ return TModificationGuard(*this);
+ }
+
+ virtual ~IOptimizerPlanner() = default;
+ TString DebugString() const {
+ return DoDebugString();
+ }
+
+ void ModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) {
+ NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("granule_id", GranuleId));
+ DoModifyPortions(add, remove);
+ }
+
+ std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const;
+ TOptimizationPriority GetUsefulMetric() const {
+ return DoGetUsefulMetric();
+ }
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
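TOptimizationPriority replaces the earlier scalar weight with a (Level, InternalLevelWeight) pair compared lexicographically, so a Critical priority (level 10) always outranks an Optimization priority (level 0) regardless of the internal weight. A small sketch of that ordering, using only the factory methods declared above and the usual util asserts; the function name PriorityOrderingExample is hypothetical:

// Sketch only: lexicographic comparison, level first, internal weight second.
void PriorityOrderingExample() {
    using NKikimr::NOlap::NStorageOptimizer::TOptimizationPriority;
    const auto optimization = TOptimizationPriority::Optimization(1000000);
    const auto critical = TOptimizationPriority::Critical(1);
    Y_VERIFY(optimization < critical);                // level 0 < level 10, the weight is ignored
    Y_VERIFY(TOptimizationPriority::Zero().IsZero());
}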
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make
new file mode 100644
index 00000000000..140b1ed351b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ optimizer.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow
+ ydb/core/tx/columnshard/engines/changes/abstract
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..39523c21a58
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-intervals)
+target_link_libraries(storage-optimizer-intervals PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-intervals PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..b67769e366d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-intervals)
+target_link_libraries(storage-optimizer-intervals PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-intervals PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..b67769e366d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-intervals)
+target_link_libraries(storage-optimizer-intervals PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-intervals PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..39523c21a58
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-intervals)
+target_link_libraries(storage-optimizer-intervals PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-intervals PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp
new file mode 100644
index 00000000000..ea255ea852b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp
@@ -0,0 +1,43 @@
+#include "blob_size.h"
+#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
+#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+std::shared_ptr<NKikimr::NOlap::TColumnEngineChanges> TBlobsWithSizeLimit::BuildMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const {
+ if (PortionsSize > (i64)SizeLimitToMerge || PortionsCount > CountLimitToMerge) {
+ i64 currentSum = 0;
+ std::vector<std::shared_ptr<TPortionInfo>> portions;
+ std::optional<TString> tierName;
+ for (auto&& i : Portions) {
+ for (auto&& c : i.second) {
+ if (busyPortions.contains(c.second->GetAddress())) {
+ continue;
+ }
+ if (c.second->GetMeta().GetTierName() && (!tierName || *tierName < c.second->GetMeta().GetTierName())) {
+ tierName = c.second->GetMeta().GetTierName();
+ }
+ currentSum += c.second->GetBlobBytes();
+ portions.emplace_back(c.second);
+ if (currentSum > (i64)32 * 1024 * 1024) {
+ break;
+ }
+ }
+ if (currentSum > (i64)32 * 1024 * 1024) {
+ break;
+ }
+ }
+ if (currentSum > SizeLimitToMerge || PortionsCount > CountLimitToMerge) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("portions", portions.size())("current_sum", currentSum);
+ TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager);
+ return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext);
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("skip", "not_enough_data");
+ }
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("event", "skip_by_condition");
+ }
+ return nullptr;
+}
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.h
new file mode 100644
index 00000000000..e5084e1cb45
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.h
@@ -0,0 +1,145 @@
+#pragma once
+#include "counters.h"
+#include <ydb/core/formats/arrow/replace_key.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/splitter/settings.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract/write.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+class TBlobsWithSizeLimit {
+private:
+ YDB_READONLY(ui64, SizeLimit, 0);
+ YDB_READONLY(i64, SizeLimitToMerge, (i64)2 * 1024 * 1024);
+ YDB_READONLY(i64, CountLimitToMerge, 8);
+ YDB_READONLY(i64, PortionsSize, 0);
+ YDB_READONLY(i64, PortionsCount, 0);
+ std::map<NArrow::TReplaceKey, std::map<ui64, std::shared_ptr<TPortionInfo>>> Portions;
+ std::shared_ptr<TCounters> Counters;
+ std::shared_ptr<IStoragesManager> StoragesManager;
+public:
+ TString DebugString() const {
+ return TStringBuilder()
+ << "p_count=" << PortionsCount << ";"
+ << "p_count_by_key=" << Portions.size() << ";"
+ ;
+ }
+
+ TBlobsWithSizeLimit(const ui64 limit, const std::shared_ptr<TCounters>& counters, const std::shared_ptr<IStoragesManager>& storagesManager)
+ : SizeLimit(limit)
+ , Counters(counters)
+ , StoragesManager(storagesManager)
+ {
+
+ }
+
+ std::shared_ptr<TColumnEngineChanges> BuildMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const;
+
+ void AddPortion(const std::shared_ptr<TPortionInfo>& portion) {
+ AFL_VERIFY(portion->BlobsBytes() < SizeLimit);
+ AFL_VERIFY(Portions[portion->IndexKeyStart()].emplace(portion->GetPortion(), portion).second);
+ PortionsSize += portion->BlobsBytes();
+ ++PortionsCount;
+ Counters->OnAddSmallPortion();
+ }
+
+ void RemovePortion(const std::shared_ptr<TPortionInfo>& portion) {
+ auto it = Portions.find(portion->IndexKeyStart());
+ AFL_VERIFY(it != Portions.end());
+ AFL_VERIFY(it->second.erase(portion->GetPortion()));
+ if (!it->second.size()) {
+ Portions.erase(it);
+ }
+ PortionsSize -= portion->BlobsBytes();
+ AFL_VERIFY(PortionsSize >= 0);
+ --PortionsCount;
+ AFL_VERIFY(PortionsCount >= 0);
+ Counters->OnRemoveSmallPortion();
+ }
+
+ std::optional<TOptimizationPriority> GetWeight() const {
+ Y_VERIFY(Counters->GetSmallCounts() == PortionsCount);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portions_opt_count", PortionsCount)("counter", (ui64)Counters->SmallPortionsByGranule.get());
+ if (PortionsSize > SizeLimitToMerge || PortionsCount > CountLimitToMerge) {
+ return TOptimizationPriority::Critical(PortionsCount);
+ } else {
+ return {};
+ }
+ }
+};
+
+class TBlobsBySize {
+private:
+ std::map<ui64, TBlobsWithSizeLimit> BlobsBySizeLimit;
+public:
+ TString DebugString() const {
+ TStringBuilder sb;
+ sb << "(";
+ for (auto&& i : BlobsBySizeLimit) {
+ sb << "(" << i.first << ":" << i.second.DebugString() << ");";
+ }
+ sb << ")";
+ return sb;
+ }
+
+ void AddPortion(const std::shared_ptr<TPortionInfo>& portion) {
+ auto it = BlobsBySizeLimit.upper_bound(portion->GetBlobBytes());
+ if (it != BlobsBySizeLimit.end()) {
+ it->second.AddPortion(portion);
+ }
+ }
+
+ void RemovePortion(const std::shared_ptr<TPortionInfo>& portion) {
+ auto it = BlobsBySizeLimit.upper_bound(portion->GetBlobBytes());
+ if (it != BlobsBySizeLimit.end()) {
+ it->second.RemovePortion(portion);
+ }
+ }
+
+ std::optional<TOptimizationPriority> GetWeight() const {
+ std::optional<TOptimizationPriority> result;
+ for (auto&& i : BlobsBySizeLimit) {
+ auto w = i.second.GetWeight();
+ if (!w) {
+ continue;
+ }
+ if (!result || *result < *w) {
+ result = w;
+ }
+ }
+ return result;
+ }
+
+ const TBlobsWithSizeLimit* GetMaxWeightLimiter() const {
+ std::optional<TOptimizationPriority> resultWeight;
+ const TBlobsWithSizeLimit* result = nullptr;
+ for (auto&& i : BlobsBySizeLimit) {
+ auto w = i.second.GetWeight();
+ if (!w) {
+ continue;
+ }
+ if (!resultWeight || *resultWeight < *w) {
+ resultWeight = w;
+ result = &i.second;
+ }
+ }
+ return result;
+ }
+
+ std::shared_ptr<TColumnEngineChanges> BuildMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const {
+ auto* limiter = GetMaxWeightLimiter();
+ if (!limiter) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("fail", "limiter absent");
+ return nullptr;
+ }
+ return limiter->BuildMergeTask(limits, granule, busyPortions);
+ }
+
+ TBlobsBySize(const std::shared_ptr<TCounters>& counters, const std::shared_ptr<IStoragesManager>& storagesManager) {
+// BlobsBySizeLimit.emplace(512 * 1024, TBlobsWithSizeLimit(512 * 1024, counters, storagesManager));
+ BlobsBySizeLimit.emplace(1024 * 1024, TBlobsWithSizeLimit(1024 * 1024, counters, storagesManager));
+ }
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
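TBlobsBySize keeps one TBlobsWithSizeLimit bucket per configured size threshold (only the 1 MiB bucket is enabled in the constructor above; the 512 KiB one is commented out) and routes each portion to the first bucket whose limit is strictly greater than the portion's blob size; portions at or above the largest limit are simply not tracked by this structure. A sketch of that upper_bound routing, with a plain map standing in for the BlobsBySizeLimit member; names and values are illustrative only:

// Sketch only: the routing rule used by TBlobsBySize::AddPortion/RemovePortion.
void BucketRoutingExample() {
    std::map<ui64, TString> bucketsByLimit;             // size limit -> bucket
    bucketsByLimit.emplace(1024 * 1024, "under-1MiB");
    // first bucket with a limit strictly greater than the blob size -> tracked
    Y_VERIFY(bucketsByLimit.upper_bound(300 * 1024) != bucketsByLimit.end());
    // at or above the largest limit -> upper_bound() == end(), not tracked here
    Y_VERIFY(bucketsByLimit.upper_bound(4 * 1024 * 1024) == bucketsByLimit.end());
}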
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp
new file mode 100644
index 00000000000..a8c1f6920c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp
@@ -0,0 +1,5 @@
+#include "counters.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h
new file mode 100644
index 00000000000..b81871b673b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h
@@ -0,0 +1,98 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
+#include <ydb/core/formats/arrow/replace_key.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/splitter/settings.h>
+#include <ydb/core/tx/columnshard/counters/engine_logs.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+class TGlobalCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr SmallPortionsCount;
+ std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsCount;
+ std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsPackedSizeCount;
+ std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsRawSizeCount;
+ std::shared_ptr<NColumnShard::TValueAggregationAgent> SmallPortionsCountByGranule;
+public:
+ TGlobalCounters()
+ : TBase("IntervalsStorageOptimizer") {
+ SmallPortionsCountByGranule = TBase::GetValueAutoAggregations("Granule/SmallPortions/Count");
+ SmallPortionsCount = TBase::GetValue("SmallPortions/Count");
+
+ const std::set<i64> borders = {0, 1, 2, 4, 8, 16, 32, 64};
+ HistogramOverlappedIntervalsCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Count", "", borders);
+ HistogramOverlappedIntervalsPackedSizeCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Size/Packed", "", borders);
+ HistogramOverlappedIntervalsRawSizeCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Size/Raw", "", borders);
+ }
+
+ static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildClientSmallPortionsAggregation() {
+ return Singleton<TGlobalCounters>()->SmallPortionsCountByGranule->GetClient();
+ }
+
+ static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsOverlapping() {
+ return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsCount->BuildGuard();
+ }
+
+ static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsPackedSizeOverlapping() {
+ return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsPackedSizeCount->BuildGuard();
+ }
+
+ static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsRawSizeOverlapping() {
+ return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsRawSizeCount->BuildGuard();
+ }
+
+ static std::shared_ptr<NColumnShard::TValueGuard> BuildSmallPortionsGuard() {
+ return std::make_shared<NColumnShard::TValueGuard>(Singleton<TGlobalCounters>()->SmallPortionsCount);
+ }
+
+};
+
+class TCounters {
+private:
+ std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsGuard;
+ std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsPackedSizeGuard;
+ std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsRawSizeGuard;
+ std::shared_ptr<NColumnShard::TValueGuard> SmallPortionsCount;
+public:
+ std::shared_ptr<NColumnShard::TValueAggregationClient> SmallPortionsByGranule;
+ i64 GetSmallCounts() const {
+ return SmallPortionsByGranule->GetValue();
+ }
+
+ TCounters() {
+ IntervalsGuard = TGlobalCounters::BuildGuardIntervalsOverlapping();
+ IntervalsPackedSizeGuard = TGlobalCounters::BuildGuardIntervalsPackedSizeOverlapping();
+ IntervalsRawSizeGuard = TGlobalCounters::BuildGuardIntervalsRawSizeOverlapping();
+ SmallPortionsCount = TGlobalCounters::BuildSmallPortionsGuard();
+ SmallPortionsByGranule = TGlobalCounters::BuildClientSmallPortionsAggregation();
+ }
+
+ void OnRemoveIntervalsCount(const ui32 count, const ui64 rawSize, const ui64 packedSize) {
+ IntervalsGuard->Sub(count, 1);
+ IntervalsPackedSizeGuard->Sub(count, packedSize);
+ IntervalsRawSizeGuard->Sub(count, rawSize);
+ }
+
+ void OnAddIntervalsCount(const ui32 count, const ui64 rawSize, const ui64 packedSize) {
+ IntervalsGuard->Add(count, 1);
+ IntervalsPackedSizeGuard->Add(count, packedSize);
+ IntervalsRawSizeGuard->Add(count, rawSize);
+ }
+
+ void OnAddSmallPortion() {
+ SmallPortionsCount->Add(1);
+ SmallPortionsByGranule->Add(1);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("add_opt_count", SmallPortionsByGranule->GetValue())("counter", (ui64)SmallPortionsByGranule.get());
+ }
+
+ void OnRemoveSmallPortion() {
+ SmallPortionsCount->Sub(1);
+ SmallPortionsByGranule->Remove(1);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("remove_opt_count", SmallPortionsByGranule->GetValue())("counter", (ui64)SmallPortionsByGranule.get());
+ }
+
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp
new file mode 100644
index 00000000000..3253413f0b0
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp
@@ -0,0 +1,212 @@
+#include "optimizer.h"
+#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
+#include <ydb/core/tx/columnshard/counters/common/owner.h>
+#include <ydb/core/tx/columnshard/counters/engine_logs.h>
+#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+std::vector<std::shared_ptr<TPortionInfo>> TIntervalsOptimizerPlanner::GetPortionsForIntervalStartedIn(const NArrow::TReplaceKey& keyStart, const ui32 countExpectation) const {
+ std::vector<std::shared_ptr<TPortionInfo>> result;
+ auto it = Positions.find(keyStart);
+ AFL_VERIFY(it != Positions.end());
+ THashSet<ui64> portionsCurrentlyClosed;
+ auto itReverse = make_reverse_iterator(it);
+ AFL_VERIFY(itReverse != Positions.rbegin());
+ --itReverse;
+ for (; itReverse != Positions.rend(); ++itReverse) {
+ for (auto&& i : itReverse->second.GetPositions()) {
+ if (i.first.GetIsStart()) {
+ if (!portionsCurrentlyClosed.erase(i.first.GetPortionId())) {
+ result.emplace_back(i.second.GetPortionPtr());
+ }
+ } else {
+ AFL_VERIFY(portionsCurrentlyClosed.emplace(i.first.GetPortionId()).second);
+ }
+ }
+ if (result.size() == countExpectation) {
+ return result;
+ }
+ }
+ AFL_VERIFY(false)("result.size()", result.size())("expectation", countExpectation);
+ return result;
+}
+
+std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const {
+ if (auto result = SizeProblemBlobs.BuildMergeTask(limits, granule, busyPortions)) {
+ return result;
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("skip", "no_small_portion_tasks");
+ return nullptr; // the ranged-interval selection below is currently unreachable
+ if (RangedSegments.empty()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_ranged_segments");
+ return nullptr;
+ }
+ auto& topSegment = **RangedSegments.rbegin()->second.begin();
+ auto& features = topSegment.GetFeatures();
+ std::vector<std::shared_ptr<TPortionInfo>> portions = GetPortionsForIntervalStartedIn(topSegment.GetPosition(), features.GetPortionsCount());
+
+ if (portions.size() <= 1) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust())("reason", "one_portion");
+ return nullptr;
+ }
+
+ std::optional<TString> tierName;
+ for (auto&& i : portions) {
+ if (i->GetMeta().GetTierName() && (!tierName || *tierName < i->GetMeta().GetTierName())) {
+ tierName = i->GetMeta().GetTierName();
+ }
+ if (busyPortions.contains(i->GetAddress())) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust())
+ ("count", features.GetPortionsCount())("reason", "busy_portion")("portion_address", i->GetAddress().DebugString());
+ return nullptr;
+ }
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("features", features.DebugJson().GetStringRobust())("count", features.GetPortionsCount());
+
+ TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager);
+ return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext);
+}
+
+void TIntervalsOptimizerPlanner::RemovePortion(const std::shared_ptr<TPortionInfo>& info) {
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion")("portion_id", info->GetPortion());
+ auto itStart = Positions.find(info->IndexKeyStart());
+ auto itFinish = Positions.find(info->IndexKeyEnd());
+ Y_VERIFY(itStart != Positions.end());
+ Y_VERIFY(itFinish != Positions.end());
+ if (itStart == itFinish) {
+ RemoveRanged(itStart->second);
+ itStart->second.RemoveSummary(info);
+ AddRanged(itStart->second);
+ if (itStart->second.RemoveStart(info) || itStart->second.RemoveFinish(info)) {
+ RemoveRanged(itStart->second);
+ Positions.erase(itStart);
+ }
+ } else {
+ for (auto it = itStart; it != itFinish; ++it) {
+ RemoveRanged(it->second);
+ it->second.RemoveSummary(info);
+ AddRanged(it->second);
+ }
+ if (itStart->second.RemoveStart(info)) {
+ RemoveRanged(itStart->second);
+ Positions.erase(itStart);
+ }
+ if (itFinish->second.RemoveFinish(info)) {
+ RemoveRanged(itFinish->second);
+ Positions.erase(itFinish);
+ }
+ }
+ AFL_VERIFY(RangedSegments.empty() == Positions.empty())("rs_size", RangedSegments.size())("p_size", Positions.size());
+}
+
+void TIntervalsOptimizerPlanner::AddPortion(const std::shared_ptr<TPortionInfo>& info) {
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion")("portion_id", info->GetPortion());
+ auto itStart = Positions.find(info->IndexKeyStart());
+ if (itStart == Positions.end()) {
+ itStart = Positions.emplace(info->IndexKeyStart(), TBorderPositions(info->IndexKeyStart())).first;
+ if (itStart != Positions.begin()) {
+ auto itStartCopy = itStart;
+ --itStartCopy;
+ itStart->second.CopyFrom(itStartCopy->second);
+ AddRanged(itStart->second);
+ }
+ }
+ auto itEnd = Positions.find(info->IndexKeyEnd());
+ if (itEnd == Positions.end()) {
+ itEnd = Positions.emplace(info->IndexKeyEnd(), TBorderPositions(info->IndexKeyEnd())).first;
+ Y_VERIFY(itEnd != Positions.begin());
+ auto itEndCopy = itEnd;
+ --itEndCopy;
+ itEnd->second.CopyFrom(itEndCopy->second);
+ AddRanged(itEnd->second);
+ itStart = Positions.find(info->IndexKeyStart());
+ }
+ Y_VERIFY(itStart != Positions.end());
+ Y_VERIFY(itEnd != Positions.end());
+ itStart->second.AddStart(info);
+ itEnd->second.AddFinish(info);
+ if (itStart != itEnd) {
+ for (auto it = itStart; it != itEnd; ++it) {
+ RemoveRanged(it->second);
+ it->second.AddSummary(info);
+ AFL_VERIFY(!!it->second.GetFeatures());
+ AddRanged(it->second);
+ }
+ } else {
+ RemoveRanged(itStart->second);
+ itStart->second.AddSummary(info);
+ AddRanged(itStart->second);
+ }
+ AFL_VERIFY(RangedSegments.empty() == Positions.empty())("rs_size", RangedSegments.size())("p_size", Positions.size());
+}
+
+void TIntervalsOptimizerPlanner::DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) {
+ for (auto&& i : remove) {
+ SizeProblemBlobs.RemovePortion(i);
+ RemovePortion(i);
+ }
+ for (auto&& i : add) {
+ SizeProblemBlobs.AddPortion(i);
+ AddPortion(i);
+ }
+}
+
+void TIntervalsOptimizerPlanner::RemoveRanged(const TBorderPositions& data) {
+ if (!!data.GetFeatures()) {
+ Counters->OnRemoveIntervalsCount(data.GetFeatures().GetPortionsCount(), data.GetFeatures().GetPortionsRawWeight(), data.GetFeatures().GetPortionsWeight());
+ auto itFeatures = RangedSegments.find(data.GetFeatures());
+ Y_VERIFY(itFeatures->second.erase(&data));
+ if (itFeatures->second.empty()) {
+ RangedSegments.erase(itFeatures);
+ }
+ }
+}
+
+void TIntervalsOptimizerPlanner::AddRanged(const TBorderPositions& data) {
+ if (!!data.GetFeatures()) {
+ Counters->OnAddIntervalsCount(data.GetFeatures().GetPortionsCount(), data.GetFeatures().GetPortionsRawWeight(), data.GetFeatures().GetPortionsWeight());
+ Y_VERIFY(RangedSegments[data.GetFeatures()].emplace(&data).second);
+ }
+}
+
+TIntervalsOptimizerPlanner::TIntervalsOptimizerPlanner(const ui64 granuleId, const std::shared_ptr<IStoragesManager>& storagesManager)
+ : TBase(granuleId)
+ , StoragesManager(storagesManager)
+ , Counters(std::make_shared<TCounters>())
+ , SizeProblemBlobs(Counters, storagesManager)
+{
+}
+
+TOptimizationPriority TIntervalsOptimizerPlanner::DoGetUsefulMetric() const {
+ auto res = SizeProblemBlobs.GetWeight();
+ if (!!res) {
+ AFL_VERIFY(RangedSegments.size())("positions", Positions.size())("sizes", SizeProblemBlobs.DebugString());
+ return *res;
+ }
+ if (RangedSegments.empty()) {
+ return TOptimizationPriority::Zero();
+ }
+ auto& topSegment = **RangedSegments.rbegin()->second.begin();
+ auto& topFeaturesTask = topSegment.GetFeatures();
+ return TOptimizationPriority::Optimization(topFeaturesTask.GetUsefulMetric());
+}
+
+TString TIntervalsOptimizerPlanner::DoDebugString() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ auto& positions = result.InsertValue("positions", NJson::JSON_ARRAY);
+ for (auto&& i : Positions) {
+ positions.AppendValue(i.second.DebugJson());
+ }
+ return result.GetStringRobust();
+}
+
+void TIntervalsOptimizerPlanner::TBorderPositions::AddSummary(const std::shared_ptr<TPortionInfo>& info) {
+ Features.Add(info);
+}
+
+void TIntervalsOptimizerPlanner::TBorderPositions::RemoveSummary(const std::shared_ptr<TPortionInfo>& info) {
+ Features.Remove(info);
+}
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h
index 11aa4d5d068..5a708535525 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h
@@ -1,5 +1,7 @@
#pragma once
#include "optimizer.h"
+#include "counters.h"
+#include "blob_size.h"
#include <ydb/core/formats/arrow/replace_key.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/core/tx/columnshard/splitter/settings.h>
@@ -8,6 +10,8 @@
namespace NKikimr::NOlap::NStorageOptimizer {
+class TCounters;
+
class TSegmentPosition {
private:
std::shared_ptr<TPortionInfo> Portion;
@@ -69,7 +73,6 @@ private:
YDB_READONLY(i64, PortionsRawWeight, 0);
YDB_READONLY(i64, SmallPortionsWeight, 0);
YDB_READONLY(i64, SmallPortionsCount, 0);
- std::map<ui64, std::shared_ptr<TPortionInfo>> SummaryPortions;
public:
NJson::TJsonValue DebugJson() const {
@@ -80,26 +83,9 @@ public:
result.InsertValue("sp_count", SmallPortionsCount);
result.InsertValue("sp_weight", SmallPortionsWeight);
result.InsertValue("r_count", RecordsCount);
- auto& pIds = result.InsertValue("portion_ids", NJson::JSON_ARRAY);
- for (auto&& i : SummaryPortions) {
- pIds.AppendValue(i.first);
- }
return result;
}
- bool Merge(const TIntervalFeatures& features, const i64 sumWeightLimit) {
- if (PortionsCount > 1 && PortionsWeight + features.PortionsWeight > sumWeightLimit) {
- return false;
- }
- for (auto&& i : features.SummaryPortions) {
- if (SummaryPortions.contains(i.first)) {
- continue;
- }
- Add(i.second);
- }
- return true;
- }
-
i64 GetUsefulMetric() const {
if (PortionsCount == 1 || PortionsWeight == 0) {
return 0;
@@ -116,8 +102,7 @@ public:
}
void Add(const std::shared_ptr<TPortionInfo>& info) {
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion_in_summary")("portion_id", info->GetPortion())("count", SummaryPortions.size())("this", (ui64)this);
- AFL_VERIFY(SummaryPortions.emplace(info->GetPortion(), info).second)("portion_id", info->GetPortion())("this", (ui64)this);
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion_in_summary")("portion_id", info->GetPortion())("count", GetPortionsCount())("this", (ui64)this);
++PortionsCount;
const i64 portionBytes = info->BlobsBytes();
PortionsWeight += portionBytes;
@@ -130,8 +115,7 @@ public:
}
void Remove(const std::shared_ptr<TPortionInfo>& info) {
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion_from_summary")("portion_id", info->GetPortion())("count", SummaryPortions.size())("this", (ui64)this);
- AFL_VERIFY(SummaryPortions.erase(info->GetPortion()))("portion_id", info->GetPortion())("this", (ui64)this);
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion_from_summary")("portion_id", info->GetPortion())("count", GetPortionsCount())("this", (ui64)this);
Y_VERIFY(--PortionsCount >= 0);
const i64 portionBytes = info->BlobsBytes();
PortionsWeight -= portionBytes;
@@ -154,17 +138,12 @@ public:
bool operator<(const TIntervalFeatures& item) const {
return GetUsefulMetric() < item.GetUsefulMetric();
}
- const std::map<ui64, std::shared_ptr<TPortionInfo>>& GetSummaryPortions() const {
- return SummaryPortions;
- }
bool IsEnoughWeight() const {
- return GetPortionsWeight() > TSplitSettings().GetMinBlobSize();
+ return GetPortionsRawWeight() > TSplitSettings().GetMinBlobSize() * 10;
}
};
-class TCounters;
-
class TIntervalsOptimizerPlanner: public IOptimizerPlanner {
private:
static ui64 LimitSmallBlobsMerge;
@@ -244,29 +223,30 @@ private:
void AddSummary(const std::shared_ptr<TPortionInfo>& info);
void RemoveSummary(const std::shared_ptr<TPortionInfo>& info);
};
+
std::map<TIntervalFeatures, std::set<const TBorderPositions*>> RangedSegments;
+
using TPositions = std::map<NArrow::TReplaceKey, TBorderPositions>;
TPositions Positions;
- i64 SumSmall = 0;
- std::map<NArrow::TReplaceKey, std::map<ui64, std::shared_ptr<TPortionInfo>>> SmallBlobs;
+ TBlobsBySize SizeProblemBlobs;
- void RemoveRanged(const TBorderPositions& data);
+ void RemovePortion(const std::shared_ptr<TPortionInfo>& info);
+ void AddPortion(const std::shared_ptr<TPortionInfo>& info);
+ void RemoveRanged(const TBorderPositions& data);
void AddRanged(const TBorderPositions& data);
bool RemoveSmallPortion(const std::shared_ptr<TPortionInfo>& info);
bool AddSmallPortion(const std::shared_ptr<TPortionInfo>& info);
- std::shared_ptr<TColumnEngineChanges> GetSmallPortionsMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const;
+ std::vector<std::shared_ptr<TPortionInfo>> GetPortionsForIntervalStartedIn(const NArrow::TReplaceKey& keyStart, const ui32 countExpectation) const;
protected:
- virtual void DoAddPortion(const std::shared_ptr<TPortionInfo>& info) override;
- virtual void DoRemovePortion(const std::shared_ptr<TPortionInfo>& info) override;
+ virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) override;
virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const override;
- virtual std::vector<std::shared_ptr<TPortionInfo>> DoGetPortionsOrderedByPK(const TSnapshot& snapshot) const override;
- virtual i64 DoGetUsefulMetric() const override;
+ virtual TOptimizationPriority DoGetUsefulMetric() const override;
virtual TString DoDebugString() const override;
public:
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make
new file mode 100644
index 00000000000..f76c42447ab
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ optimizer.cpp
+ blob_size.cpp
+ counters.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow
+ ydb/core/tx/columnshard/engines/changes/abstract
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
deleted file mode 100644
index f5016a83a34..00000000000
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
+++ /dev/null
@@ -1,342 +0,0 @@
-#include "intervals_optimizer.h"
-#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
-#include <ydb/core/tx/columnshard/counters/common/owner.h>
-#include <ydb/core/tx/columnshard/counters/engine_logs.h>
-#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h>
-
-namespace NKikimr::NOlap::NStorageOptimizer {
-
-ui64 TIntervalsOptimizerPlanner::LimitSmallBlobsMerge = 2 * 1024 * 1024;
-ui64 TIntervalsOptimizerPlanner::LimitSmallBlobDetect = 512 * 1024;
-
-class TGlobalCounters: public NColumnShard::TCommonCountersOwner {
-private:
- using TBase = NColumnShard::TCommonCountersOwner;
- NMonitoring::TDynamicCounters::TCounterPtr SmallPortionsCount;
- std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsCount;
- std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsPackedSizeCount;
- std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsRawSizeCount;
-public:
- TGlobalCounters()
- : TBase("IntervalsStorageOptimizer") {
- SmallPortionsCount = TBase::GetValue("SmallPortions/Count");
-
- const std::set<i64> borders = {0, 1, 2, 4, 8, 16, 32, 64};
- HistogramOverlappedIntervalsCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Count", "", borders);
- HistogramOverlappedIntervalsPackedSizeCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Size/Packed", "", borders);
- HistogramOverlappedIntervalsRawSizeCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Size/Raw", "", borders);
- }
-
- static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsOverlapping() {
- return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsCount->BuildGuard();
- }
-
- static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsPackedSizeOverlapping() {
- return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsPackedSizeCount->BuildGuard();
- }
-
- static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsRawSizeOverlapping() {
- return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsRawSizeCount->BuildGuard();
- }
-
- static std::shared_ptr<NColumnShard::TValueGuard> BuildSmallPortionsGuard() {
- return std::make_shared<NColumnShard::TValueGuard>(Singleton<TGlobalCounters>()->SmallPortionsCount);
- }
-
-};
-
-class TCounters {
-private:
- std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsGuard;
- std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsPackedSizeGuard;
- std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsRawSizeGuard;
- std::shared_ptr<NColumnShard::TValueGuard> SmallPortionsCount;
-public:
- TCounters() {
- IntervalsGuard = TGlobalCounters::BuildGuardIntervalsOverlapping();
- IntervalsPackedSizeGuard = TGlobalCounters::BuildGuardIntervalsPackedSizeOverlapping();
- IntervalsRawSizeGuard = TGlobalCounters::BuildGuardIntervalsRawSizeOverlapping();
- SmallPortionsCount = TGlobalCounters::BuildSmallPortionsGuard();
- }
-
- void OnRemoveIntervalsCount(const ui32 count, const ui64 rawSize, const ui64 packedSize) {
- IntervalsGuard->Sub(count, 1);
- IntervalsPackedSizeGuard->Sub(count, packedSize);
- IntervalsRawSizeGuard->Sub(count, rawSize);
- }
-
- void OnAddIntervalsCount(const ui32 count, const ui64 rawSize, const ui64 packedSize) {
- IntervalsGuard->Add(count, 1);
- IntervalsPackedSizeGuard->Add(count, packedSize);
- IntervalsRawSizeGuard->Add(count, rawSize);
- }
-
- void OnAddSmallPortion() {
- SmallPortionsCount->Add(1);
- }
-
- void OnRemoveSmallPortion() {
- SmallPortionsCount->Sub(1);
- }
-
-};
-
-std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::GetSmallPortionsMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const {
- if (SumSmall > (i64)LimitSmallBlobsMerge) {
- ui64 currentSum = 0;
- std::map<ui64, std::shared_ptr<TPortionInfo>> portions;
- std::optional<TString> tierName;
- for (auto&& i : SmallBlobs) {
- for (auto&& c : i.second) {
- if (busyPortions.contains(c.second->GetAddress())) {
- return nullptr;
- }
- if (c.second->GetMeta().GetTierName() && (!tierName || *tierName < c.second->GetMeta().GetTierName())) {
- tierName = c.second->GetMeta().GetTierName();
- }
- currentSum += c.second->RawBytesSum();
- portions.emplace(c.first, c.second);
- }
- }
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("portions", portions.size())("current_sum", currentSum)("remained", SmallBlobs.size())("remained_size", SumSmall);
- TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager);
- return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext);
- }
- return nullptr;
-}
-
-std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const {
- if (auto result = GetSmallPortionsMergeTask(limits, granule, busyPortions)) {
- return result;
- }
- if (RangedSegments.empty()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_ranged_segments");
- return nullptr;
- }
- auto& topSegment = **RangedSegments.rbegin()->second.begin();
- auto& topFeaturesTask = topSegment.GetFeatures();
- TIntervalFeatures features;
- if (topFeaturesTask.IsEnoughWeight()) {
- if (topFeaturesTask.GetPortionsCount() == 1) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "not_critical_task_top");
- return nullptr;
- }
- std::map<ui64, std::vector<std::shared_ptr<TPortionInfo>>> sortedPortions;
- for (auto&& p : topFeaturesTask.GetSummaryPortions()) {
- sortedPortions[p.second->BlobsBytes()].emplace_back(p.second);
- }
-
- for (auto&& s : sortedPortions) {
- for (auto&& p : s.second) {
- if (features.GetPortionsCount() > 1 && s.first > 128 * 1024) {
- if (features.GetPortionsWeight() + p->BlobsBytes() > 512 * 1024 * 1024 && features.GetPortionsCount() > 1) {
- break;
- }
- }
- features.Add(p);
- }
- }
- } else {
- auto itFwd = Positions.find(topSegment.GetPosition());
- Y_VERIFY(itFwd != Positions.end());
- features = itFwd->second.GetFeatures();
- auto itReverse = std::make_reverse_iterator(itFwd);
- ++itFwd;
- while (!features.IsEnoughWeight()) {
- if (itFwd == Positions.end() && itReverse == Positions.rend()) {
- break;
- }
- if (itFwd == Positions.end()) {
- if (!features.Merge(itReverse->second.GetFeatures(), 512 * 1024 * 1024)) {
- break;
- }
- ++itReverse;
- } else if (itReverse == Positions.rend()) {
- if (!features.Merge(itFwd->second.GetFeatures(), 512 * 1024 * 1024)) {
- break;
- }
- ++itFwd;
- } else if (itFwd->second.GetFeatures().GetUsefulKff() < itReverse->second.GetFeatures().GetUsefulKff()) {
- if (!features.Merge(itReverse->second.GetFeatures(), 512 * 1024 * 1024)) {
- break;
- }
- ++itReverse;
- } else {
- if (!features.Merge(itFwd->second.GetFeatures(), 512 * 1024 * 1024)) {
- break;
- }
- ++itFwd;
- }
- }
- }
- if (features.GetPortionsCount() <= 1) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust())("reason", "one_portion");
- return nullptr;
- }
-
- std::optional<TString> tierName;
- for (auto&& i : features.GetSummaryPortions()) {
- if (i.second->GetMeta().GetTierName() && (!tierName || *tierName < i.second->GetMeta().GetTierName())) {
- tierName = i.second->GetMeta().GetTierName();
- }
- if (busyPortions.contains(i.second->GetAddress())) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust())
- ("count", features.GetSummaryPortions().size())("reason", "busy_portion")("portion_address", i.second->GetAddress().DebugString());
- return nullptr;
- }
- }
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("features", features.DebugJson().GetStringRobust())("count", features.GetSummaryPortions().size());
-
- TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager);
- return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, features.GetSummaryPortions(), saverContext);
-}
-
-bool TIntervalsOptimizerPlanner::RemoveSmallPortion(const std::shared_ptr<TPortionInfo>& info) {
- if (info->BlobsBytes() < LimitSmallBlobDetect) {
- Counters->OnRemoveSmallPortion();
- auto it = SmallBlobs.find(info->IndexKeyStart());
- Y_VERIFY(it->second.erase(info->GetPortion()));
- if (it->second.empty()) {
- SmallBlobs.erase(it);
- }
- SumSmall -= info->BlobsBytes();
- Y_VERIFY(SumSmall >= 0);
- return true;
- }
- return false;
-}
-
-bool TIntervalsOptimizerPlanner::AddSmallPortion(const std::shared_ptr<TPortionInfo>& info) {
- if (info->BlobsBytes() < LimitSmallBlobDetect) {
- Counters->OnAddSmallPortion();
- Y_VERIFY(SmallBlobs[info->IndexKeyStart()].emplace(info->GetPortion(), info).second);
- SumSmall += info->BlobsBytes();
- return true;
- }
- return false;
-}
-
-void TIntervalsOptimizerPlanner::DoRemovePortion(const std::shared_ptr<TPortionInfo>& info) {
- RemoveSmallPortion(info);
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion")("portion_id", info->GetPortion());
- auto itStart = Positions.find(info->IndexKeyStart());
- auto itFinish = Positions.find(info->IndexKeyEnd());
- Y_VERIFY(itStart != Positions.end());
- Y_VERIFY(itFinish != Positions.end());
- for (auto it = itStart; it != itFinish; ++it) {
- RemoveRanged(it->second);
- it->second.RemoveSummary(info);
- AddRanged(it->second);
- }
- if (itStart->second.RemoveStart(info)) {
- RemoveRanged(itStart->second);
- Positions.erase(itStart);
- }
- if (itFinish->second.RemoveFinish(info)) {
- RemoveRanged(itFinish->second);
- Positions.erase(itFinish);
- }
-}
-
-void TIntervalsOptimizerPlanner::DoAddPortion(const std::shared_ptr<TPortionInfo>& info) {
- AddSmallPortion(info);
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion")("portion_id", info->GetPortion());
- auto itStart = Positions.find(info->IndexKeyStart());
- if (itStart == Positions.end()) {
- itStart = Positions.emplace(info->IndexKeyStart(), TBorderPositions(info->IndexKeyStart())).first;
- if (itStart != Positions.begin()) {
- auto itStartCopy = itStart;
- --itStartCopy;
- itStart->second.CopyFrom(itStartCopy->second);
- AddRanged(itStart->second);
- }
- }
- auto itEnd = Positions.find(info->IndexKeyEnd());
- if (itEnd == Positions.end()) {
- itEnd = Positions.emplace(info->IndexKeyEnd(), TBorderPositions(info->IndexKeyEnd())).first;
- Y_VERIFY(itEnd != Positions.begin());
- auto itEndCopy = itEnd;
- --itEndCopy;
- itEnd->second.CopyFrom(itEndCopy->second);
- AddRanged(itEnd->second);
- itStart = Positions.find(info->IndexKeyStart());
- }
- Y_VERIFY(itStart != Positions.end());
- Y_VERIFY(itEnd != Positions.end());
- itStart->second.AddStart(info);
- itEnd->second.AddFinish(info);
- for (auto it = itStart; it != itEnd; ++it) {
- RemoveRanged(it->second);
- it->second.AddSummary(info);
- AddRanged(it->second);
- }
-}
-
-void TIntervalsOptimizerPlanner::RemoveRanged(const TBorderPositions& data) {
- if (!!data.GetFeatures()) {
- Counters->OnRemoveIntervalsCount(data.GetFeatures().GetSummaryPortions().size(), data.GetFeatures().GetPortionsRawWeight(), data.GetFeatures().GetPortionsWeight());
- auto itFeatures = RangedSegments.find(data.GetFeatures());
- Y_VERIFY(itFeatures->second.erase(&data));
- if (itFeatures->second.empty()) {
- RangedSegments.erase(itFeatures);
- }
- }
-}
-
-void TIntervalsOptimizerPlanner::AddRanged(const TBorderPositions& data) {
- if (!!data.GetFeatures()) {
- Counters->OnAddIntervalsCount(data.GetFeatures().GetSummaryPortions().size(), data.GetFeatures().GetPortionsRawWeight(), data.GetFeatures().GetPortionsWeight());
- Y_VERIFY(RangedSegments[data.GetFeatures()].emplace(&data).second);
- }
-}
-
-TIntervalsOptimizerPlanner::TIntervalsOptimizerPlanner(const ui64 granuleId, const std::shared_ptr<IStoragesManager>& storagesManager)
- : TBase(granuleId)
- , StoragesManager(storagesManager)
-{
- Counters = std::make_shared<TCounters>();
-}
-
-std::vector<std::shared_ptr<TPortionInfo>> TIntervalsOptimizerPlanner::DoGetPortionsOrderedByPK(const TSnapshot& snapshot) const {
- std::vector<std::shared_ptr<TPortionInfo>> result;
- for (auto&& i : Positions) {
- for (auto&& p : i.second.GetPositions()) {
- if (!p.first.GetIsStart()) {
- continue;
- }
- if (!p.second.GetPortion().IsVisible(snapshot)) {
- continue;
- }
- result.emplace_back(p.second.GetPortionPtr());
- }
- }
- return result;
-}
-
-i64 TIntervalsOptimizerPlanner::DoGetUsefulMetric() const {
- if (RangedSegments.empty()) {
- return 0;
- }
- auto& topSegment = **RangedSegments.rbegin()->second.begin();
- auto& topFeaturesTask = topSegment.GetFeatures();
- return std::max<i64>(topFeaturesTask.GetUsefulMetric(), std::max<ui64>(SumSmall ? 1 : 0, SumSmall / LimitSmallBlobsMerge));
-}
-
-TString TIntervalsOptimizerPlanner::DoDebugString() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- auto& positions = result.InsertValue("positions", NJson::JSON_ARRAY);
- for (auto&& i : Positions) {
- positions.AppendValue(i.second.DebugJson());
- }
- return result.GetStringRobust();
-}
-
-void TIntervalsOptimizerPlanner::TBorderPositions::AddSummary(const std::shared_ptr<TPortionInfo>& info) {
- Features.Add(info);
-}
-
-void TIntervalsOptimizerPlanner::TBorderPositions::RemoveSummary(const std::shared_ptr<TPortionInfo>& info) {
- Features.Remove(info);
-}
-
-} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..8a1314dc2c9
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-levels)
+target_link_libraries(storage-optimizer-levels PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-levels PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..f07e7535dd3
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-levels)
+target_link_libraries(storage-optimizer-levels PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-levels PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..f07e7535dd3
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-levels)
+target_link_libraries(storage-optimizer-levels PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-levels PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..8a1314dc2c9
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(storage-optimizer-levels)
+target_link_libraries(storage-optimizer-levels PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+ engines-changes-abstract
+)
+target_sources(storage-optimizer-levels PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp
new file mode 100644
index 00000000000..6004ce09169
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp
@@ -0,0 +1,5 @@
+#include "counters.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLevels {
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLevels
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h
new file mode 100644
index 00000000000..be808941e87
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h
@@ -0,0 +1,93 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
+#include <ydb/core/formats/arrow/replace_key.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/splitter/settings.h>
+#include <ydb/core/tx/columnshard/counters/engine_logs.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLevels {
+
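+// Process-wide (singleton) counters for the levels optimizer; the per-granule
+// TCounters below attaches aggregation clients and value guards to them.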
+class TGlobalCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr SmallPortionsCount;
+ std::shared_ptr<NColumnShard::TValueAggregationAgent> SmallPortionsCountByGranule;
+
+ std::shared_ptr<NColumnShard::TValueAggregationAgent> CriticalRecordsCount;
+ std::shared_ptr<NColumnShard::TValueAggregationAgent> NormalRecordsCount;
+public:
+ TGlobalCounters()
+ : TBase("LevelsStorageOptimizer")
+ {
+ SmallPortionsCount = TBase::GetValue("SmallPortions/Count");
+ CriticalRecordsCount = TBase::GetValueAutoAggregations("Granule/CriticalRecord/Count");
+ NormalRecordsCount = TBase::GetValueAutoAggregations("Granule/NormalRecord/Count");
+ SmallPortionsCountByGranule = TBase::GetValueAutoAggregations("Granule/SmallPortions/Count");
+ }
+
+ static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildClientSmallPortionsAggregation() {
+ return Singleton<TGlobalCounters>()->SmallPortionsCountByGranule->GetClient();
+ }
+
+ static std::shared_ptr<NColumnShard::TValueGuard> BuildSmallPortionsGuard() {
+ return std::make_shared<NColumnShard::TValueGuard>(Singleton<TGlobalCounters>()->SmallPortionsCount);
+ }
+
+ static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildCriticalRecordsCountAggregation() {
+ return Singleton<TGlobalCounters>()->CriticalRecordsCount->GetClient();
+ }
+
+ static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildNormalRecordsCountAggregation() {
+ return Singleton<TGlobalCounters>()->NormalRecordsCount->GetClient();
+ }
+
+};
+
+class TCounters {
+private:
+ std::shared_ptr<NColumnShard::TValueAggregationClient> CriticalRecordsCount;
+ std::shared_ptr<NColumnShard::TValueAggregationClient> NormalRecordsCount;
+
+ std::shared_ptr<NColumnShard::TValueGuard> SmallPortionsCount;
+ std::shared_ptr<NColumnShard::TValueAggregationClient> SmallPortionsByGranule;
+public:
+ i64 GetSmallCounts() const {
+ return SmallPortionsByGranule->GetValue();
+ }
+
+ TCounters() {
+ CriticalRecordsCount = TGlobalCounters::BuildCriticalRecordsCountAggregation();
+ NormalRecordsCount = TGlobalCounters::BuildNormalRecordsCountAggregation();
+ SmallPortionsCount = TGlobalCounters::BuildSmallPortionsGuard();
+ SmallPortionsByGranule = TGlobalCounters::BuildClientSmallPortionsAggregation();
+ }
+
+ void OnAddCriticalCount(const ui32 count) {
+ CriticalRecordsCount->Add(count);
+ }
+
+ void OnAddNormalCount(const ui32 count) {
+ NormalRecordsCount->Add(count);
+ }
+
+ void OnRemoveCriticalCount(const ui32 count) {
+ CriticalRecordsCount->Remove(count);
+ }
+
+ void OnRemoveNormalCount(const ui32 count) {
+ NormalRecordsCount->Remove(count);
+ }
+
+ void OnAddSmallPortion() {
+ SmallPortionsCount->Add(1);
+ SmallPortionsByGranule->Add(1);
+ }
+
+ void OnRemoveSmallPortion() {
+ SmallPortionsCount->Sub(1);
+ SmallPortionsByGranule->Remove(1);
+ }
+
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLevels
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp
new file mode 100644
index 00000000000..c78ef905041
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp
@@ -0,0 +1,5 @@
+#include "optimizer.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLevels {
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLevels
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h
new file mode 100644
index 00000000000..311923e0c7e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h
@@ -0,0 +1,517 @@
+#pragma once
+#include "counters.h"
+
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
+#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
+#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h>
+#include <ydb/library/accessor/accessor.h>
+
+#include <util/generic/hash.h>
+#include <util/system/types.h>
+#include <util/generic/hash_set.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLevels {
+
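+// Per-portion reference counting across border points. A portion referenced exactly
+// once contributes NumRows() to NormalizedWeight; once it is referenced two or more
+// times (other portions' borders fall inside its key range, or it starts and
+// finishes at the same key) its rows are accounted in CriticalWeight instead. Both
+// weights are mirrored into the TCounters signals.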
+class TLevelInfo {
+private:
+ THashMap<ui64, i64> Counters;
+ YDB_READONLY(i64, CriticalWeight, 0);
+ YDB_READONLY(i64, NormalizedWeight, 0);
+ THashSet<ui64> PortionIds;
+ std::shared_ptr<TCounters> Signals;
+public:
+ TLevelInfo(std::shared_ptr<TCounters> counters)
+ : Signals(counters)
+ {
+
+ }
+
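+    // refCount is the contribution of a single border point: 1 for an ordinary
+    // crossing, 2 when the portion starts and finishes at that point. The branches
+    // below move NumRows() between NormalizedWeight and CriticalWeight whenever the
+    // accumulated reference count crosses the 1 <-> 2 boundary.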
+ void AddPortion(const std::shared_ptr<TPortionInfo>& p, const ui32 refCount) {
+ if (p->GetBlobBytes() < (1 << 20)) {
+ Signals->OnAddSmallPortion();
+ }
+ auto it = Counters.find(p->GetPortion());
+ i64 refCountPred = 0;
+ if (it == Counters.end()) {
+ it = Counters.emplace(p->GetPortion(), refCount).first;
+ } else {
+ refCountPred = it->second;
+ it->second += refCount;
+ }
+ if (it->second == 1 && refCountPred == 0) {
+ NormalizedWeight += p->NumRows();
+ Signals->OnAddNormalCount(p->NumRows());
+ } else if (it->second >= 2 && refCountPred == 1) {
+ CriticalWeight += p->NumRows();
+ Signals->OnAddCriticalCount(p->NumRows());
+
+ NormalizedWeight -= p->NumRows();
+ Y_VERIFY(NormalizedWeight >= 0);
+
+ Signals->OnRemoveNormalCount(p->NumRows());
+ } else if (it->second >= 2 && refCountPred == 0) {
+ CriticalWeight += p->NumRows();
+ Signals->OnAddCriticalCount(p->NumRows());
+ } else if (it->second >= 2 && refCountPred >= 2) {
+ } else {
+ Y_VERIFY(false);
+ }
+ }
+
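+    // Symmetric to AddPortion: releases refCount references and moves NumRows() back
+    // from CriticalWeight to NormalizedWeight (or drops it entirely) as the count falls.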
+ void RemovePortion(const std::shared_ptr<TPortionInfo>& p, const ui32 refCount) {
+ if (p->GetBlobBytes() < (1 << 20)) {
+ Signals->OnRemoveSmallPortion();
+ }
+ auto it = Counters.find(p->GetPortion());
+ Y_VERIFY(it != Counters.end());
+ const i64 refCountPred = it->second;
+ it->second -= refCount;
+ Y_VERIFY(it->second >= 0);
+ if (it->second >= 2) {
+ } else if (it->second == 1) {
+ Y_VERIFY(refCountPred >= 2);
+            CriticalWeight -= p->NumRows();
+            Y_VERIFY(CriticalWeight >= 0);
+            Signals->OnRemoveCriticalCount(p->NumRows());
+ NormalizedWeight += p->NumRows();
+ Signals->OnAddNormalCount(p->NumRows());
+ } else if (it->second == 0) {
+            if (refCountPred >= 2) {
+ CriticalWeight -= p->NumRows();
+ Y_VERIFY(CriticalWeight >= 0);
+ Signals->OnRemoveCriticalCount(p->NumRows());
+ } else if (refCountPred == 1) {
+ NormalizedWeight -= p->NumRows();
+ Y_VERIFY(NormalizedWeight >= 0);
+ Signals->OnRemoveNormalCount(p->NumRows());
+ } else {
+ Y_VERIFY(false);
+ }
+ Counters.erase(it);
+ }
+ }
+
+};
+
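+// A single point in primary-key space: the portions that start here, finish here or
+// cover it ("middle"). MiddleWeight stores the reference weight registered for each
+// middle portion (2 when it starts and finishes at this same key); border points
+// created between existing ones inherit only the weight-1 portions via
+// InitInternalPoint.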
+class TBorderPoint {
+public:
+ using TBorderPortions = THashMap<ui64, std::shared_ptr<TPortionInfo>>;
+private:
+ THashMap<ui64, ui32> MiddleWeight;
+ YDB_READONLY_DEF(TBorderPortions, StartPortions);
+ YDB_READONLY_DEF(TBorderPortions, MiddlePortions);
+ YDB_READONLY_DEF(TBorderPortions, FinishPortions);
+ std::shared_ptr<TLevelInfo> LevelInfo;
+public:
+ void InitInternalPoint(const TBorderPoint& predPoint) {
+ Y_VERIFY(predPoint.MiddleWeight.size() == predPoint.MiddlePortions.size());
+ for (auto&& i : predPoint.MiddlePortions) {
+ auto it = predPoint.MiddleWeight.find(i.first);
+ if (it->second != 2) {
+ AddMiddle(i.second, 1);
+ }
+ }
+ }
+
+ std::shared_ptr<TPortionInfo> GetOnlyPortion() const {
+ Y_VERIFY(MiddlePortions.size() == 1);
+ Y_VERIFY(!IsCritical());
+ return MiddlePortions.begin()->second;
+ }
+
+ bool IsSmall() const {
+ if (!IsCritical() && MiddlePortions.size() == 1 && MiddlePortions.begin()->second->GetBlobBytes() < (1 << 20)) {
+ return true;
+ }
+ return false;
+ }
+
+ bool IsCritical() const {
+ if (StartPortions.size() && FinishPortions.size()) {
+ return true;
+ }
+ if (MiddlePortions.size() > 1 || StartPortions.size() > 1 || FinishPortions.size() > 1) {
+ return true;
+ }
+ return false;
+ }
+
+ TBorderPoint(const std::shared_ptr<TLevelInfo>& info)
+ : LevelInfo(info) {
+
+ }
+
+ ~TBorderPoint() {
+ for (auto&& i : MiddlePortions) {
+ if (i.second->IndexKeyStart() == i.second->IndexKeyEnd()) {
+ LevelInfo->RemovePortion(i.second, 2);
+ } else {
+ LevelInfo->RemovePortion(i.second, 1);
+ }
+ }
+ }
+
+ void AddStart(const std::shared_ptr<TPortionInfo>& p) {
+ Y_VERIFY(StartPortions.emplace(p->GetPortion(), p).second);
+ }
+ void RemoveStart(const std::shared_ptr<TPortionInfo>& p) {
+ Y_VERIFY(StartPortions.erase(p->GetPortion()));
+ }
+
+ void AddMiddle(const std::shared_ptr<TPortionInfo>& p, const ui32 portionCriticalWeight) {
+ Y_VERIFY(MiddleWeight.emplace(p->GetPortion(), portionCriticalWeight).second);
+ Y_VERIFY(MiddlePortions.emplace(p->GetPortion(), p).second);
+ LevelInfo->AddPortion(p, portionCriticalWeight);
+ }
+ void RemoveMiddle(const std::shared_ptr<TPortionInfo>& p, const ui32 portionCriticalWeight) {
+ Y_VERIFY(MiddleWeight.erase(p->GetPortion()));
+ Y_VERIFY(MiddlePortions.erase(p->GetPortion()));
+ LevelInfo->RemovePortion(p, portionCriticalWeight);
+ }
+
+ void AddFinish(const std::shared_ptr<TPortionInfo>& p) {
+ Y_VERIFY(FinishPortions.emplace(p->GetPortion(), p).second);
+ }
+ void RemoveFinish(const std::shared_ptr<TPortionInfo>& p) {
+ Y_VERIFY(FinishPortions.erase(p->GetPortion()));
+ }
+
+ bool IsEmpty() const {
+ return StartPortions.empty() && FinishPortions.empty();
+ }
+};
+
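+// Holds all portions of a level as an ordered map of border points keyed by
+// NArrow::TReplaceKey, so chains of overlapping or small portions can be collected
+// with a single left-to-right sweep over the borders.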
+class TPortionsPlacement {
+private:
+ THashSet<ui64> PortionIds;
+ std::map<NArrow::TReplaceKey, TBorderPoint> Borders;
+ std::shared_ptr<TLevelInfo> LevelInfo;
+public:
+ TPortionsPlacement(const std::shared_ptr<TLevelInfo>& levelInfo)
+ : LevelInfo(levelInfo)
+ {
+
+ }
+
+ class TPortionsScanner {
+ private:
+ THashMap<ui64, std::shared_ptr<TPortionInfo>> CurrentPortions;
+ const THashSet<TPortionAddress>& BusyPortions;
+ public:
+
+ TPortionsScanner(const THashSet<TPortionAddress>& busyPortions)
+ : BusyPortions(busyPortions)
+ {
+
+ }
+
+ const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetCurrentPortions() const {
+ return CurrentPortions;
+ }
+
+ bool AddBorderPoint(const TBorderPoint& p, bool& hasBusy) {
+ hasBusy = false;
+ for (auto&& [_, portionInfo] : p.GetStartPortions()) {
+ if (BusyPortions.contains(portionInfo->GetAddress())) {
+ hasBusy = true;
+ continue;
+ }
+ AFL_VERIFY(CurrentPortions.emplace(portionInfo->GetPortion(), portionInfo).second);
+ }
+
+ for (auto&& [_, portionInfo] : p.GetFinishPortions()) {
+ if (BusyPortions.contains(portionInfo->GetAddress())) {
+ continue;
+ }
+ AFL_VERIFY(CurrentPortions.erase(portionInfo->GetPortion()));
+ }
+            return !CurrentPortions.empty();
+ }
+ };
+
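+    // Why the current chain is being accumulated: a run of small portions worth
+    // coalescing, or overlapping portions that have to be merged.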
+ enum class EChainProblem {
+ NoProblem,
+ SmallChunks,
+ MergeChunks
+ };
+
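+    // Sweep over the border points in key order. TPortionsScanner tracks the set of
+    // currently open (non-busy) portions; while more than one portion is open
+    // (MergeChunks), or the single open portion is smaller than 1 MiB (SmallChunks),
+    // the open portions are accumulated into the current chain. A chain is flushed
+    // into the result once the open set becomes empty and the chain exceeds 1 MiB
+    // (small-portion chains that ran into a busy portion are dropped instead), and
+    // the sweep stops after sizeLimit worth of data has been collected.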
+ std::vector<std::vector<std::shared_ptr<TPortionInfo>>> GetPortionsToCompact(const ui64 sizeLimit, const THashSet<TPortionAddress>& busyPortions) const {
+ std::vector<std::vector<std::shared_ptr<TPortionInfo>>> result;
+ ui64 resultSize = 0;
+
+ TPortionsScanner buffer(busyPortions);
+ THashMap<ui64, std::shared_ptr<TPortionInfo>> portionsCurrentChain;
+ ui64 chainSize = 0;
+ EChainProblem problemType = EChainProblem::NoProblem;
+ for (auto&& i : Borders) {
+ bool hasBusy = false;
+ if (!buffer.AddBorderPoint(i.second, hasBusy)) {
+ if (hasBusy && problemType == EChainProblem::SmallChunks) {
+ chainSize = 0;
+ portionsCurrentChain.clear();
+ problemType = EChainProblem::NoProblem;
+ } else if (chainSize > (1 << 20)) {
+ resultSize += chainSize;
+ std::vector<std::shared_ptr<TPortionInfo>> chain;
+                    for (auto&& p : portionsCurrentChain) {
+                        chain.emplace_back(p.second);
+ }
+ result.emplace_back(chain);
+ chainSize = 0;
+ portionsCurrentChain.clear();
+ problemType = EChainProblem::NoProblem;
+ }
+ } else {
+ if (buffer.GetCurrentPortions().size() > 1) {
+ problemType = EChainProblem::MergeChunks;
+ } else if (buffer.GetCurrentPortions().begin()->second->GetBlobBytes() < (1 << 20) && problemType == EChainProblem::NoProblem) {
+ problemType = EChainProblem::SmallChunks;
+ }
+ if (problemType != EChainProblem::NoProblem) {
+                    for (auto&& p : buffer.GetCurrentPortions()) {
+                        if (portionsCurrentChain.emplace(p.second->GetPortion(), p.second).second) {
+                            chainSize += p.second->GetBlobBytes();
+ }
+ }
+ }
+ }
+ if (resultSize + chainSize > sizeLimit) {
+ break;
+ }
+ }
+ if (portionsCurrentChain.size() > 1) {
+ std::vector<std::shared_ptr<TPortionInfo>> chain;
+ for (auto&& i : portionsCurrentChain) {
+ chain.emplace_back(i.second);
+ }
+ result.emplace_back(chain);
+ }
+
+ return result;
+ }
+
+ void RemovePortion(const std::shared_ptr<TPortionInfo>& portion) {
+ Y_VERIFY(PortionIds.erase(portion->GetPortion()));
+ auto itStart = Borders.find(portion->IndexKeyStart());
+ AFL_VERIFY(itStart != Borders.end());
+ auto itFinish = Borders.find(portion->IndexKeyEnd());
+ AFL_VERIFY(itFinish != Borders.end());
+
+ itStart->second.RemoveStart(portion);
+ itFinish->second.RemoveFinish(portion);
+ if (itStart != itFinish) {
+ for (auto it = itStart; it != itFinish; ++it) {
+ it->second.RemoveMiddle(portion, 1);
+ }
+ if (itFinish->second.IsEmpty()) {
+ Y_VERIFY(Borders.erase(portion->IndexKeyEnd()));
+ }
+ if (itStart->second.IsEmpty()) {
+ Y_VERIFY(Borders.erase(portion->IndexKeyStart()));
+ }
+ } else {
+ itStart->second.RemoveMiddle(portion, 2);
+ if (itStart->second.IsEmpty()) {
+ Borders.erase(itStart);
+ }
+ }
+ }
+
+ void AddPortion(const std::shared_ptr<TPortionInfo>& portion) {
+ Y_VERIFY(PortionIds.emplace(portion->GetPortion()).second);
+ auto itStartInfo = Borders.emplace(portion->IndexKeyStart(), TBorderPoint(LevelInfo));
+ auto itStart = itStartInfo.first;
+ if (itStartInfo.second && itStart != Borders.begin()) {
+ auto itStartCopy = itStart;
+ --itStartCopy;
+ itStart->second.InitInternalPoint(itStartCopy->second);
+ }
+ auto itFinishInfo = Borders.emplace(portion->IndexKeyEnd(), TBorderPoint(LevelInfo));
+ auto itFinish = itFinishInfo.first;
+ if (itFinishInfo.second) {
+ Y_VERIFY(itFinish != Borders.begin());
+ auto itFinishCopy = itFinish;
+ --itFinishCopy;
+ itFinish->second.InitInternalPoint(itFinishCopy->second);
+ }
+
+ itStart->second.AddStart(portion);
+ itFinish->second.AddFinish(portion);
+ if (itStart != itFinish) {
+ for (auto it = itStart; it != itFinish; ++it) {
+ it->second.AddMiddle(portion, 1);
+ }
+ } else {
+ itStart->second.AddMiddle(portion, 2);
+ }
+ }
+};
+
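+// One level of the planner: portion placement plus an age index. Portions whose
+// maximum record snapshot is older than CriticalAge are handed over to NextLevel;
+// compaction tasks are built from the chains returned by TPortionsPlacement.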
+class TLevel {
+private:
+ YDB_READONLY(TDuration, CriticalAge, TDuration::Zero());
+ YDB_READONLY(ui64, CriticalSize, 0);
+ std::shared_ptr<TLevelInfo> LevelInfo;
+ TPortionsPlacement PortionsPlacement;
+ std::shared_ptr<TLevel> NextLevel;
+ std::map<NArrow::TReplaceKey, TBorderPoint> Borders;
+ std::map<TSnapshot, THashMap<ui64, std::shared_ptr<TPortionInfo>>> PortionByAge;
+ const ui64 PortionsSizeLimit = (ui64)250 * 1024 * 1024;
+ TCompactionLimits CompactionLimits;
+ THashSet<ui64> PortionIds;
+ const std::shared_ptr<IStoragesManager> StoragesManager;
+ std::shared_ptr<arrow::Schema> PrimaryKeysSchema;
+public:
+ TLevel(const TDuration criticalAge, const ui64 criticalSize, std::shared_ptr<TLevel> nextLevel, const std::shared_ptr<IStoragesManager>& storagesManager, std::shared_ptr<TCounters> counters,
+ const std::shared_ptr<arrow::Schema>& primaryKeysSchema)
+ : CriticalAge(criticalAge)
+ , CriticalSize(criticalSize)
+ , LevelInfo(std::make_shared<TLevelInfo>(counters))
+ , PortionsPlacement(LevelInfo)
+ , NextLevel(nextLevel)
+ , StoragesManager(storagesManager)
+ , PrimaryKeysSchema(primaryKeysSchema)
+ {
+ CompactionLimits.GranuleSizeForOverloadPrevent = CriticalSize * 0.5;
+ }
+
+ ui64 GetWeight() const {
+ return LevelInfo->GetCriticalWeight();
+ }
+
+ void ProvidePortionsNextLevel(const TInstant currentInstant) {
+ if (!NextLevel) {
+ return;
+ }
+ std::vector<std::shared_ptr<TPortionInfo>> portionsForProviding;
+ for (auto&& i : PortionByAge) {
+ if (TInstant::MilliSeconds(i.first.GetPlanStep()) + CriticalAge < currentInstant) {
+ for (auto&& p : i.second) {
+ portionsForProviding.emplace_back(p.second);
+ }
+ } else {
+ break;
+ }
+ }
+ for (auto&& i : portionsForProviding) {
+ RemovePortion(i, currentInstant);
+ NextLevel->AddPortion(i, currentInstant);
+ }
+ }
+
+ void AddPortion(const std::shared_ptr<TPortionInfo>& portionInfo, const TInstant addInstant) {
+ if (TInstant::MilliSeconds(portionInfo->RecordSnapshotMax().GetPlanStep()) + CriticalAge < addInstant) {
+ Y_VERIFY(!PortionIds.contains(portionInfo->GetPortion()));
+ if (NextLevel) {
+ return NextLevel->AddPortion(portionInfo, addInstant);
+ }
+ }
+ PortionsPlacement.AddPortion(portionInfo);
+ Y_VERIFY(PortionByAge[portionInfo->RecordSnapshotMax()].emplace(portionInfo->GetPortion(), portionInfo).second);
+ ProvidePortionsNextLevel(addInstant);
+ }
+
+ void RemovePortion(const std::shared_ptr<TPortionInfo>& portionInfo, const TInstant removeInstant) {
+ PortionsPlacement.RemovePortion(portionInfo);
+ {
+ auto it = PortionByAge.find(portionInfo->RecordSnapshotMax());
+ Y_VERIFY(it != PortionByAge.end());
+ Y_VERIFY(it->second.erase(portionInfo->GetPortion()));
+ if (it->second.empty()) {
+ PortionByAge.erase(it);
+ }
+ }
+ ProvidePortionsNextLevel(removeInstant);
+ }
+
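+    // Builds a general compaction task over all collected chains and registers a
+    // check point at the maximum IndexKeyEnd of every chain, so the result can be
+    // split back along the chain borders.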
+ std::shared_ptr<TColumnEngineChanges> BuildOptimizationTask(const TCompactionLimits& /*limits*/, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions, const TInstant /*currentInstant*/) const {
+ std::vector<std::vector<std::shared_ptr<TPortionInfo>>> portionGroups = PortionsPlacement.GetPortionsToCompact(PortionsSizeLimit, busyPortions);
+ if (portionGroups.empty()) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "optimization_task_skipped");
+ return nullptr;
+ }
+ std::vector<std::shared_ptr<TPortionInfo>> portions;
+ std::vector<NIndexedReader::TSortableBatchPosition> positions;
+ for (auto&& i : portionGroups) {
+ portions.insert(portions.end(), i.begin(), i.end());
+ std::optional<NIndexedReader::TSortableBatchPosition> position;
+ for (auto&& p : i) {
+ NIndexedReader::TSortableBatchPosition pos(p->IndexKeyEnd().ToBatch(PrimaryKeysSchema), 0, PrimaryKeysSchema->field_names(), {}, false);
+ if (!position || position->Compare(pos) == std::partial_ordering::less) {
+ position = pos;
+ }
+ }
+ Y_VERIFY(position);
+ positions.emplace_back(*position);
+ }
+ TSaverContext saverContext(StoragesManager->GetOperator(IStoragesManager::DefaultStorageId), StoragesManager);
+ auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(CompactionLimits, granule, portions, saverContext);
+ for (auto&& i : positions) {
+ result->AddCheckPoint(i);
+ }
+ return result;
+ }
+
+};
+
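+// Level-based planner. Only portions from the default (or empty) tier are tracked;
+// portions without RecordSnapshotMax metadata go to LMax, the rest to LStart. The
+// constructor currently wires a single level, so L3, LMax and LStart all refer to
+// the same TLevel.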
+class TLevelsOptimizerPlanner: public IOptimizerPlanner {
+private:
+ using TBase = IOptimizerPlanner;
+ std::shared_ptr<TLevel> L3;
+ std::shared_ptr<TLevel> LMax;
+ std::shared_ptr<TLevel> LStart;
+ const std::shared_ptr<IStoragesManager> StoragesManager;
+ std::shared_ptr<TCounters> Counters;
+protected:
+ virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) override {
+ const TInstant currentInstant = TInstant::Now();
+ for (auto&& i : add) {
+ if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") {
+ continue;
+ }
+ if (!i->GetMeta().RecordSnapshotMax) {
+ LMax->AddPortion(i, currentInstant);
+ } else {
+ LStart->AddPortion(i, currentInstant);
+ }
+ }
+ for (auto&& i : remove) {
+ if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") {
+ continue;
+ }
+ if (!i->GetMeta().RecordSnapshotMax) {
+ LMax->RemovePortion(i, currentInstant);
+ } else {
+ LStart->RemovePortion(i, currentInstant);
+ }
+ }
+ }
+ virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const override {
+        return LStart->BuildOptimizationTask(limits, granule, busyPortions, TInstant::Now());
+    }
+ virtual TOptimizationPriority DoGetUsefulMetric() const override {
+ return TOptimizationPriority::Critical(LStart->GetWeight());
+ }
+ virtual TString DoDebugString() const override {
+ return "";
+ }
+
+public:
+ TLevelsOptimizerPlanner(const ui64 granuleId, const std::shared_ptr<IStoragesManager>& storagesManager, const std::shared_ptr<arrow::Schema>& primaryKeysSchema)
+ : TBase(granuleId)
+ , StoragesManager(storagesManager)
+ , Counters(std::make_shared<TCounters>())
+ {
+ L3 = std::make_shared<TLevel>(TDuration::Seconds(120), 24 << 20, nullptr, StoragesManager, Counters, primaryKeysSchema);
+ LMax = L3;
+ LStart = L3;
+ }
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLevels
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make
new file mode 100644
index 00000000000..3f96a571747
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ optimizer.cpp
+ counters.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow
+ ydb/core/tx/columnshard/engines/changes/abstract
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h
deleted file mode 100644
index 9638b6836fe..00000000000
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h
+++ /dev/null
@@ -1,59 +0,0 @@
-#pragma once
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <library/cpp/object_factory/object_factory.h>
-
-namespace NKikimr::NOlap {
-struct TCompactionLimits;
-class TGranuleMeta;
-class TColumnEngineChanges;
-}
-
-namespace NKikimr::NOlap::NStorageOptimizer {
-
-class IOptimizerPlanner {
-private:
- const ui64 GranuleId;
-protected:
- virtual void DoAddPortion(const std::shared_ptr<TPortionInfo>& info) = 0;
- virtual void DoRemovePortion(const std::shared_ptr<TPortionInfo>& info) = 0;
- virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const = 0;
- virtual i64 DoGetUsefulMetric() const = 0;
- virtual std::vector<std::shared_ptr<TPortionInfo>> DoGetPortionsOrderedByPK(const TSnapshot& snapshot) const = 0;
- virtual TString DoDebugString() const {
- return "";
- }
-public:
- using TFactory = NObjectFactory::TObjectFactory<IOptimizerPlanner, TString>;
- IOptimizerPlanner(const ui64 granuleId)
- : GranuleId(granuleId)
- {
-
- }
-
-
- virtual ~IOptimizerPlanner() = default;
- TString DebugString() const {
- return DoDebugString();
- }
-
- std::vector<std::shared_ptr<TPortionInfo>> GetPortionsOrderedByPK(const TSnapshot& snapshot) const {
- return DoGetPortionsOrderedByPK(snapshot);
- }
-
- void AddPortion(const std::shared_ptr<TPortionInfo>& info) {
- Y_VERIFY(info);
- NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("granule_id", GranuleId));
- DoAddPortion(info);
- }
- void RemovePortion(const std::shared_ptr<TPortionInfo>& info) {
- Y_VERIFY(info);
- NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("granule_id", GranuleId));
- DoRemovePortion(info);
- }
- std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const;
- i64 GetUsefulMetric() const {
- return DoGetUsefulMetric();
- }
-};
-
-} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp
index 1c21de24754..aafb4ce76dc 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp
@@ -1,7 +1,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/tx/columnshard/splitter/rb_splitter.h>
#include <ydb/core/tx/columnshard/counters/indexation.h>
-#include <ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h>
#include <ydb/core/formats/arrow/serializer/batch_only.h>
#include <ydb/core/formats/arrow/simple_builder/batch.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make
index 15879284dca..0e0afa93c2a 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make
@@ -1,15 +1,9 @@
LIBRARY()
-SRCS(
- optimizer.cpp
- intervals_optimizer.cpp
-)
-
PEERDIR(
- contrib/libs/apache/arrow
- ydb/core/protos
- ydb/core/formats/arrow
- ydb/core/tx/columnshard/engines/changes/abstract
+ ydb/core/tx/columnshard/engines/storage/optimizer/abstract
+ ydb/core/tx/columnshard/engines/storage/optimizer/intervals
+ ydb/core/tx/columnshard/engines/storage/optimizer/levels
)
END()
diff --git a/ydb/core/tx/columnshard/engines/storage/storage.h b/ydb/core/tx/columnshard/engines/storage/storage.h
index ed78f7ce3ec..c7c80a4908e 100644
--- a/ydb/core/tx/columnshard/engines/storage/storage.h
+++ b/ydb/core/tx/columnshard/engines/storage/storage.h
@@ -70,7 +70,8 @@ public:
return {};
}
for (auto it = GranuleCompactionPrioritySorting.rbegin(); it != GranuleCompactionPrioritySorting.rend(); ++it) {
- if (it->first.GetWeight() == 0) {
+ if (it->first.GetWeight().IsZero()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "zero_granule_reached");
break;
}
Y_VERIFY(it->second.size());
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index bdeface567a..d84f12dc8eb 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -305,6 +305,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
}
changes->Blobs.insert(blobs.begin(), blobs.end());
+ blobs.clear();
changes->StartEmergency();
NOlap::TConstructionContext context(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Indexation"));
@@ -381,7 +382,7 @@ bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db,
changes->StartEmergency();
- const bool result = engine.ApplyChanges(db, changes, TSnapshot(1,0));
+ const bool result = engine.ApplyChanges(db, changes, TSnapshot(1,1));
changes->AbortEmergency();
return result;
}
@@ -527,12 +528,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// insert
ui64 planStep = 1;
- THashMap<TBlobRange, TString> blobs;
ui64 numRows = 1000;
ui64 rowPos = 0;
for (ui64 txId = 1; txId <= 20; ++txId, rowPos += numRows) {
TString testBlob = MakeTestBlob(rowPos, rowPos + numRows);
auto blobRange = MakeBlobRange(++step, testBlob.size());
+ THashMap<TBlobRange, TString> blobs;
blobs[blobRange] = testBlob;
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
@@ -623,12 +624,13 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
- THashMap<TBlobRange, TString> blobs;
ui64 numRows = 1000;
ui64 rowPos = 0;
+ THashMap<TBlobRange, TString> blobsAll;
for (ui64 txId = 1; txId <= 100; ++txId, rowPos += numRows) {
TString testBlob = MakeTestBlob(rowPos, rowPos + numRows);
auto blobRange = MakeBlobRange(++step, testBlob.size());
+ THashMap<TBlobRange, TString> blobs;
blobs[blobRange] = testBlob;
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
@@ -637,6 +639,9 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {}));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
+ for (auto&& i : blobs) {
+ blobsAll[i.first] = i.second;
+ }
UNIT_ASSERT(ok);
}
@@ -649,7 +654,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// compact
planStep = 2;
- bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {23, 5, 5});
+ bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobsAll), step, {23, 5, 5});
UNIT_ASSERT(ok);
// success write after compaction
@@ -658,6 +663,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
for (ui64 txId = 1; txId <= 2; ++txId, rowPos += numRows) {
TString testBlob = MakeTestBlob(rowPos, rowPos + numRows);
auto blobRange = MakeBlobRange(++step, testBlob.size());
+ THashMap<TBlobRange, TString> blobs;
blobs[blobRange] = testBlob;
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
@@ -692,12 +698,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
engine.Load(db, lostBlobs);
- THashMap<TBlobRange, TString> blobs;
ui64 numRows = 1000;
ui64 rowPos = 0;
for (ui64 txId = 1; txId <= 20; ++txId, rowPos += numRows) {
TString testBlob = MakeTestBlob(rowPos, rowPos + numRows);
auto blobRange = MakeBlobRange(++step, testBlob.size());
+ THashMap<TBlobRange, TString> blobs;
blobs[blobRange] = testBlob;
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]