diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-05 19:04:21 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-05 19:50:21 +0300 |
commit | 4b6ad89d8b3d7e73e4286ee2a5bc63fc22f4a578 (patch) | |
tree | 15049f86ba068f9dc3a68ce2c1e1f28d16ee9db1 | |
parent | d88f74d6fb31cb53b756b784efd505222c7f4b5f (diff) | |
download | ydb-4b6ad89d8b3d7e73e4286ee2a5bc63fc22f4a578.tar.gz |
KIKIMR-19211: special wave-optimizer (level in future)
62 files changed, 1929 insertions, 667 deletions
diff --git a/.mapping.json b/.mapping.json index 8c7d862863b..7d001aaae85 100644 --- a/.mapping.json +++ b/.mapping.json @@ -5366,6 +5366,21 @@ "ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt":"", "ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.txt":"", "ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt":"", + "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt":"", "ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt":"", "ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt":"", "ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt":"", diff --git 
a/ydb/core/formats/arrow/replace_key.h b/ydb/core/formats/arrow/replace_key.h index ede577eb68f..9894195ab60 100644 --- a/ydb/core/formats/arrow/replace_key.h +++ b/ydb/core/formats/arrow/replace_key.h @@ -53,7 +53,7 @@ public: : Columns(columns) , Position(position) { - Y_VERIFY_DEBUG(Size() > 0 && Position < (ui64)Column(0).length()); + Y_VERIFY(Size() > 0 && Position < (ui64)Column(0).length()); } template<typename T = TArrayVecPtr> requires IsOwning @@ -61,7 +61,7 @@ public: : Columns(std::make_shared<TArrayVec>(std::move(columns))) , Position(position) { - Y_VERIFY_DEBUG(Size() > 0 && Position < (ui64)Column(0).length()); + Y_VERIFY(Size() > 0 && Position < (ui64)Column(0).length()); } size_t Hash() const { @@ -70,7 +70,7 @@ public: template<typename T> bool operator == (const TReplaceKeyTemplate<T>& key) const { - Y_VERIFY_DEBUG(Size() == key.Size()); + Y_VERIFY(Size() == key.Size()); for (int i = 0; i < Size(); ++i) { auto cmp = CompareColumnValue(i, key, i); @@ -83,7 +83,7 @@ public: template<typename T> std::partial_ordering operator <=> (const TReplaceKeyTemplate<T>& key) const { - Y_VERIFY_DEBUG(Size() == key.Size()); + Y_VERIFY(Size() == key.Size()); for (int i = 0; i < Size(); ++i) { auto cmp = CompareColumnValue(i, key, i); @@ -96,7 +96,7 @@ public: template<typename T> std::partial_ordering CompareNotNull(const TReplaceKeyTemplate<T>& key) const { - Y_VERIFY_DEBUG(Size() == key.Size()); + Y_VERIFY(Size() == key.Size()); for (int i = 0; i < Size(); ++i) { auto cmp = CompareColumnValueNotNull(i, key, i); @@ -109,8 +109,8 @@ public: template<typename T> std::partial_ordering ComparePartNotNull(const TReplaceKeyTemplate<T>& key, int size) const { - Y_VERIFY_DEBUG(size <= key.Size()); - Y_VERIFY_DEBUG(size <= Size()); + Y_VERIFY(size <= key.Size()); + Y_VERIFY(size <= Size()); for (int i = 0; i < size; ++i) { auto cmp = CompareColumnValueNotNull(i, key, i); diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp 
index c6ad1b82fa8..35745d5b376 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -19,6 +19,13 @@ TLogoBlobID ParseLogoBlobId(TString blobId) { } struct TBlobBatch::TBatchInfo : TNonCopyable { +private: + std::vector<TUnifiedBlobId> BlobIds; +public: + const std::vector<TUnifiedBlobId>& GetBlobIds() const { + return BlobIds; + } + TIntrusivePtr<TTabletStorageInfo> TabletInfo; TAllocatedGenStepConstPtr GenStepRef; const TBlobsManagerCounters Counters; @@ -26,7 +33,6 @@ struct TBlobBatch::TBatchInfo : TNonCopyable { const ui32 Step; const ui32 Channel; - std::vector<ui32> BlobSizes; std::vector<bool> InFlight; i32 InFlightCount; ui64 TotalSizeBytes; @@ -42,18 +48,15 @@ struct TBlobBatch::TBatchInfo : TNonCopyable { , TotalSizeBytes(0) { } - TUnifiedBlobId NextBlobId(ui32 blobSize) { - BlobSizes.push_back(blobSize); + TUnifiedBlobId NextBlobId(const ui32 blobSize) { InFlight.push_back(true); ++InFlightCount; TotalSizeBytes += blobSize; - return MakeBlobId(BlobSizes.size() - 1); - } - TUnifiedBlobId MakeBlobId(ui32 i) const { - Y_VERIFY(i < BlobSizes.size()); const ui32 dsGroup = TabletInfo->GroupFor(Channel, Gen); - return TUnifiedBlobId(dsGroup, TLogoBlobID(TabletInfo->TabletID, Gen, Step, Channel, BlobSizes[i], i)); + TUnifiedBlobId nextBlobId(dsGroup, TLogoBlobID(TabletInfo->TabletID, Gen, Step, Channel, blobSize, BlobIds.size())); + BlobIds.emplace_back(std::move(nextBlobId)); + return BlobIds.back(); } }; @@ -90,6 +93,7 @@ void TBlobBatch::OnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto BatchInfo->Counters.OnPutResult(blobId.BlobSize()); Y_VERIFY(status == NKikimrProto::OK, "The caller must handle unsuccessful status"); Y_VERIFY(BatchInfo); + Y_VERIFY(blobId.Cookie() < BatchInfo->InFlight.size()); Y_VERIFY(BatchInfo->InFlight[blobId.Cookie()], "Blob %s is already acked!", blobId.ToString().c_str()); BatchInfo->InFlight[blobId.Cookie()] = false; @@ -104,7 +108,7 @@ bool 
TBlobBatch::AllBlobWritesCompleted() const { ui64 TBlobBatch::GetBlobCount() const { if (BatchInfo) { - return BatchInfo->BlobSizes.size(); + return BatchInfo->GetBlobIds().size(); } return 0; } @@ -217,7 +221,7 @@ TGenStep TBlobManager::FindNewGCBarrier() { std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTask(const TString& storageId, const std::shared_ptr<TBlobManager>& manager) { if (BlobsToKeep.empty() && BlobsToDelete.empty() && LastCollectedGenStep == TGenStep{CurrentGen, CurrentStep}) { - ACFL_DEBUG("event", "TBlobManager::NeedStorageGC skip"); + ACFL_DEBUG("event", "TBlobManager::BuildGCTask skip")("current_gen", CurrentGen)("current_step", CurrentStep); return nullptr; } @@ -231,7 +235,7 @@ std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::Bui NOlap::NBlobOperations::NBlobStorage::TGCTask::TGCListsByGroup perGroupGCListsInFlight; - // Clear all possibly not keeped trash in channel's groups: create an event for each group + // Clear all possibly not kept trash in channel's groups: create an event for each group if (FirstGC) { FirstGC = false; @@ -256,6 +260,7 @@ std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::Bui } ui32 blobGroup = TabletInfo->GroupFor(keepBlobIt->Channel(), keepBlobIt->Generation()); perGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc", *keepBlobIt); } BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt); BlobsManagerCounters.OnBlobsKeep(BlobsToKeep); @@ -267,10 +272,12 @@ std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::Bui if (genStep > newCollectGenStep) { break; } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", *blobIt); ui32 blobGroup = TabletInfo->GroupFor(blobIt->Channel(), blobIt->Generation()); NOlap::NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[blobGroup]; bool skipDontKeep = false; if 
(gl.KeepList.erase(*blobIt)) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc_remove", *blobIt); // Skipped blobs still need to be deleted from BlobsToKeep table keepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *blobIt)); @@ -317,21 +324,21 @@ void TBlobManager::DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) { LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Save Batch GenStep: " << blobBatch.BatchInfo->Gen << ":" << blobBatch.BatchInfo->Step - << " Blob count: " << blobBatch.BatchInfo->BlobSizes.size()); + << " Blob count: " << blobBatch.BatchInfo->GetBlobIds().size()); // Add this batch to KeepQueue TGenStep edgeGenStep = EdgeGenStep(); - for (ui32 i = 0; i < blobBatch.BatchInfo->BlobSizes.size(); ++i) { - const TUnifiedBlobId blobId = blobBatch.BatchInfo->MakeBlobId(i); + for (auto&& blobId: blobBatch.BatchInfo->GetBlobIds()) { Y_VERIFY_DEBUG(blobId.IsDsBlob(), "Not a DS blob id: %s", blobId.ToStringNew().c_str()); - auto logoblobId = blobId.GetLogoBlobId(); - TGenStep genStep{logoblobId.Generation(), logoblobId.Step()}; + auto logoBlobId = blobId.GetLogoBlobId(); + TGenStep genStep{logoBlobId.Generation(), logoBlobId.Step()}; AFL_VERIFY(genStep > edgeGenStep)("gen_step", genStep)("edge_gen_step", edgeGenStep)("blob_id", blobId.ToStringNew()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep", logoBlobId.ToString()); - BlobsManagerCounters.OnKeepMarker(logoblobId.BlobSize()); - BlobsToKeep.insert(std::move(logoblobId)); + BlobsManagerCounters.OnKeepMarker(logoBlobId.BlobSize()); + BlobsToKeep.insert(std::move(logoBlobId)); db.AddBlobToKeep(blobId); } BlobsManagerCounters.OnBlobsKeep(BlobsToKeep); @@ -344,6 +351,7 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) // Persist deletion intent db.AddBlobToDelete(blobId); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete", blobId); // Check if the deletion needs to be delayed until the blob is no longer // used by in-flight requests 
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp index 301ca21deac..5604d9d98b5 100644 --- a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp @@ -11,11 +11,13 @@ TWriteCounters::TWriteCounters(const TConsumerCounters& owner) RepliesCount = TBase::GetDeriviative("Replies/Count"); ReplyBytes = TBase::GetDeriviative("Replies/Bytes"); - ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000)); + ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1)); + ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1)); FailsCount = TBase::GetDeriviative("Fails/Count"); FailBytes = TBase::GetDeriviative("Fails/Bytes"); - FailDuration = TBase::GetHistogram("Fails/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000)); + FailDurationBySize = TBase::GetHistogram("Fails/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 2)); + FailDurationByCount = TBase::GetHistogram("Fails/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 2)); } } diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.h b/ydb/core/tx/columnshard/blobs_action/counters/write.h index 88c474a3fe1..591937f0de2 100644 --- a/ydb/core/tx/columnshard/blobs_action/counters/write.h +++ b/ydb/core/tx/columnshard/blobs_action/counters/write.h @@ -14,11 +14,13 @@ private: NMonitoring::TDynamicCounters::TCounterPtr RepliesCount; NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes; - NMonitoring::THistogramPtr ReplyDuration; + NMonitoring::THistogramPtr ReplyDurationByCount; + NMonitoring::THistogramPtr ReplyDurationBySize; NMonitoring::TDynamicCounters::TCounterPtr FailsCount; NMonitoring::TDynamicCounters::TCounterPtr FailBytes; - NMonitoring::THistogramPtr FailDuration; + 
NMonitoring::THistogramPtr FailDurationByCount; + NMonitoring::THistogramPtr FailDurationBySize; public: TWriteCounters(const TConsumerCounters& owner); @@ -30,13 +32,15 @@ public: void OnReply(const ui64 bytes, const TDuration d) const { RepliesCount->Add(1); ReplyBytes->Add(bytes); - ReplyDuration->Collect(d.MilliSeconds()); + ReplyDurationByCount->Collect((i64)d.MilliSeconds()); + ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes); } void OnFail(const ui64 bytes, const TDuration d) const { FailsCount->Add(1); FailBytes->Add(bytes); - FailDuration->Collect(d.MilliSeconds()); + FailDurationByCount->Collect((i64)d.MilliSeconds()); + FailDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes); } }; diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp index 8ab7fedda1b..35907ba3e4d 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp @@ -5,12 +5,12 @@ namespace NKikimr::NColumnShard { bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) { - TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())); + auto changes = Ev->Get()->IndexChanges; + TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("external_task_id", changes->GetTaskIdentifier()); Y_VERIFY(Self->InsertTable); Y_VERIFY(Self->TablesManager.HasPrimaryIndex()); txc.DB.NoMoreReadsForTx(); - auto changes = Ev->Get()->IndexChanges; ACFL_DEBUG("event", "TTxWriteIndex::Execute")("change_type", changes->TypeString())("details", *changes); if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) { NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp 
b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index b7d50f84cf4..c539d179533 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -75,7 +75,7 @@ void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, T NeedGranuleStatusProvide = false; } -TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext) +TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext) : TBase(limits.GetSplitSettings(), saverContext, StaticTypeName()) , Limits(limits) , GranuleMeta(granule) @@ -83,10 +83,10 @@ TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits Y_VERIFY(GranuleMeta); SwitchedPortions.reserve(portions.size()); - for (const auto& [_, portionInfo] : portions) { + for (const auto& portionInfo : portions) { Y_VERIFY(portionInfo->IsActive()); SwitchedPortions.emplace_back(*portionInfo); - PortionsToRemove.emplace_back(*portionInfo); + AFL_VERIFY(PortionsToRemove.emplace(portionInfo->GetAddress(), *portionInfo).second); Y_VERIFY(portionInfo->GetGranule() == GranuleMeta->GetGranuleId()); } Y_VERIFY(SwitchedPortions.size()); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h index fcba1e97cf0..f22c85e124e 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction.h @@ -32,7 +32,7 @@ public: virtual THashSet<TPortionAddress> GetTouchedPortions() const override; - TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, 
std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext); + TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext); ~TCompactColumnEngineChanges(); static TString StaticTypeName() { diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 6a137f7748f..17da977f5e9 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -37,7 +37,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc pkFieldNamesSet.emplace(i); } - std::shared_ptr<arrow::RecordBatch> batchResult; + std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults; { arrow::FieldVector indexFields; indexFields.emplace_back(portionIdField); @@ -62,51 +62,53 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey())); mergeStream.AddPoolSource({}, batch, nullptr); } - - NIndexedReader::TRecordBatchBuilder indexesBuilder(indexFields); - mergeStream.DrainAll(indexesBuilder); - batchResult = indexesBuilder.Finalize(); + batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields, true); } - - auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); - auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName); - auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); - auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); - Y_VERIFY(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx); - Y_VERIFY(columnPortionIdx->type_id() == arrow::UInt16Type::type_id); - 
Y_VERIFY(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id); - Y_VERIFY(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); - Y_VERIFY(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); - const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx); - const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx); - - const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / 10000 + 1) + 1; + Y_VERIFY(batchResults.size()); TSerializationStats stats; for (auto&& i : SwitchedPortions) { stats.Merge(i.GetSerializationStat(*resultSchema)); } - std::map<std::string, std::vector<TColumnPortionResult>> columnChunks; - + std::vector<std::map<std::string, std::vector<TColumnPortionResult>>> chunkGroups; + chunkGroups.resize(batchResults.size()); for (auto&& f : resultSchema->GetSchema()->fields()) { const ui32 columnId = resultSchema->GetColumnId(f->name()); auto columnInfo = stats.GetColumnInfo(columnId); Y_VERIFY(columnInfo); - TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext); - TMergedColumn mColumn(context); - { -// auto c = batchResult->GetColumnByName(f->name()); -// AFL_VERIFY(!c); + + std::vector<TPortionColumnCursor> cursors; + auto loader = resultSchema->GetColumnLoader(f->name()); + for (auto&& p : portions) { + std::vector<const TColumnRecord*> records; + std::vector<IPortionColumnChunk::TPtr> chunks; + p.ExtractColumnChunks(columnId, records, chunks); + cursors.emplace_back(TPortionColumnCursor(chunks, records, loader)); + } + + ui32 batchesRecordsCount = 0; + ui32 columnRecordsCount = 0; + std::map<std::string, std::vector<TColumnPortionResult>> columnChunks; + ui32 batchIdx = 0; + for (auto&& batchResult : batchResults) { + const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / 10000 + 1) + 1; + 
TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext); + TMergedColumn mColumn(context); + + auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); + auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName); + auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); + auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); + Y_VERIFY(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx); + Y_VERIFY(columnPortionIdx->type_id() == arrow::UInt16Type::type_id); + Y_VERIFY(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id); + Y_VERIFY(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); + Y_VERIFY(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); + const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx); + const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx); + AFL_VERIFY(batchResult->num_rows() == pIdxArray.length()); - std::vector<TPortionColumnCursor> cursors; - auto loader = resultSchema->GetColumnLoader(f->name()); - for (auto&& p : portions) { - std::vector<const TColumnRecord*> records; - std::vector<IPortionColumnChunk::TPtr> chunks; - p.ExtractColumnChunks(columnId, records, chunks); - cursors.emplace_back(TPortionColumnCursor(chunks, records, loader)); - } std::optional<ui16> predPortionIdx; for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) { const ui16 portionIdx = pIdxArray.Value(idx); @@ -121,46 +123,54 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc } predPortionIdx = portionIdx; } + chunkGroups[batchIdx][f->name()] = mColumn.BuildResult(); + batchesRecordsCount += batchResult->num_rows(); + columnRecordsCount += mColumn.GetRecordsCount(); + ++batchIdx; } - 
AFL_VERIFY(mColumn.GetRecordsCount() == batchResult->num_rows())("f_name", f->name())("mCount", mColumn.GetRecordsCount())("bCount", batchResult->num_rows()); - columnChunks[f->name()] = mColumn.BuildResult(); - } - - Y_VERIFY(columnChunks.size()); + AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("f_name", f->name())("mCount", columnRecordsCount)("bCount", batchesRecordsCount); - for (auto&& i : columnChunks) { - if (i.second.size() != columnChunks.begin()->second.size()) { - for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())("p", i.second[p].DebugString()); + } + ui32 batchIdx = 0; + for (auto&& columnChunks : chunkGroups) { + auto batchResult = batchResults[batchIdx]; + ++batchIdx; + Y_VERIFY(columnChunks.size()); + + for (auto&& i : columnChunks) { + if (i.second.size() != columnChunks.begin()->second.size()) { + for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())("p", i.second[p].DebugString()); + } } + AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())("current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first); } - AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())("current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first); - } - std::vector<TGeneralSerializedSlice> batchSlices; - std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultSchema, SaverContext, std::move(stats))); + std::vector<TGeneralSerializedSlice> batchSlices; + std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultSchema, SaverContext, 
std::move(stats))); - for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) { - std::map<ui32, std::vector<IPortionColumnChunk::TPtr>> portionColumns; - for (auto&& p : columnChunks) { - portionColumns.emplace(resultSchema->GetColumnId(p.first), p.second[i].GetChunks()); + for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) { + std::map<ui32, std::vector<IPortionColumnChunk::TPtr>> portionColumns; + for (auto&& p : columnChunks) { + portionColumns.emplace(resultSchema->GetColumnId(p.first), p.second[i].GetChunks()); + } + batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings()); } - batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings()); - } - TSimilarSlicer slicer(4 * 1024 * 1024); - auto packs = slicer.Split(batchSlices); - - ui32 recordIdx = 0; - for (auto&& i : packs) { - TGeneralSerializedSlice slice(std::move(i)); - auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); - std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs(); - AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator())); - NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); - NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); - AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName()); - recordIdx += slice.GetRecordsCount(); + TSimilarSlicer slicer(4 * 1024 * 1024); + auto packs = slicer.Split(batchSlices); + + ui32 recordIdx = 0; + for (auto&& i : packs) { + TGeneralSerializedSlice slice(std::move(i)); + auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); + std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = 
slice.GroupChunksByBlobs(); + AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator())); + NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); + NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); + AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName()); + recordIdx += slice.GetRecordsCount(); + } } if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { TStringBuilder sbSwitched; @@ -198,4 +208,11 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter return isSuccess ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL; } +void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position) { + if (CheckPoints.size()) { + AFL_VERIFY(CheckPoints.back().Compare(position) == std::partial_ordering::less); + } + CheckPoints.emplace_back(position); +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index c246efe9ff4..2a5309646c0 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -1,5 +1,6 @@ #pragma once #include "compaction.h" +#include <ydb/core/formats/arrow/reader/read_filter_merger.h> namespace NKikimr::NOlap::NCompaction { @@ -7,6 +8,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { private: using TBase = TCompactColumnEngineChanges; virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; + std::vector<NIndexedReader::TSortableBatchPosition> CheckPoints; protected: virtual TConclusionStatus 
DoConstructBlobs(TConstructionContext& context) noexcept override; virtual TPortionMeta::EProduced GetResultProducedClass() const override { @@ -17,6 +19,8 @@ protected: public: using TBase::TBase; + void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position); + virtual TString TypeString() const override { return StaticTypeName(); } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 2d3be5d8839..b280e7a1b5c 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -47,7 +47,7 @@ void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { for (size_t i = 0; i < DataToIndex.size(); ++i) { const auto& insertedData = DataToIndex[i]; Y_VERIFY(insertedData.GetBlobRange().IsFullBlob()); - reading->AddRange(insertedData.GetBlobRange()); + reading->AddRange(insertedData.GetBlobRange(), insertedData.GetBlobData().value_or("")); removing->DeclareRemove(insertedData.GetBlobRange().GetBlobId()); } @@ -90,26 +90,25 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont Y_VERIFY(indexInfo.IsSorted()); std::shared_ptr<arrow::RecordBatch> batch; - if (auto* blobData = Blobs.FindPtr(blobRange)) { - Y_VERIFY(!blobData->empty(), "Blob data not present"); + { + auto itBlobData = Blobs.find(blobRange); + Y_VERIFY(itBlobData != Blobs.end(), "Data for range %s has not been read", blobRange.ToString().c_str()); + Y_VERIFY(!itBlobData->second.empty(), "Blob data not present"); // Prepare batch - batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema()); - if (!batch) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD) - ("event", "cannot_parse") - ("data_snapshot", TStringBuilder() << inserted.GetSnapshot()) - ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot()); - } - } else { - Y_VERIFY(blobData, "Data for range %s has not been read", 
blobRange.ToString().c_str()); + batch = NArrow::DeserializeBatch(itBlobData->second, indexInfo.ArrowSchema()); + Blobs.erase(itBlobData); + AFL_VERIFY(batch)("event", "cannot_parse") + ("data_snapshot", TStringBuilder() << inserted.GetSnapshot()) + ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot()); + ; } - Y_VERIFY(batch); batch = AddSpecials(batch, blobSchema->GetIndexInfo(), inserted); batch = resultSchema->NormalizeBatch(*blobSchema, batch); pathBatches[inserted.PathId].push_back(batch); Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey())); } + Y_VERIFY(Blobs.empty()); for (auto& [pathId, batches] : pathBatches) { AddPathIfNotExists(pathId); diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index 33dd6d78245..f847bf75faa 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -22,7 +22,6 @@ void TTTLColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { Y_VERIFY(PortionsToEvict.size() || PortionsToRemove.size()); for (const auto& p : PortionsToEvict) { Y_VERIFY(!p.GetPortionInfo().Empty()); - PortionsToRemove.emplace_back(p.GetPortionInfo()); auto agent = BlobsAction.GetReading(p.GetPortionInfo()); for (const auto& rec : p.GetPortionInfo().Records) { @@ -73,7 +72,8 @@ NKikimr::TConclusionStatus TTTLColumnEngineChanges::DoConstructBlobs(TConstructi for (auto&& info : PortionsToEvict) { if (auto pwb = UpdateEvictedPortion(info, Blobs, context)) { - PortionsToRemove.emplace_back(info.GetPortionInfo()); + info.MutablePortionInfo().SetRemoveSnapshot(info.MutablePortionInfo().GetMinSnapshot()); + AFL_VERIFY(PortionsToRemove.emplace(info.GetPortionInfo().GetAddress(), info.GetPortionInfo()).second); AppendedPortions.emplace_back(std::move(*pwb)); } } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h 
index 6e7208b8512..2c93c4d1a80 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.h +++ b/ydb/core/tx/columnshard/engines/changes/ttl.h @@ -33,6 +33,10 @@ private: const TPortionInfo& GetPortionInfo() const { return PortionInfo; } + + TPortionInfo& MutablePortionInfo() { + return PortionInfo; + } }; std::optional<TPortionInfoWithBlobs> UpdateEvictedPortion(TPortionForEviction& info, const THashMap<TBlobRange, TString>& srcBlobs, diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index e60288c7fdc..d69469db795 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -41,7 +41,7 @@ void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIn self.IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size()); THashSet<TUnifiedBlobId> blobsDeactivated; - for (auto& portionInfo : PortionsToRemove) { + for (auto& [_, portionInfo] : PortionsToRemove) { for (auto& rec : portionInfo.Records) { blobsDeactivated.insert(rec.BlobRange.BlobId); } @@ -84,7 +84,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange } auto g = self.GranulesStorage->StartPackModification(); - for (auto& portionInfo : PortionsToRemove) { + for (auto& [_, portionInfo] : PortionsToRemove) { Y_VERIFY(!portionInfo.Empty()); Y_VERIFY(!portionInfo.IsActive()); @@ -100,7 +100,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange } } - for (auto& portionInfo : PortionsToRemove) { + for (auto& [_, portionInfo] : PortionsToRemove) { self.CleanupPortions[portionInfo.GetRemoveSnapshot()].emplace_back(portionInfo); } @@ -112,9 +112,10 @@ void TChangesWithAppend::DoCompile(TFinalizationContext& context) { i.GetPortionInfo().SetPortion(context.NextPortionId()); i.GetPortionInfo().UpdateRecordsMeta(TPortionMeta::EProduced::INSERTED); } - for (auto& 
portionInfo : PortionsToRemove) { - Y_VERIFY(portionInfo.IsActive()); - portionInfo.SetRemoveSnapshot(context.GetSnapshot()); + for (auto& [_, portionInfo] : PortionsToRemove) { + if (portionInfo.IsActive()) { + portionInfo.SetRemoveSnapshot(context.GetSnapshot()); + } } } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index f107eb637e8..8a454a3d5f1 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -39,12 +39,12 @@ public: virtual THashSet<TPortionAddress> GetTouchedPortions() const override { THashSet<TPortionAddress> result; for (auto&& i : PortionsToRemove) { - result.emplace(i.GetAddress()); + result.emplace(i.first); } return result; } - std::vector<TPortionInfo> PortionsToRemove; + THashMap<TPortionAddress, TPortionInfo> PortionsToRemove; std::vector<TPortionInfoWithBlobs> AppendedPortions; THashMap<ui64, std::pair<ui64, TMark>> NewGranules; ui64 FirstGranuleId = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index fd358b9232c..2f70988033d 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -449,7 +449,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering if (!keep && context.AllowDrop) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString()); dropBlobs += info->NumBlobs(); - context.Changes->PortionsToRemove.emplace_back(*info); + AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second); SignalCounters.OnPortionToDrop(info->BlobsBytes()); } } else { @@ -573,7 +573,7 @@ void TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec) { const TMark mark(rec.Mark); AFL_VERIFY(PathGranules[rec.PathId].emplace(mark, 
rec.Granule).second)("event", "marker_duplication")("granule_id", rec.Granule)("old_granule_id", PathGranules[rec.PathId][mark]); - AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters())).second)("event", "granule_duplication") + AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second)("event", "granule_duplication") ("rec_path_id", rec.PathId)("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString()); } @@ -583,7 +583,7 @@ std::optional<ui64> TColumnEngineForLogs::NewGranule(const TGranuleRecord& rec) auto insertInfo = PathGranules[rec.PathId].emplace(mark, rec.Granule); if (insertInfo.second) { - AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters())).second)("event", "granule_duplication") + AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second)("event", "granule_duplication") ("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString()); return {}; } else { @@ -689,17 +689,21 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot Y_VERIFY(spg); bool granuleHasDataForSnaphsot = false; - std::vector<std::shared_ptr<TPortionInfo>> orderedPortions = spg->GroupOrderedPortionsByPK(snapshot); - for (const auto& portionInfo : orderedPortions) { - Y_VERIFY(portionInfo->Produced()); - const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo()); - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? 
"portion_skipped" : "portion_selected") - ("granule", granule)("portion", portionInfo->DebugString()); - if (skipPortion) { - continue; + for (const auto& [_, keyPortions] : spg->GroupOrderedPortionsByPK()) { + for (auto&& [_, portionInfo] : keyPortions) { + if (!portionInfo->IsVisible(snapshot)) { + continue; + } + Y_VERIFY(portionInfo->Produced()); + const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo()); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected") + ("granule", granule)("portion", portionInfo->DebugString()); + if (skipPortion) { + continue; + } + out->PortionsOrderedPK.emplace_back(portionInfo); + granuleHasDataForSnaphsot = true; } - out->PortionsOrderedPK.emplace_back(portionInfo); - granuleHasDataForSnaphsot = true; } if (granuleHasDataForSnaphsot) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp index 91d4574890e..b15ce2015b8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp @@ -27,7 +27,6 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, TPlainR bool TScanHead::BuildNextInterval() { while (BorderPoints.size()) { -// Y_VERIFY(FrontEnds.size()); auto position = BorderPoints.begin()->first; auto firstBorderPointInfo = std::move(BorderPoints.begin()->second); const bool isIncludeStart = CurrentSegments.empty(); @@ -49,17 +48,6 @@ bool TScanHead::BuildNextInterval() { AFL_VERIFY(CurrentSegments.erase(i->GetSourceIdx()))("idx", i->GetSourceIdx()); } -// const bool isFirstFinished = (position == *FrontEnds.begin()); -// if (firstBorderPointInfo.GetFinishSources().size()) { -// Y_VERIFY(isFirstFinished); -// Y_VERIFY(FrontEnds.erase(position)); -// } else { -// Y_VERIFY(!FrontEnds.erase(position)); -// } - -// if 
(isFirstFinished) { -// DrainSources(); -// } CurrentStart = BorderPoints.begin()->first; BorderPoints.erase(BorderPoints.begin()); if (CurrentSegments.size()) { @@ -87,9 +75,8 @@ void TScanHead::DrainResults() { } void TScanHead::DrainSources() { - while (Sources.size()/* && (FrontEnds.empty() || Sources.front()->GetStart().Compare(*FrontEnds.begin()) != std::partial_ordering::greater)*/) { + while (Sources.size()) { auto source = Sources.front(); -// FrontEnds.emplace(source->GetFinish()); BorderPoints[source->GetStart()].AddStart(source); BorderPoints[source->GetFinish()].AddFinish(source); Sources.pop_front(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h index 9c0525252ab..7e739988b33 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h @@ -26,7 +26,6 @@ private: std::deque<std::shared_ptr<IDataSource>> Sources; std::vector<std::shared_ptr<arrow::Field>> ResultFields; THashMap<ui32, std::shared_ptr<IDataSource>> SourceByIdx; -// std::set<NIndexedReader::TSortableBatchPosition> FrontEnds; std::map<NIndexedReader::TSortableBatchPosition, TDataSourceEndpoint> BorderPoints; std::map<ui32, std::shared_ptr<IDataSource>> CurrentSegments; std::optional<NIndexedReader::TSortableBatchPosition> CurrentStart; diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp index ef71c3366ab..cd30ee433e0 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp @@ -111,7 +111,8 @@ std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition( while (SortHeap.size() && (isFirst || result.Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) { auto& anotherIterator = SortHeap.front(); if 
(!isFirst) { - Y_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater); + AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson()) + ("key", result.DebugJson()); } NextInHeap(true); isFirst = false; @@ -122,6 +123,27 @@ std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition( return result; } +std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::vector<TSortableBatchPosition>& positions, + const std::vector<std::shared_ptr<arrow::Field>>& resultFields, const bool includePositions) +{ + std::vector<std::shared_ptr<arrow::RecordBatch>> result; + for (auto&& i : positions) { + NIndexedReader::TRecordBatchBuilder indexesBuilder(resultFields); + DrainCurrentTo(indexesBuilder, i, includePositions); + result.emplace_back(indexesBuilder.Finalize()); + if (result.back()->num_rows() == 0) { + result.pop_back(); + } + } + NIndexedReader::TRecordBatchBuilder indexesBuilder(resultFields); + DrainAll(indexesBuilder); + result.emplace_back(indexesBuilder.Finalize()); + if (result.back()->num_rows() == 0) { + result.pop_back(); + } + return result; +} + NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const { NJson::TJsonValue result; result["is_cp"] = IsControlPoint(); diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index ce2bebf5b0c..9fa3888e203 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -220,6 +220,8 @@ public: bool DrainAll(TRecordBatchBuilder& builder); bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish); + std::vector<std::shared_ptr<arrow::RecordBatch>> 
DrainAllParts(const std::vector<TSortableBatchPosition>& positions, + const std::vector<std::shared_ptr<arrow::Field>>& resultFields, const bool includePositions); }; class TRecordBatchBuilder { diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index 84bd52f6622..37649297d59 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -1,7 +1,8 @@ #include "granule.h" #include "storage.h" #include <library/cpp/actors/core/log.h> -#include "optimizer/intervals_optimizer.h" +#include "optimizer/intervals/optimizer.h" +#include "optimizer/levels/optimizer.h" namespace NKikimr::NOlap { @@ -33,7 +34,7 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { OnBeforeChangePortion(it->second); it->second = std::make_shared<TPortionInfo>(info); } - OnAfterChangePortion(it->second); + OnAfterChangePortion(it->second, nullptr); } bool TGranuleMeta::ErasePortion(const ui64 portion) { @@ -46,7 +47,7 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) { } OnBeforeChangePortion(it->second); Portions.erase(it); - OnAfterChangePortion(nullptr); + OnAfterChangePortion(nullptr, nullptr); return true; } @@ -59,7 +60,7 @@ void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionIn portionNew->AddRecord(indexInfo, rec, portionMeta); it = Portions.emplace(portion.GetPortion(), portionNew).first; } else { - Y_VERIFY(it->second->IsEqualWithSnapshots(portion)); + AFL_VERIFY(it->second->IsEqualWithSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString()); it->second->AddRecord(indexInfo, rec, portionMeta); } if (portionMeta) { @@ -67,8 +68,10 @@ void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionIn } } -void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter) { +void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> 
portionAfter, NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard) { if (portionAfter) { + AFL_VERIFY(PortionsByPK[portionAfter->IndexKeyStart()].emplace(portionAfter->GetPortion(), portionAfter).second); + THashMap<TUnifiedBlobId, ui64> blobIdSize; for (auto&& i : portionAfter->Records) { blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size; @@ -77,7 +80,11 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port PortionInfoGuard.OnNewBlob(portionAfter->IsActive() ? portionAfter->GetMeta().Produced : NPortion::EProduced::INACTIVE, i.second); } if (portionAfter->IsActive()) { - OptimizerPlanner->AddPortion(portionAfter); + if (modificationGuard) { + modificationGuard->AddPortion(portionAfter); + } else { + OptimizerPlanner->StartModificationGuard().AddPortion(portionAfter); + } } } if (!!AdditiveSummaryCache) { @@ -93,6 +100,17 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port void TGranuleMeta::OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> portionBefore) { if (portionBefore) { + { + auto itByKey = PortionsByPK.find(portionBefore->IndexKeyStart()); + Y_VERIFY(itByKey != PortionsByPK.end()); + auto itPortion = itByKey->second.find(portionBefore->GetPortion()); + Y_VERIFY(itPortion != itByKey->second.end()); + itByKey->second.erase(itPortion); + if (itByKey->second.empty()) { + PortionsByPK.erase(itByKey); + } + } + THashMap<TUnifiedBlobId, ui64> blobIdSize; for (auto&& i : portionBefore->Records) { blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size; @@ -101,7 +119,7 @@ void TGranuleMeta::OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> por PortionInfoGuard.OnDropBlob(portionBefore->IsActive() ? 
portionBefore->GetMeta().Produced : NPortion::EProduced::INACTIVE, i.second); } if (portionBefore->IsActive()) { - OptimizerPlanner->RemovePortion(portionBefore); + OptimizerPlanner->StartModificationGuard().RemovePortion(portionBefore); } } if (!!AdditiveSummaryCache) { @@ -153,14 +171,14 @@ const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary( return *AdditiveSummaryCache; } -TGranuleMeta::TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters) +TGranuleMeta::TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex) : Owner(owner) , Counters(counters) , PortionInfoGuard(Owner->GetCounters().BuildPortionBlobsGuard()) , Record(rec) { Y_VERIFY(Owner); - OptimizerPlanner = std::make_shared<NStorageOptimizer::TIntervalsOptimizerPlanner>(rec.Granule, owner->GetStoragesManager()); + OptimizerPlanner = std::make_shared<NStorageOptimizer::NLevels::TLevelsOptimizerPlanner>(rec.Granule, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey()); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 955c2a83dd0..c9381575629 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -3,7 +3,7 @@ #include <ydb/core/tx/columnshard/counters/engine_logs.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> #include <ydb/core/tx/columnshard/engines/portion_info.h> -#include "optimizer/optimizer.h" +#include "optimizer/abstract/optimizer.h" namespace NKikimr::NOlap { @@ -150,10 +150,10 @@ public: class TCompactionPriority { private: - i64 Weight = 0; + NStorageOptimizer::TOptimizationPriority Weight; TMonotonic ConstructionInstant = TMonotonic::Now(); public: - i64 GetWeight() const { + const 
NStorageOptimizer::TOptimizationPriority& GetWeight() const { return Weight; } @@ -163,11 +163,11 @@ public: } bool operator<(const TCompactionPriority& item) const { - return std::tie(Weight) < std::tie(item.Weight); + return Weight < item.Weight; } TString DebugString() const { - return TStringBuilder() << "summary:(" << Weight << ");"; + return TStringBuilder() << "summary:(" << Weight.DebugString() << ");"; } }; @@ -192,22 +192,24 @@ private: const NColumnShard::TGranuleDataCounters Counters; NColumnShard::TEngineLogsCounters::TPortionsInfoGuard PortionInfoGuard; std::shared_ptr<NStorageOptimizer::IOptimizerPlanner> OptimizerPlanner; + std::map<NArrow::TReplaceKey, THashMap<ui64, std::shared_ptr<TPortionInfo>>> PortionsByPK; void OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> portionBefore); - void OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter); + void OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter, NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard); void OnAdditiveSummaryChange() const; public: std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> self, const THashSet<TPortionAddress>& busyPortions) const { return OptimizerPlanner->GetOptimizationTask(limits, self, busyPortions); } - std::vector<std::shared_ptr<TPortionInfo>> GroupOrderedPortionsByPK(const TSnapshot& snapshot) const { - return OptimizerPlanner->GetPortionsOrderedByPK(snapshot); + const std::map<NArrow::TReplaceKey, THashMap<ui64, std::shared_ptr<TPortionInfo>>>& GroupOrderedPortionsByPK() const { + return PortionsByPK; } void OnAfterPortionsLoad() { + auto g = OptimizerPlanner->StartModificationGuard(); for (auto&& i : Portions) { - OnAfterChangePortion(i.second); + OnAfterChangePortion(i.second, &g); } } @@ -287,7 +289,7 @@ public: bool ErasePortion(const ui64 portion); - explicit TGranuleMeta(const TGranuleRecord& rec, 
std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters); + explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex); ui64 GetGranuleId() const { return Record.Granule; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt index 7a3a568f8c9..41da0631d39 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt @@ -6,17 +6,15 @@ # original buildsystem will not be accepted. +add_subdirectory(abstract) +add_subdirectory(intervals) +add_subdirectory(levels) -add_library(engines-storage-optimizer) -target_link_libraries(engines-storage-optimizer PUBLIC +add_library(engines-storage-optimizer INTERFACE) +target_link_libraries(engines-storage-optimizer INTERFACE contrib-libs-cxxsupp yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow - engines-changes-abstract -) -target_sources(engines-storage-optimizer PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp + storage-optimizer-abstract + storage-optimizer-intervals + storage-optimizer-levels ) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt index ef0e77c9397..a71e16d03bf 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt @@ -6,18 +6,16 @@ # original buildsystem will not be accepted. 
+add_subdirectory(abstract) +add_subdirectory(intervals) +add_subdirectory(levels) -add_library(engines-storage-optimizer) -target_link_libraries(engines-storage-optimizer PUBLIC +add_library(engines-storage-optimizer INTERFACE) +target_link_libraries(engines-storage-optimizer INTERFACE contrib-libs-linux-headers contrib-libs-cxxsupp yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow - engines-changes-abstract -) -target_sources(engines-storage-optimizer PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp + storage-optimizer-abstract + storage-optimizer-intervals + storage-optimizer-levels ) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt index ef0e77c9397..a71e16d03bf 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt @@ -6,18 +6,16 @@ # original buildsystem will not be accepted. 
+add_subdirectory(abstract) +add_subdirectory(intervals) +add_subdirectory(levels) -add_library(engines-storage-optimizer) -target_link_libraries(engines-storage-optimizer PUBLIC +add_library(engines-storage-optimizer INTERFACE) +target_link_libraries(engines-storage-optimizer INTERFACE contrib-libs-linux-headers contrib-libs-cxxsupp yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow - engines-changes-abstract -) -target_sources(engines-storage-optimizer PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp + storage-optimizer-abstract + storage-optimizer-intervals + storage-optimizer-levels ) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt index 7a3a568f8c9..41da0631d39 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt @@ -6,17 +6,15 @@ # original buildsystem will not be accepted. 
+add_subdirectory(abstract) +add_subdirectory(intervals) +add_subdirectory(levels) -add_library(engines-storage-optimizer) -target_link_libraries(engines-storage-optimizer PUBLIC +add_library(engines-storage-optimizer INTERFACE) +target_link_libraries(engines-storage-optimizer INTERFACE contrib-libs-cxxsupp yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow - engines-changes-abstract -) -target_sources(engines-storage-optimizer PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp + storage-optimizer-abstract + storage-optimizer-intervals + storage-optimizer-levels ) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..2827f60579d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-abstract) +target_link_libraries(storage-optimizer-abstract PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..dc5b2282ada --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-abstract) +target_link_libraries(storage-optimizer-abstract PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..dc5b2282ada --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(storage-optimizer-abstract) +target_link_libraries(storage-optimizer-abstract PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. 
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..2827f60579d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-abstract) +target_link_libraries(storage-optimizer-abstract PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp index d3e82f8b210..d3e82f8b210 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h new file mode 100644 index 00000000000..63842757b6a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h @@ -0,0 +1,113 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <library/cpp/object_factory/object_factory.h> + +namespace NKikimr::NOlap { +struct TCompactionLimits; +class TGranuleMeta; +class TColumnEngineChanges; +} + +namespace NKikimr::NOlap::NStorageOptimizer { + +class TOptimizationPriority { +private: + YDB_READONLY(i64, Level, 0); + YDB_READONLY(i64, InternalLevelWeight, 0); + TOptimizationPriority(const i64 level, const i64 levelWeight) + : Level(level) + , InternalLevelWeight(levelWeight) { + + } + +public: + bool operator<(const TOptimizationPriority& item) const { + return std::tie(Level, InternalLevelWeight) < std::tie(item.Level, item.InternalLevelWeight); + } + + bool IsZero() const { + return !Level && !InternalLevelWeight; + } + + TString DebugString() const { + return TStringBuilder() << "(" << Level << "," << InternalLevelWeight << ")"; + } + + static TOptimizationPriority Critical(const i64 weight) { + 
return TOptimizationPriority(10, weight); + } + + static TOptimizationPriority Optimization(const i64 weight) { + return TOptimizationPriority(0, weight); + } + + static TOptimizationPriority Zero() { + return TOptimizationPriority(0, 0); + } + +}; + +class IOptimizerPlanner { +private: + const ui64 GranuleId; +protected: + virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) = 0; + virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const = 0; + virtual TOptimizationPriority DoGetUsefulMetric() const = 0; + virtual TString DoDebugString() const { + return ""; + } +public: + using TFactory = NObjectFactory::TObjectFactory<IOptimizerPlanner, TString>; + IOptimizerPlanner(const ui64 granuleId) + : GranuleId(granuleId) + { + + } + + class TModificationGuard: TNonCopyable { + private: + IOptimizerPlanner& Owner; + std::vector<std::shared_ptr<TPortionInfo>> AddPortions; + std::vector<std::shared_ptr<TPortionInfo>> RemovePortions; + public: + TModificationGuard& AddPortion(const std::shared_ptr<TPortionInfo>& portion) { + AddPortions.emplace_back(portion); + return*this; + } + + TModificationGuard& RemovePortion(const std::shared_ptr<TPortionInfo>& portion) { + RemovePortions.emplace_back(portion); + return*this; + } + + TModificationGuard(IOptimizerPlanner& owner) + : Owner(owner) + { + } + ~TModificationGuard() { + Owner.ModifyPortions(AddPortions, RemovePortions); + } + }; + + TModificationGuard StartModificationGuard() { + return TModificationGuard(*this); + } + + virtual ~IOptimizerPlanner() = default; + TString DebugString() const { + return DoDebugString(); + } + + void ModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) { + NActors::TLogContextGuard 
g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("granule_id", GranuleId)); + DoModifyPortions(add, remove); + } + + std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const; + TOptimizationPriority GetUsefulMetric() const { + return DoGetUsefulMetric(); + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make new file mode 100644 index 00000000000..140b1ed351b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + optimizer.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/changes/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..39523c21a58 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-intervals) +target_link_libraries(storage-optimizer-intervals PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-intervals PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..b67769e366d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-intervals) +target_link_libraries(storage-optimizer-intervals PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-intervals PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..b67769e366d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-intervals) +target_link_libraries(storage-optimizer-intervals PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-intervals PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..39523c21a58 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-intervals) +target_link_libraries(storage-optimizer-intervals PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-intervals PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp new file mode 100644 index 00000000000..ea255ea852b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp @@ -0,0 +1,43 @@ +#include "blob_size.h" +#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> +#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h> + +namespace NKikimr::NOlap::NStorageOptimizer { + +std::shared_ptr<NKikimr::NOlap::TColumnEngineChanges> TBlobsWithSizeLimit::BuildMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const { + if (PortionsSize > (i64)SizeLimitToMerge || PortionsCount > CountLimitToMerge) { + i64 currentSum = 0; + std::vector<std::shared_ptr<TPortionInfo>> portions; + std::optional<TString> tierName; + for (auto&& i : Portions) { + for (auto&& c : i.second) { + if (busyPortions.contains(c.second->GetAddress())) { + continue; + } + if (c.second->GetMeta().GetTierName() && (!tierName || *tierName < c.second->GetMeta().GetTierName())) { + tierName = c.second->GetMeta().GetTierName(); + } + currentSum += c.second->GetBlobBytes(); + portions.emplace_back(c.second); + if (currentSum > (i64)32 * 1024 * 1024) { + break; + } + } + if (currentSum > (i64)32 * 1024 * 
1024) { + break; + } + } + if (currentSum > SizeLimitToMerge || PortionsCount > CountLimitToMerge) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("portions", portions.size())("current_sum", currentSum); + TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager); + return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext); + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("skip", "not_enough_data"); + } + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("event", "skip_by_condition"); + } + return nullptr; +} + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.h new file mode 100644 index 00000000000..e5084e1cb45 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.h @@ -0,0 +1,145 @@ +#pragma once +#include "counters.h" +#include <ydb/core/formats/arrow/replace_key.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/columnshard/splitter/settings.h> +#include <ydb/core/tx/columnshard/blobs_action/abstract/write.h> +#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> + +namespace NKikimr::NOlap::NStorageOptimizer { + +class TBlobsWithSizeLimit { +private: + YDB_READONLY(ui64, SizeLimit, 0); + YDB_READONLY(i64, SizeLimitToMerge, (i64)2 * 1024 * 1024); + YDB_READONLY(i64, CountLimitToMerge, 8); + YDB_READONLY(i64, PortionsSize, 0); + YDB_READONLY(i64, PortionsCount, 0); + std::map<NArrow::TReplaceKey, std::map<ui64, std::shared_ptr<TPortionInfo>>> Portions; + std::shared_ptr<TCounters> Counters; + std::shared_ptr<IStoragesManager> StoragesManager; +public: + TString DebugString() const { + return TStringBuilder() + 
<< "p_count=" << PortionsCount << ";" + << "p_count_by_key=" << Portions.size() << ";" + ; + } + + TBlobsWithSizeLimit(const ui64 limit, const std::shared_ptr<TCounters>& counters, const std::shared_ptr<IStoragesManager>& storagesManager) + : SizeLimit(limit) + , Counters(counters) + , StoragesManager(storagesManager) + { + + } + + std::shared_ptr<TColumnEngineChanges> BuildMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const; + + void AddPortion(const std::shared_ptr<TPortionInfo>& portion) { + AFL_VERIFY(portion->BlobsBytes() < SizeLimit); + AFL_VERIFY(Portions[portion->IndexKeyStart()].emplace(portion->GetPortion(), portion).second); + PortionsSize += portion->BlobsBytes(); + ++PortionsCount; + Counters->OnAddSmallPortion(); + } + + void RemovePortion(const std::shared_ptr<TPortionInfo>& portion) { + auto it = Portions.find(portion->IndexKeyStart()); + AFL_VERIFY(it != Portions.end()); + AFL_VERIFY(it->second.erase(portion->GetPortion())); + if (!it->second.size()) { + Portions.erase(it); + } + PortionsSize -= portion->BlobsBytes(); + AFL_VERIFY(PortionsSize >= 0); + --PortionsCount; + AFL_VERIFY(PortionsCount >= 0); + Counters->OnRemoveSmallPortion(); + } + + std::optional<TOptimizationPriority> GetWeight() const { + Y_VERIFY(Counters->GetSmallCounts() == PortionsCount); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portions_opt_count", PortionsCount)("counter", (ui64)Counters->SmallPortionsByGranule.get()); + if (PortionsSize > SizeLimitToMerge || PortionsCount > CountLimitToMerge) { + return TOptimizationPriority::Critical(PortionsCount); + } else { + return {}; + } + } +}; + +class TBlobsBySize { +private: + std::map<ui64, TBlobsWithSizeLimit> BlobsBySizeLimit; +public: + TString DebugString() const { + TStringBuilder sb; + sb << "("; + for (auto&& i : BlobsBySizeLimit) { + sb << "(" << i.first << ":" << i.second.DebugString() << ");"; + } + sb << ")"; + return sb; + } + + void 
AddPortion(const std::shared_ptr<TPortionInfo>& portion) { + auto it = BlobsBySizeLimit.upper_bound(portion->GetBlobBytes()); + if (it != BlobsBySizeLimit.end()) { + it->second.AddPortion(portion); + } + } + + void RemovePortion(const std::shared_ptr<TPortionInfo>& portion) { + auto it = BlobsBySizeLimit.upper_bound(portion->GetBlobBytes()); + if (it != BlobsBySizeLimit.end()) { + it->second.RemovePortion(portion); + } + } + + std::optional<TOptimizationPriority> GetWeight() const { + std::optional<TOptimizationPriority> result; + for (auto&& i : BlobsBySizeLimit) { + auto w = i.second.GetWeight(); + if (!w) { + continue; + } + if (!result || *result < *w) { + result = w; + } + } + return result; + } + + const TBlobsWithSizeLimit* GetMaxWeightLimiter() const { + std::optional<TOptimizationPriority> resultWeight; + const TBlobsWithSizeLimit* result = nullptr; + for (auto&& i : BlobsBySizeLimit) { + auto w = i.second.GetWeight(); + if (!w) { + continue; + } + if (!resultWeight || *resultWeight < *w) { + resultWeight = w; + result = &i.second; + } + } + return result; + } + + std::shared_ptr<TColumnEngineChanges> BuildMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const { + auto* limiter = GetMaxWeightLimiter(); + if (!limiter) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("fail", "limiter absent"); + return nullptr; + } + return limiter->BuildMergeTask(limits, granule, busyPortions); + } + + TBlobsBySize(const std::shared_ptr<TCounters>& counters, const std::shared_ptr<IStoragesManager>& storagesManager) { +// BlobsBySizeLimit.emplace(512 * 1024, TBlobsWithSizeLimit(512 * 1024, counters, storagesManager)); + BlobsBySizeLimit.emplace(1024 * 1024, TBlobsWithSizeLimit(1024 * 1024, counters, storagesManager)); + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp 
b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp new file mode 100644 index 00000000000..a8c1f6920c1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.cpp @@ -0,0 +1,5 @@ +#include "counters.h" + +namespace NKikimr::NOlap::NStorageOptimizer { + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h new file mode 100644 index 00000000000..b81871b673b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h @@ -0,0 +1,98 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h> +#include <ydb/core/formats/arrow/replace_key.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/columnshard/splitter/settings.h> +#include <ydb/core/tx/columnshard/counters/engine_logs.h> + +namespace NKikimr::NOlap::NStorageOptimizer { + +class TGlobalCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr SmallPortionsCount; + std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsCount; + std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsPackedSizeCount; + std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsRawSizeCount; + std::shared_ptr<NColumnShard::TValueAggregationAgent> SmallPortionsCountByGranule; +public: + TGlobalCounters() + : TBase("IntervalsStorageOptimizer") { + SmallPortionsCountByGranule = TBase::GetValueAutoAggregations("Granule/SmallPortions/Count"); + SmallPortionsCount = TBase::GetValue("SmallPortions/Count"); + + const std::set<i64> borders = {0, 1, 2, 4, 8, 16, 32, 64}; + HistogramOverlappedIntervalsCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", 
"OverlappedIntervals/Count", "", borders); + HistogramOverlappedIntervalsPackedSizeCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Size/Packed", "", borders); + HistogramOverlappedIntervalsRawSizeCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Size/Raw", "", borders); + } + + static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildClientSmallPortionsAggregation() { + return Singleton<TGlobalCounters>()->SmallPortionsCountByGranule->GetClient(); + } + + static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsOverlapping() { + return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsCount->BuildGuard(); + } + + static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsPackedSizeOverlapping() { + return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsPackedSizeCount->BuildGuard(); + } + + static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsRawSizeOverlapping() { + return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsRawSizeCount->BuildGuard(); + } + + static std::shared_ptr<NColumnShard::TValueGuard> BuildSmallPortionsGuard() { + return std::make_shared<NColumnShard::TValueGuard>(Singleton<TGlobalCounters>()->SmallPortionsCount); + } + +}; + +class TCounters { +private: + std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsGuard; + std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsPackedSizeGuard; + std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsRawSizeGuard; + std::shared_ptr<NColumnShard::TValueGuard> SmallPortionsCount; +public: + std::shared_ptr<NColumnShard::TValueAggregationClient> SmallPortionsByGranule; + i64 GetSmallCounts() const { + return SmallPortionsByGranule->GetValue(); + } + + TCounters() { + IntervalsGuard = 
TGlobalCounters::BuildGuardIntervalsOverlapping(); + IntervalsPackedSizeGuard = TGlobalCounters::BuildGuardIntervalsPackedSizeOverlapping(); + IntervalsRawSizeGuard = TGlobalCounters::BuildGuardIntervalsRawSizeOverlapping(); + SmallPortionsCount = TGlobalCounters::BuildSmallPortionsGuard(); + SmallPortionsByGranule = TGlobalCounters::BuildClientSmallPortionsAggregation(); + } + + void OnRemoveIntervalsCount(const ui32 count, const ui64 rawSize, const ui64 packedSize) { + IntervalsGuard->Sub(count, 1); + IntervalsPackedSizeGuard->Sub(count, packedSize); + IntervalsRawSizeGuard->Sub(count, rawSize); + } + + void OnAddIntervalsCount(const ui32 count, const ui64 rawSize, const ui64 packedSize) { + IntervalsGuard->Add(count, 1); + IntervalsPackedSizeGuard->Add(count, packedSize); + IntervalsRawSizeGuard->Add(count, rawSize); + } + + void OnAddSmallPortion() { + SmallPortionsCount->Add(1); + SmallPortionsByGranule->Add(1); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("add_opt_count", SmallPortionsByGranule->GetValue())("counter", (ui64)SmallPortionsByGranule.get()); + } + + void OnRemoveSmallPortion() { + SmallPortionsCount->Sub(1); + SmallPortionsByGranule->Remove(1); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("remove_opt_count", SmallPortionsByGranule->GetValue())("counter", (ui64)SmallPortionsByGranule.get()); + } + +}; + +} diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp new file mode 100644 index 00000000000..3253413f0b0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp @@ -0,0 +1,212 @@ +#include "optimizer.h" +#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> +#include <ydb/core/tx/columnshard/counters/common/owner.h> +#include <ydb/core/tx/columnshard/counters/engine_logs.h> +#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h> + +namespace 
NKikimr::NOlap::NStorageOptimizer { + +std::vector<std::shared_ptr<TPortionInfo>> TIntervalsOptimizerPlanner::GetPortionsForIntervalStartedIn(const NArrow::TReplaceKey& keyStart, const ui32 countExpectation) const { + std::vector<std::shared_ptr<TPortionInfo>> result; + auto it = Positions.find(keyStart); + AFL_VERIFY(it != Positions.end()); + THashSet<ui64> portionsCurrentlyClosed; + auto itReverse = make_reverse_iterator(it); + AFL_VERIFY(itReverse != Positions.rbegin()); + --itReverse; + for (; itReverse != Positions.rend(); ++itReverse) { + for (auto&& i : itReverse->second.GetPositions()) { + if (i.first.GetIsStart()) { + if (!portionsCurrentlyClosed.erase(i.first.GetPortionId())) { + result.emplace_back(i.second.GetPortionPtr()); + } + } else { + AFL_VERIFY(portionsCurrentlyClosed.emplace(i.first.GetPortionId()).second); + } + } + if (result.size() == countExpectation) { + return result; + } + } + AFL_VERIFY(false)("result.size()", result.size())("expectation", countExpectation); + return result; +} + +std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const { + if (auto result = SizeProblemBlobs.BuildMergeTask(limits, granule, busyPortions)) { + return result; + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("skip", "no_small_portion_tasks"); + return nullptr; + if (RangedSegments.empty()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_ranged_segments"); + return nullptr; + } + auto& topSegment = **RangedSegments.rbegin()->second.begin(); + auto& features = topSegment.GetFeatures(); + std::vector<std::shared_ptr<TPortionInfo>> portions = GetPortionsForIntervalStartedIn(topSegment.GetPosition(), features.GetPortionsCount()); + + if (portions.size() <= 1) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust())("reason", 
"one_portion"); + return nullptr; + } + + std::optional<TString> tierName; + for (auto&& i : portions) { + if (i->GetMeta().GetTierName() && (!tierName || *tierName < i->GetMeta().GetTierName())) { + tierName = i->GetMeta().GetTierName(); + } + if (busyPortions.contains(i->GetAddress())) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust()) + ("count", features.GetPortionsCount())("reason", "busy_portion")("portion_address", i->GetAddress().DebugString()); + return nullptr; + } + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("features", features.DebugJson().GetStringRobust())("count", features.GetPortionsCount()); + + TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager); + return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext); +} + +void TIntervalsOptimizerPlanner::RemovePortion(const std::shared_ptr<TPortionInfo>& info) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion")("portion_id", info->GetPortion()); + auto itStart = Positions.find(info->IndexKeyStart()); + auto itFinish = Positions.find(info->IndexKeyEnd()); + Y_VERIFY(itStart != Positions.end()); + Y_VERIFY(itFinish != Positions.end()); + if (itStart == itFinish) { + RemoveRanged(itStart->second); + itStart->second.RemoveSummary(info); + AddRanged(itStart->second); + if (itStart->second.RemoveStart(info) || itStart->second.RemoveFinish(info)) { + RemoveRanged(itStart->second); + Positions.erase(itStart); + } + } else { + for (auto it = itStart; it != itFinish; ++it) { + RemoveRanged(it->second); + it->second.RemoveSummary(info); + AddRanged(it->second); + } + if (itStart->second.RemoveStart(info)) { + RemoveRanged(itStart->second); + Positions.erase(itStart); + } + if (itFinish->second.RemoveFinish(info)) { + RemoveRanged(itFinish->second); + 
Positions.erase(itFinish); + } + } + AFL_VERIFY(RangedSegments.empty() == Positions.empty())("rs_size", RangedSegments.size())("p_size", Positions.size()); +} + +void TIntervalsOptimizerPlanner::AddPortion(const std::shared_ptr<TPortionInfo>& info) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion")("portion_id", info->GetPortion()); + auto itStart = Positions.find(info->IndexKeyStart()); + if (itStart == Positions.end()) { + itStart = Positions.emplace(info->IndexKeyStart(), TBorderPositions(info->IndexKeyStart())).first; + if (itStart != Positions.begin()) { + auto itStartCopy = itStart; + --itStartCopy; + itStart->second.CopyFrom(itStartCopy->second); + AddRanged(itStart->second); + } + } + auto itEnd = Positions.find(info->IndexKeyEnd()); + if (itEnd == Positions.end()) { + itEnd = Positions.emplace(info->IndexKeyEnd(), TBorderPositions(info->IndexKeyEnd())).first; + Y_VERIFY(itEnd != Positions.begin()); + auto itEndCopy = itEnd; + --itEndCopy; + itEnd->second.CopyFrom(itEndCopy->second); + AddRanged(itEnd->second); + itStart = Positions.find(info->IndexKeyStart()); + } + Y_VERIFY(itStart != Positions.end()); + Y_VERIFY(itEnd != Positions.end()); + itStart->second.AddStart(info); + itEnd->second.AddFinish(info); + if (itStart != itEnd) { + for (auto it = itStart; it != itEnd; ++it) { + RemoveRanged(it->second); + it->second.AddSummary(info); + AFL_VERIFY(!!it->second.GetFeatures()); + AddRanged(it->second); + } + } else { + RemoveRanged(itStart->second); + itStart->second.AddSummary(info); + AddRanged(itStart->second); + } + AFL_VERIFY(RangedSegments.empty() == Positions.empty())("rs_size", RangedSegments.size())("p_size", Positions.size()); +} + +void TIntervalsOptimizerPlanner::DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) { + for (auto&& i : remove) { + SizeProblemBlobs.RemovePortion(i); + RemovePortion(i); + } + for (auto&& i : add) { + 
SizeProblemBlobs.AddPortion(i); + AddPortion(i); + } +} + +void TIntervalsOptimizerPlanner::RemoveRanged(const TBorderPositions& data) { + if (!!data.GetFeatures()) { + Counters->OnRemoveIntervalsCount(data.GetFeatures().GetPortionsCount(), data.GetFeatures().GetPortionsRawWeight(), data.GetFeatures().GetPortionsWeight()); + auto itFeatures = RangedSegments.find(data.GetFeatures()); + Y_VERIFY(itFeatures->second.erase(&data)); + if (itFeatures->second.empty()) { + RangedSegments.erase(itFeatures); + } + } +} + +void TIntervalsOptimizerPlanner::AddRanged(const TBorderPositions& data) { + if (!!data.GetFeatures()) { + Counters->OnAddIntervalsCount(data.GetFeatures().GetPortionsCount(), data.GetFeatures().GetPortionsRawWeight(), data.GetFeatures().GetPortionsWeight()); + Y_VERIFY(RangedSegments[data.GetFeatures()].emplace(&data).second); + } +} + +TIntervalsOptimizerPlanner::TIntervalsOptimizerPlanner(const ui64 granuleId, const std::shared_ptr<IStoragesManager>& storagesManager) + : TBase(granuleId) + , StoragesManager(storagesManager) + , Counters(std::make_shared<TCounters>()) + , SizeProblemBlobs(Counters, storagesManager) +{ +} + +TOptimizationPriority TIntervalsOptimizerPlanner::DoGetUsefulMetric() const { + auto res = SizeProblemBlobs.GetWeight(); + if (!!res) { + AFL_VERIFY(RangedSegments.size())("positions", Positions.size())("sizes", SizeProblemBlobs.DebugString()); + return *res; + } + if (RangedSegments.empty()) { + return TOptimizationPriority::Zero(); + } + auto& topSegment = **RangedSegments.rbegin()->second.begin(); + auto& topFeaturesTask = topSegment.GetFeatures(); + return TOptimizationPriority::Optimization(topFeaturesTask.GetUsefulMetric()); +} + +TString TIntervalsOptimizerPlanner::DoDebugString() const { + NJson::TJsonValue result = NJson::JSON_MAP; + auto& positions = result.InsertValue("positions", NJson::JSON_ARRAY); + for (auto&& i : Positions) { + positions.AppendValue(i.second.DebugJson()); + } + return result.GetStringRobust(); +} + +void 
TIntervalsOptimizerPlanner::TBorderPositions::AddSummary(const std::shared_ptr<TPortionInfo>& info) { + Features.Add(info); +} + +void TIntervalsOptimizerPlanner::TBorderPositions::RemoveSummary(const std::shared_ptr<TPortionInfo>& info) { + Features.Remove(info); +} + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h index 11aa4d5d068..5a708535525 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h @@ -1,5 +1,7 @@ #pragma once #include "optimizer.h" +#include "counters.h" +#include "blob_size.h" #include <ydb/core/formats/arrow/replace_key.h> #include <ydb/library/accessor/accessor.h> #include <ydb/core/tx/columnshard/splitter/settings.h> @@ -8,6 +10,8 @@ namespace NKikimr::NOlap::NStorageOptimizer { +class TCounters; + class TSegmentPosition { private: std::shared_ptr<TPortionInfo> Portion; @@ -69,7 +73,6 @@ private: YDB_READONLY(i64, PortionsRawWeight, 0); YDB_READONLY(i64, SmallPortionsWeight, 0); YDB_READONLY(i64, SmallPortionsCount, 0); - std::map<ui64, std::shared_ptr<TPortionInfo>> SummaryPortions; public: NJson::TJsonValue DebugJson() const { @@ -80,26 +83,9 @@ public: result.InsertValue("sp_count", SmallPortionsCount); result.InsertValue("sp_weight", SmallPortionsWeight); result.InsertValue("r_count", RecordsCount); - auto& pIds = result.InsertValue("portion_ids", NJson::JSON_ARRAY); - for (auto&& i : SummaryPortions) { - pIds.AppendValue(i.first); - } return result; } - bool Merge(const TIntervalFeatures& features, const i64 sumWeightLimit) { - if (PortionsCount > 1 && PortionsWeight + features.PortionsWeight > sumWeightLimit) { - return false; - } - for (auto&& i : features.SummaryPortions) { - if (SummaryPortions.contains(i.first)) { - continue; - } - Add(i.second); - } - return true; - } - i64 
GetUsefulMetric() const { if (PortionsCount == 1 || PortionsWeight == 0) { return 0; @@ -116,8 +102,7 @@ public: } void Add(const std::shared_ptr<TPortionInfo>& info) { - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion_in_summary")("portion_id", info->GetPortion())("count", SummaryPortions.size())("this", (ui64)this); - AFL_VERIFY(SummaryPortions.emplace(info->GetPortion(), info).second)("portion_id", info->GetPortion())("this", (ui64)this); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion_in_summary")("portion_id", info->GetPortion())("count", GetPortionsCount())("this", (ui64)this); ++PortionsCount; const i64 portionBytes = info->BlobsBytes(); PortionsWeight += portionBytes; @@ -130,8 +115,7 @@ public: } void Remove(const std::shared_ptr<TPortionInfo>& info) { - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion_from_summary")("portion_id", info->GetPortion())("count", SummaryPortions.size())("this", (ui64)this); - AFL_VERIFY(SummaryPortions.erase(info->GetPortion()))("portion_id", info->GetPortion())("this", (ui64)this); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion_from_summary")("portion_id", info->GetPortion())("count", GetPortionsCount())("this", (ui64)this); Y_VERIFY(--PortionsCount >= 0); const i64 portionBytes = info->BlobsBytes(); PortionsWeight -= portionBytes; @@ -154,17 +138,12 @@ public: bool operator<(const TIntervalFeatures& item) const { return GetUsefulMetric() < item.GetUsefulMetric(); } - const std::map<ui64, std::shared_ptr<TPortionInfo>>& GetSummaryPortions() const { - return SummaryPortions; - } bool IsEnoughWeight() const { - return GetPortionsWeight() > TSplitSettings().GetMinBlobSize(); + return GetPortionsRawWeight() > TSplitSettings().GetMinBlobSize() * 10; } }; -class TCounters; - class TIntervalsOptimizerPlanner: public IOptimizerPlanner { private: static ui64 LimitSmallBlobsMerge; @@ -244,29 +223,30 @@ private: void AddSummary(const 
std::shared_ptr<TPortionInfo>& info); void RemoveSummary(const std::shared_ptr<TPortionInfo>& info); }; + std::map<TIntervalFeatures, std::set<const TBorderPositions*>> RangedSegments; + using TPositions = std::map<NArrow::TReplaceKey, TBorderPositions>; TPositions Positions; - i64 SumSmall = 0; - std::map<NArrow::TReplaceKey, std::map<ui64, std::shared_ptr<TPortionInfo>>> SmallBlobs; + TBlobsBySize SizeProblemBlobs; - void RemoveRanged(const TBorderPositions& data); + void RemovePortion(const std::shared_ptr<TPortionInfo>& info); + void AddPortion(const std::shared_ptr<TPortionInfo>& info); + void RemoveRanged(const TBorderPositions& data); void AddRanged(const TBorderPositions& data); bool RemoveSmallPortion(const std::shared_ptr<TPortionInfo>& info); bool AddSmallPortion(const std::shared_ptr<TPortionInfo>& info); - std::shared_ptr<TColumnEngineChanges> GetSmallPortionsMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const; + std::vector<std::shared_ptr<TPortionInfo>> GetPortionsForIntervalStartedIn(const NArrow::TReplaceKey& keyStart, const ui32 countExpectation) const; protected: - virtual void DoAddPortion(const std::shared_ptr<TPortionInfo>& info) override; - virtual void DoRemovePortion(const std::shared_ptr<TPortionInfo>& info) override; + virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) override; virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const override; - virtual std::vector<std::shared_ptr<TPortionInfo>> DoGetPortionsOrderedByPK(const TSnapshot& snapshot) const override; - virtual i64 DoGetUsefulMetric() const override; + virtual TOptimizationPriority DoGetUsefulMetric() const override; virtual TString DoDebugString() const override; public: 
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make new file mode 100644 index 00000000000..f76c42447ab --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + optimizer.cpp + blob_size.cpp + counters.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/changes/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp deleted file mode 100644 index f5016a83a34..00000000000 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp +++ /dev/null @@ -1,342 +0,0 @@ -#include "intervals_optimizer.h" -#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> -#include <ydb/core/tx/columnshard/counters/common/owner.h> -#include <ydb/core/tx/columnshard/counters/engine_logs.h> -#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h> - -namespace NKikimr::NOlap::NStorageOptimizer { - -ui64 TIntervalsOptimizerPlanner::LimitSmallBlobsMerge = 2 * 1024 * 1024; -ui64 TIntervalsOptimizerPlanner::LimitSmallBlobDetect = 512 * 1024; - -class TGlobalCounters: public NColumnShard::TCommonCountersOwner { -private: - using TBase = NColumnShard::TCommonCountersOwner; - NMonitoring::TDynamicCounters::TCounterPtr SmallPortionsCount; - std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsCount; - std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsPackedSizeCount; - std::shared_ptr<NColumnShard::TIncrementalHistogram> HistogramOverlappedIntervalsRawSizeCount; -public: - TGlobalCounters() - : TBase("IntervalsStorageOptimizer") { - SmallPortionsCount = TBase::GetValue("SmallPortions/Count"); - - const std::set<i64> 
borders = {0, 1, 2, 4, 8, 16, 32, 64}; - HistogramOverlappedIntervalsCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Count", "", borders); - HistogramOverlappedIntervalsPackedSizeCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Size/Packed", "", borders); - HistogramOverlappedIntervalsRawSizeCount = std::make_shared<NColumnShard::TIncrementalHistogram>("IntervalsStorageOptimizer", "OverlappedIntervals/Size/Raw", "", borders); - } - - static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsOverlapping() { - return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsCount->BuildGuard(); - } - - static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsPackedSizeOverlapping() { - return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsPackedSizeCount->BuildGuard(); - } - - static std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> BuildGuardIntervalsRawSizeOverlapping() { - return Singleton<TGlobalCounters>()->HistogramOverlappedIntervalsRawSizeCount->BuildGuard(); - } - - static std::shared_ptr<NColumnShard::TValueGuard> BuildSmallPortionsGuard() { - return std::make_shared<NColumnShard::TValueGuard>(Singleton<TGlobalCounters>()->SmallPortionsCount); - } - -}; - -class TCounters { -private: - std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsGuard; - std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsPackedSizeGuard; - std::shared_ptr<NColumnShard::TIncrementalHistogram::TGuard> IntervalsRawSizeGuard; - std::shared_ptr<NColumnShard::TValueGuard> SmallPortionsCount; -public: - TCounters() { - IntervalsGuard = TGlobalCounters::BuildGuardIntervalsOverlapping(); - IntervalsPackedSizeGuard = TGlobalCounters::BuildGuardIntervalsPackedSizeOverlapping(); - IntervalsRawSizeGuard = 
TGlobalCounters::BuildGuardIntervalsRawSizeOverlapping(); - SmallPortionsCount = TGlobalCounters::BuildSmallPortionsGuard(); - } - - void OnRemoveIntervalsCount(const ui32 count, const ui64 rawSize, const ui64 packedSize) { - IntervalsGuard->Sub(count, 1); - IntervalsPackedSizeGuard->Sub(count, packedSize); - IntervalsRawSizeGuard->Sub(count, rawSize); - } - - void OnAddIntervalsCount(const ui32 count, const ui64 rawSize, const ui64 packedSize) { - IntervalsGuard->Add(count, 1); - IntervalsPackedSizeGuard->Add(count, packedSize); - IntervalsRawSizeGuard->Add(count, rawSize); - } - - void OnAddSmallPortion() { - SmallPortionsCount->Add(1); - } - - void OnRemoveSmallPortion() { - SmallPortionsCount->Sub(1); - } - -}; - -std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::GetSmallPortionsMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const { - if (SumSmall > (i64)LimitSmallBlobsMerge) { - ui64 currentSum = 0; - std::map<ui64, std::shared_ptr<TPortionInfo>> portions; - std::optional<TString> tierName; - for (auto&& i : SmallBlobs) { - for (auto&& c : i.second) { - if (busyPortions.contains(c.second->GetAddress())) { - return nullptr; - } - if (c.second->GetMeta().GetTierName() && (!tierName || *tierName < c.second->GetMeta().GetTierName())) { - tierName = c.second->GetMeta().GetTierName(); - } - currentSum += c.second->RawBytesSum(); - portions.emplace(c.first, c.second); - } - } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("portions", portions.size())("current_sum", currentSum)("remained", SmallBlobs.size())("remained_size", SumSmall); - TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager); - return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext); - } - return nullptr; -} - 
-std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const { - if (auto result = GetSmallPortionsMergeTask(limits, granule, busyPortions)) { - return result; - } - if (RangedSegments.empty()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_ranged_segments"); - return nullptr; - } - auto& topSegment = **RangedSegments.rbegin()->second.begin(); - auto& topFeaturesTask = topSegment.GetFeatures(); - TIntervalFeatures features; - if (topFeaturesTask.IsEnoughWeight()) { - if (topFeaturesTask.GetPortionsCount() == 1) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "not_critical_task_top"); - return nullptr; - } - std::map<ui64, std::vector<std::shared_ptr<TPortionInfo>>> sortedPortions; - for (auto&& p : topFeaturesTask.GetSummaryPortions()) { - sortedPortions[p.second->BlobsBytes()].emplace_back(p.second); - } - - for (auto&& s : sortedPortions) { - for (auto&& p : s.second) { - if (features.GetPortionsCount() > 1 && s.first > 128 * 1024) { - if (features.GetPortionsWeight() + p->BlobsBytes() > 512 * 1024 * 1024 && features.GetPortionsCount() > 1) { - break; - } - } - features.Add(p); - } - } - } else { - auto itFwd = Positions.find(topSegment.GetPosition()); - Y_VERIFY(itFwd != Positions.end()); - features = itFwd->second.GetFeatures(); - auto itReverse = std::make_reverse_iterator(itFwd); - ++itFwd; - while (!features.IsEnoughWeight()) { - if (itFwd == Positions.end() && itReverse == Positions.rend()) { - break; - } - if (itFwd == Positions.end()) { - if (!features.Merge(itReverse->second.GetFeatures(), 512 * 1024 * 1024)) { - break; - } - ++itReverse; - } else if (itReverse == Positions.rend()) { - if (!features.Merge(itFwd->second.GetFeatures(), 512 * 1024 * 1024)) { - break; - } - ++itFwd; - } else if (itFwd->second.GetFeatures().GetUsefulKff() < itReverse->second.GetFeatures().GetUsefulKff()) 
{ - if (!features.Merge(itReverse->second.GetFeatures(), 512 * 1024 * 1024)) { - break; - } - ++itReverse; - } else { - if (!features.Merge(itFwd->second.GetFeatures(), 512 * 1024 * 1024)) { - break; - } - ++itFwd; - } - } - } - if (features.GetPortionsCount() <= 1) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust())("reason", "one_portion"); - return nullptr; - } - - std::optional<TString> tierName; - for (auto&& i : features.GetSummaryPortions()) { - if (i.second->GetMeta().GetTierName() && (!tierName || *tierName < i.second->GetMeta().GetTierName())) { - tierName = i.second->GetMeta().GetTierName(); - } - if (busyPortions.contains(i.second->GetAddress())) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust()) - ("count", features.GetSummaryPortions().size())("reason", "busy_portion")("portion_address", i.second->GetAddress().DebugString()); - return nullptr; - } - } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("features", features.DebugJson().GetStringRobust())("count", features.GetSummaryPortions().size()); - - TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager); - return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, features.GetSummaryPortions(), saverContext); -} - -bool TIntervalsOptimizerPlanner::RemoveSmallPortion(const std::shared_ptr<TPortionInfo>& info) { - if (info->BlobsBytes() < LimitSmallBlobDetect) { - Counters->OnRemoveSmallPortion(); - auto it = SmallBlobs.find(info->IndexKeyStart()); - Y_VERIFY(it->second.erase(info->GetPortion())); - if (it->second.empty()) { - SmallBlobs.erase(it); - } - SumSmall -= info->BlobsBytes(); - Y_VERIFY(SumSmall >= 0); - return true; - } - return false; -} - -bool TIntervalsOptimizerPlanner::AddSmallPortion(const 
std::shared_ptr<TPortionInfo>& info) { - if (info->BlobsBytes() < LimitSmallBlobDetect) { - Counters->OnAddSmallPortion(); - Y_VERIFY(SmallBlobs[info->IndexKeyStart()].emplace(info->GetPortion(), info).second); - SumSmall += info->BlobsBytes(); - return true; - } - return false; -} - -void TIntervalsOptimizerPlanner::DoRemovePortion(const std::shared_ptr<TPortionInfo>& info) { - RemoveSmallPortion(info); - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion")("portion_id", info->GetPortion()); - auto itStart = Positions.find(info->IndexKeyStart()); - auto itFinish = Positions.find(info->IndexKeyEnd()); - Y_VERIFY(itStart != Positions.end()); - Y_VERIFY(itFinish != Positions.end()); - for (auto it = itStart; it != itFinish; ++it) { - RemoveRanged(it->second); - it->second.RemoveSummary(info); - AddRanged(it->second); - } - if (itStart->second.RemoveStart(info)) { - RemoveRanged(itStart->second); - Positions.erase(itStart); - } - if (itFinish->second.RemoveFinish(info)) { - RemoveRanged(itFinish->second); - Positions.erase(itFinish); - } -} - -void TIntervalsOptimizerPlanner::DoAddPortion(const std::shared_ptr<TPortionInfo>& info) { - AddSmallPortion(info); - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion")("portion_id", info->GetPortion()); - auto itStart = Positions.find(info->IndexKeyStart()); - if (itStart == Positions.end()) { - itStart = Positions.emplace(info->IndexKeyStart(), TBorderPositions(info->IndexKeyStart())).first; - if (itStart != Positions.begin()) { - auto itStartCopy = itStart; - --itStartCopy; - itStart->second.CopyFrom(itStartCopy->second); - AddRanged(itStart->second); - } - } - auto itEnd = Positions.find(info->IndexKeyEnd()); - if (itEnd == Positions.end()) { - itEnd = Positions.emplace(info->IndexKeyEnd(), TBorderPositions(info->IndexKeyEnd())).first; - Y_VERIFY(itEnd != Positions.begin()); - auto itEndCopy = itEnd; - --itEndCopy; - itEnd->second.CopyFrom(itEndCopy->second); - AddRanged(itEnd->second); - 
itStart = Positions.find(info->IndexKeyStart()); - } - Y_VERIFY(itStart != Positions.end()); - Y_VERIFY(itEnd != Positions.end()); - itStart->second.AddStart(info); - itEnd->second.AddFinish(info); - for (auto it = itStart; it != itEnd; ++it) { - RemoveRanged(it->second); - it->second.AddSummary(info); - AddRanged(it->second); - } -} - -void TIntervalsOptimizerPlanner::RemoveRanged(const TBorderPositions& data) { - if (!!data.GetFeatures()) { - Counters->OnRemoveIntervalsCount(data.GetFeatures().GetSummaryPortions().size(), data.GetFeatures().GetPortionsRawWeight(), data.GetFeatures().GetPortionsWeight()); - auto itFeatures = RangedSegments.find(data.GetFeatures()); - Y_VERIFY(itFeatures->second.erase(&data)); - if (itFeatures->second.empty()) { - RangedSegments.erase(itFeatures); - } - } -} - -void TIntervalsOptimizerPlanner::AddRanged(const TBorderPositions& data) { - if (!!data.GetFeatures()) { - Counters->OnAddIntervalsCount(data.GetFeatures().GetSummaryPortions().size(), data.GetFeatures().GetPortionsRawWeight(), data.GetFeatures().GetPortionsWeight()); - Y_VERIFY(RangedSegments[data.GetFeatures()].emplace(&data).second); - } -} - -TIntervalsOptimizerPlanner::TIntervalsOptimizerPlanner(const ui64 granuleId, const std::shared_ptr<IStoragesManager>& storagesManager) - : TBase(granuleId) - , StoragesManager(storagesManager) -{ - Counters = std::make_shared<TCounters>(); -} - -std::vector<std::shared_ptr<TPortionInfo>> TIntervalsOptimizerPlanner::DoGetPortionsOrderedByPK(const TSnapshot& snapshot) const { - std::vector<std::shared_ptr<TPortionInfo>> result; - for (auto&& i : Positions) { - for (auto&& p : i.second.GetPositions()) { - if (!p.first.GetIsStart()) { - continue; - } - if (!p.second.GetPortion().IsVisible(snapshot)) { - continue; - } - result.emplace_back(p.second.GetPortionPtr()); - } - } - return result; -} - -i64 TIntervalsOptimizerPlanner::DoGetUsefulMetric() const { - if (RangedSegments.empty()) { - return 0; - } - auto& topSegment = 
**RangedSegments.rbegin()->second.begin(); - auto& topFeaturesTask = topSegment.GetFeatures(); - return std::max<i64>(topFeaturesTask.GetUsefulMetric(), std::max<ui64>(SumSmall ? 1 : 0, SumSmall / LimitSmallBlobsMerge)); -} - -TString TIntervalsOptimizerPlanner::DoDebugString() const { - NJson::TJsonValue result = NJson::JSON_MAP; - auto& positions = result.InsertValue("positions", NJson::JSON_ARRAY); - for (auto&& i : Positions) { - positions.AppendValue(i.second.DebugJson()); - } - return result.GetStringRobust(); -} - -void TIntervalsOptimizerPlanner::TBorderPositions::AddSummary(const std::shared_ptr<TPortionInfo>& info) { - Features.Add(info); -} - -void TIntervalsOptimizerPlanner::TBorderPositions::RemoveSummary(const std::shared_ptr<TPortionInfo>& info) { - Features.Remove(info); -} - -} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..8a1314dc2c9 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-levels) +target_link_libraries(storage-optimizer-levels PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-levels PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..f07e7535dd3 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-levels) +target_link_libraries(storage-optimizer-levels PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-levels PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..f07e7535dd3 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-levels) +target_link_libraries(storage-optimizer-levels PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-levels PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..8a1314dc2c9 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-optimizer-levels) +target_link_libraries(storage-optimizer-levels PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow + engines-changes-abstract +) +target_sources(storage-optimizer-levels PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp new file mode 100644 index 00000000000..6004ce09169 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.cpp @@ -0,0 +1,5 @@ +#include "counters.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLevels { + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h new file mode 100644 index 00000000000..be808941e87 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h @@ -0,0 +1,93 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h> +#include <ydb/core/formats/arrow/replace_key.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/columnshard/splitter/settings.h> +#include <ydb/core/tx/columnshard/counters/engine_logs.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLevels { + +class TGlobalCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr SmallPortionsCount; + std::shared_ptr<NColumnShard::TValueAggregationAgent> SmallPortionsCountByGranule; + + std::shared_ptr<NColumnShard::TValueAggregationAgent> CriticalRecordsCount; + std::shared_ptr<NColumnShard::TValueAggregationAgent> NormalRecordsCount; 
+public: + TGlobalCounters() + : TBase("LevelsStorageOptimizer") + { + SmallPortionsCount = TBase::GetValue("SmallPortions/Count"); + CriticalRecordsCount = TBase::GetValueAutoAggregations("Granule/CriticalRecord/Count"); + NormalRecordsCount = TBase::GetValueAutoAggregations("Granule/NormalRecord/Count"); + SmallPortionsCountByGranule = TBase::GetValueAutoAggregations("Granule/SmallPortions/Count"); + } + + static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildClientSmallPortionsAggregation() { + return Singleton<TGlobalCounters>()->SmallPortionsCountByGranule->GetClient(); + } + + static std::shared_ptr<NColumnShard::TValueGuard> BuildSmallPortionsGuard() { + return std::make_shared<NColumnShard::TValueGuard>(Singleton<TGlobalCounters>()->SmallPortionsCount); + } + + static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildCriticalRecordsCountAggregation() { + return Singleton<TGlobalCounters>()->CriticalRecordsCount->GetClient(); + } + + static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildNormalRecordsCountAggregation() { + return Singleton<TGlobalCounters>()->NormalRecordsCount->GetClient(); + } + +}; + +class TCounters { +private: + std::shared_ptr<NColumnShard::TValueAggregationClient> CriticalRecordsCount; + std::shared_ptr<NColumnShard::TValueAggregationClient> NormalRecordsCount; + + std::shared_ptr<NColumnShard::TValueGuard> SmallPortionsCount; + std::shared_ptr<NColumnShard::TValueAggregationClient> SmallPortionsByGranule; +public: + i64 GetSmallCounts() const { + return SmallPortionsByGranule->GetValue(); + } + + TCounters() { + CriticalRecordsCount = TGlobalCounters::BuildCriticalRecordsCountAggregation(); + NormalRecordsCount = TGlobalCounters::BuildNormalRecordsCountAggregation(); + SmallPortionsCount = TGlobalCounters::BuildSmallPortionsGuard(); + SmallPortionsByGranule = TGlobalCounters::BuildClientSmallPortionsAggregation(); + } + + void OnAddCriticalCount(const ui32 count) { + 
CriticalRecordsCount->Add(count); + } + + void OnAddNormalCount(const ui32 count) { + NormalRecordsCount->Add(count); + } + + void OnRemoveCriticalCount(const ui32 count) { + CriticalRecordsCount->Remove(count); + } + + void OnRemoveNormalCount(const ui32 count) { + NormalRecordsCount->Remove(count); + } + + void OnAddSmallPortion() { + SmallPortionsCount->Add(1); + SmallPortionsByGranule->Add(1); + } + + void OnRemoveSmallPortion() { + SmallPortionsCount->Sub(1); + SmallPortionsByGranule->Remove(1); + } + +}; + +} diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp new file mode 100644 index 00000000000..c78ef905041 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.cpp @@ -0,0 +1,5 @@ +#include "optimizer.h" + +namespace NKikimr::NOlap::NStorageOptimizer { + +} diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h new file mode 100644 index 00000000000..311923e0c7e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h @@ -0,0 +1,517 @@ +#pragma once +#include "counters.h" + +#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h> +#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> +#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h> +#include <ydb/library/accessor/accessor.h> + +#include <util/generic/hash.h> +#include <util/system/types.h> +#include <util/generic/hash_set.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLevels { + +class TLevelInfo { +private: + THashMap<ui64, i64> Counters; + YDB_READONLY(i64, CriticalWeight, 0); + YDB_READONLY(i64, NormalizedWeight, 0); + 
THashSet<ui64> PortionIds; + std::shared_ptr<TCounters> Signals; +public: + TLevelInfo(std::shared_ptr<TCounters> counters) + : Signals(counters) + { + + } + + void AddPortion(const std::shared_ptr<TPortionInfo>& p, const ui32 refCount) { + if (p->GetBlobBytes() < (1 << 20)) { + Signals->OnAddSmallPortion(); + } + auto it = Counters.find(p->GetPortion()); + i64 refCountPred = 0; + if (it == Counters.end()) { + it = Counters.emplace(p->GetPortion(), refCount).first; + } else { + refCountPred = it->second; + it->second += refCount; + } + if (it->second == 1 && refCountPred == 0) { + NormalizedWeight += p->NumRows(); + Signals->OnAddNormalCount(p->NumRows()); + } else if (it->second >= 2 && refCountPred == 1) { + CriticalWeight += p->NumRows(); + Signals->OnAddCriticalCount(p->NumRows()); + + NormalizedWeight -= p->NumRows(); + Y_VERIFY(NormalizedWeight >= 0); + + Signals->OnRemoveNormalCount(p->NumRows()); + } else if (it->second >= 2 && refCountPred == 0) { + CriticalWeight += p->NumRows(); + Signals->OnAddCriticalCount(p->NumRows()); + } else if (it->second >= 2 && refCountPred >= 2) { + } else { + Y_VERIFY(false); + } + } + + void RemovePortion(const std::shared_ptr<TPortionInfo>& p, const ui32 refCount) { + if (p->GetBlobBytes() < (1 << 20)) { + Signals->OnRemoveSmallPortion(); + } + auto it = Counters.find(p->GetPortion()); + Y_VERIFY(it != Counters.end()); + const i64 refCountPred = it->second; + it->second -= refCount; + Y_VERIFY(it->second >= 0); + if (it->second >= 2) { + } else if (it->second == 1) { + Y_VERIFY(refCountPred >= 2); + CriticalWeight -= p->NumRows(); + Y_VERIFY(CriticalWeight >= 0); + Signals->OnRemoveCriticalCount(p->NumRows()); + Y_VERIFY(CriticalWeight >= 0); + NormalizedWeight += p->NumRows(); + Signals->OnAddNormalCount(p->NumRows()); + } else if (it->second == 0) { + if (refCountPred >= 2) { + Y_VERIFY(refCountPred >= 2); + CriticalWeight -= p->NumRows(); + Y_VERIFY(CriticalWeight >= 0); + Signals->OnRemoveCriticalCount(p->NumRows()); + 
} else if (refCountPred == 1) { + NormalizedWeight -= p->NumRows(); + Y_VERIFY(NormalizedWeight >= 0); + Signals->OnRemoveNormalCount(p->NumRows()); + } else { + Y_VERIFY(false); + } + /* counter reached zero: forget the portion */ Counters.erase(it); + } + } + +}; + +/* Per-border bookkeeping: portions that start at the border (StartPortions), portions that finish at it (FinishPortions) and portions registered through AddMiddle (MiddlePortions), together with the weight each middle portion was registered with (MiddleWeight). Middle registrations are mirrored into the shared TLevelInfo. */ class TBorderPoint { +public: + using TBorderPortions = THashMap<ui64, std::shared_ptr<TPortionInfo>>; +private: + THashMap<ui64, ui32> MiddleWeight; + YDB_READONLY_DEF(TBorderPortions, StartPortions); + YDB_READONLY_DEF(TBorderPortions, MiddlePortions); + YDB_READONLY_DEF(TBorderPortions, FinishPortions); + std::shared_ptr<TLevelInfo> LevelInfo; +public: + /* Seeds this point from predPoint: every middle portion of predPoint whose stored weight is not 2 is registered here with weight 1. NOTE(review): the result of find() is dereferenced without an end() check; only the sizes of the two containers are verified to match. */ void InitInternalPoint(const TBorderPoint& predPoint) { + Y_VERIFY(predPoint.MiddleWeight.size() == predPoint.MiddlePortions.size()); + for (auto&& i : predPoint.MiddlePortions) { + auto it = predPoint.MiddleWeight.find(i.first); + if (it->second != 2) { + AddMiddle(i.second, 1); + } + } + } + + /* Returns the single middle portion; Y_VERIFY requires exactly one middle portion and a non-critical point. */ std::shared_ptr<TPortionInfo> GetOnlyPortion() const { + Y_VERIFY(MiddlePortions.size() == 1); + Y_VERIFY(!IsCritical()); + return MiddlePortions.begin()->second; + } + + /* True for a non-critical point whose only middle portion has GetBlobBytes() below 1 << 20. */ bool IsSmall() const { + if (!IsCritical() && MiddlePortions.size() == 1 && MiddlePortions.begin()->second->GetBlobBytes() < (1 << 20)) { + return true; + } + return false; + } + + /* True when the point has both starting and finishing portions, or more than one portion in any of the three sets. */ bool IsCritical() const { + if (StartPortions.size() && FinishPortions.size()) { + return true; + } + if (MiddlePortions.size() > 1 || StartPortions.size() > 1 || FinishPortions.size() > 1) { + return true; + } + return false; + } + + TBorderPoint(const std::shared_ptr<TLevelInfo>& info) + : LevelInfo(info) { + + } + + /* Releases from LevelInfo every middle portion still registered here: weight 2 when IndexKeyStart() == IndexKeyEnd(), weight 1 otherwise. NOTE(review): no copy operations are declared, so the class appears to stay copyable; destroying a copy that holds middle portions would release them a second time - confirm that only empty points are ever copied. */ ~TBorderPoint() { + for (auto&& i : MiddlePortions) { + if (i.second->IndexKeyStart() == i.second->IndexKeyEnd()) { + LevelInfo->RemovePortion(i.second, 2); + } else { + LevelInfo->RemovePortion(i.second, 1); + } + } + } + + /* The portion id must not already be present in StartPortions. */ void AddStart(const std::shared_ptr<TPortionInfo>& p) { + Y_VERIFY(StartPortions.emplace(p->GetPortion(), p).second); + } + void RemoveStart(const std::shared_ptr<TPortionInfo>& p) { +
Y_VERIFY(StartPortions.erase(p->GetPortion())); + } + + /* Records p with the given weight in MiddleWeight and MiddlePortions (both insertions must be new) and adds the same weight to LevelInfo. */ void AddMiddle(const std::shared_ptr<TPortionInfo>& p, const ui32 portionCriticalWeight) { + Y_VERIFY(MiddleWeight.emplace(p->GetPortion(), portionCriticalWeight).second); + Y_VERIFY(MiddlePortions.emplace(p->GetPortion(), p).second); + LevelInfo->AddPortion(p, portionCriticalWeight); + } + /* Undoes AddMiddle; portionCriticalWeight is passed on to LevelInfo->RemovePortion and is not compared with the stored weight. */ void RemoveMiddle(const std::shared_ptr<TPortionInfo>& p, const ui32 portionCriticalWeight) { + Y_VERIFY(MiddleWeight.erase(p->GetPortion())); + Y_VERIFY(MiddlePortions.erase(p->GetPortion())); + LevelInfo->RemovePortion(p, portionCriticalWeight); + } + + void AddFinish(const std::shared_ptr<TPortionInfo>& p) { + Y_VERIFY(FinishPortions.emplace(p->GetPortion(), p).second); + } + void RemoveFinish(const std::shared_ptr<TPortionInfo>& p) { + Y_VERIFY(FinishPortions.erase(p->GetPortion())); + } + + /* Only the start and finish sets are consulted; middle portions do not keep a point non-empty. */ bool IsEmpty() const { + return StartPortions.empty() && FinishPortions.empty(); + } +}; + +/* Index of the portions of one level by border key: Borders is an ordered map from NArrow::TReplaceKey to TBorderPoint and PortionIds holds the ids of all placed portions. */ class TPortionsPlacement { +private: + THashSet<ui64> PortionIds; + std::map<NArrow::TReplaceKey, TBorderPoint> Borders; + std::shared_ptr<TLevelInfo> LevelInfo; +public: + TPortionsPlacement(const std::shared_ptr<TLevelInfo>& levelInfo) + : LevelInfo(levelInfo) + { + + } + + /* Sweep state used while walking the borders in order: CurrentPortions holds the non-busy portions that have started and not yet finished. BusyPortions is held by reference, so the set given to the constructor has to outlive the scanner. */ class TPortionsScanner { + private: + THashMap<ui64, std::shared_ptr<TPortionInfo>> CurrentPortions; + const THashSet<TPortionAddress>& BusyPortions; + public: + + TPortionsScanner(const THashSet<TPortionAddress>& busyPortions) + : BusyPortions(busyPortions) + { + + } + + const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetCurrentPortions() const { + return CurrentPortions; + } + + /* Applies one border to the sweep: non-busy starting portions are inserted, non-busy finishing portions are erased. hasBusy is set only when a busy portion starts at p. The return value is true while CurrentPortions is non-empty. */ bool AddBorderPoint(const TBorderPoint& p, bool& hasBusy) { + hasBusy = false; + for (auto&& [_, portionInfo] : p.GetStartPortions()) { + if (BusyPortions.contains(portionInfo->GetAddress())) { + hasBusy = true; + continue; + } + AFL_VERIFY(CurrentPortions.emplace(portionInfo->GetPortion(), portionInfo).second); + } + + for (auto&& [_, portionInfo] : p.GetFinishPortions()) { + if
(BusyPortions.contains(portionInfo->GetAddress())) { + continue; + } + AFL_VERIFY(CurrentPortions.erase(portionInfo->GetPortion())); + } + return CurrentPortions.size(); + } + }; + + /* Why the chain being collected needs compaction: SmallChunks is set for a single open portion below 1 << 20 bytes, MergeChunks when more than one portion is open at the same time. */ enum class EChainProblem { + NoProblem, + SmallChunks, + MergeChunks + }; + + /* Walks Borders in key order and groups non-busy portions into chains to compact. While a problem is detected, every open portion is appended to the current chain. When the scanner becomes empty, a SmallChunks chain is dropped if a busy portion starts at that border, otherwise the chain is emitted once it exceeds 1 << 20 bytes. The walk stops when resultSize + chainSize exceeds sizeLimit; a remaining chain with more than one portion is emitted at the end. */ std::vector<std::vector<std::shared_ptr<TPortionInfo>>> GetPortionsToCompact(const ui64 sizeLimit, const THashSet<TPortionAddress>& busyPortions) const { + std::vector<std::vector<std::shared_ptr<TPortionInfo>>> result; + /* NOTE(review): readyPortionIds is never used below. */ THashSet<ui64> readyPortionIds; + ui64 resultSize = 0; + + TPortionsScanner buffer(busyPortions); + THashMap<ui64, std::shared_ptr<TPortionInfo>> portionsCurrentChain; + ui64 chainSize = 0; + EChainProblem problemType = EChainProblem::NoProblem; + for (auto&& i : Borders) { + bool hasBusy = false; + if (!buffer.AddBorderPoint(i.second, hasBusy)) { + /* no open portions after this border: decide what to do with the chain collected so far */ if (hasBusy && problemType == EChainProblem::SmallChunks) { + chainSize = 0; + portionsCurrentChain.clear(); + problemType = EChainProblem::NoProblem; + } else if (chainSize > (1 << 20)) { + resultSize += chainSize; + std::vector<std::shared_ptr<TPortionInfo>> chain; + /* NOTE(review): this inner i shadows the border iterated by the outer loop. */ for (auto&& i : portionsCurrentChain) { + chain.emplace_back(i.second); + } + result.emplace_back(chain); + chainSize = 0; + portionsCurrentChain.clear(); + problemType = EChainProblem::NoProblem; + } + } else { + if (buffer.GetCurrentPortions().size() > 1) { + problemType = EChainProblem::MergeChunks; + } else if (buffer.GetCurrentPortions().begin()->second->GetBlobBytes() < (1 << 20) && problemType == EChainProblem::NoProblem) { + problemType = EChainProblem::SmallChunks; + } + if (problemType != EChainProblem::NoProblem) { + /* chainSize grows only for portions that were not in the chain yet */ for (auto&& i : buffer.GetCurrentPortions()) { + if (portionsCurrentChain.emplace(i.second->GetPortion(), i.second).second) { + chainSize += i.second->GetBlobBytes(); + } + } + } + } + if (resultSize + chainSize > sizeLimit) { + break; + } + } + if (portionsCurrentChain.size() > 1) { + std::vector<std::shared_ptr<TPortionInfo>> chain; + for (auto&& i :
portionsCurrentChain) { + chain.emplace_back(i.second); + } + result.emplace_back(chain); + } + + return result; + } + + /* Unregisters portion: it is removed from the start set of its IndexKeyStart() border and from the finish set of its IndexKeyEnd() border (both borders must exist), and from the middle set of every border in [start, finish) with weight 1, or of the single border with weight 2 when both keys map to the same border. Borders whose start and finish sets became empty are erased. */ void RemovePortion(const std::shared_ptr<TPortionInfo>& portion) { + Y_VERIFY(PortionIds.erase(portion->GetPortion())); + auto itStart = Borders.find(portion->IndexKeyStart()); + AFL_VERIFY(itStart != Borders.end()); + auto itFinish = Borders.find(portion->IndexKeyEnd()); + AFL_VERIFY(itFinish != Borders.end()); + + itStart->second.RemoveStart(portion); + itFinish->second.RemoveFinish(portion); + if (itStart != itFinish) { + /* the finish border itself is excluded from the middle range */ for (auto it = itStart; it != itFinish; ++it) { + it->second.RemoveMiddle(portion, 1); + } + if (itFinish->second.IsEmpty()) { + Y_VERIFY(Borders.erase(portion->IndexKeyEnd())); + } + if (itStart->second.IsEmpty()) { + Y_VERIFY(Borders.erase(portion->IndexKeyStart())); + } + } else { + itStart->second.RemoveMiddle(portion, 2); + if (itStart->second.IsEmpty()) { + Borders.erase(itStart); + } + } + } + + /* Registers portion: the borders for IndexKeyStart() and IndexKeyEnd() are created when missing, and a newly created border is seeded from the border preceding it through InitInternalPoint. The portion is then added as start, as finish, and as middle with weight 1 on every border in [start, finish), or with weight 2 on the single border when both keys map to the same one. */ void AddPortion(const std::shared_ptr<TPortionInfo>& portion) { + Y_VERIFY(PortionIds.emplace(portion->GetPortion()).second); + auto itStartInfo = Borders.emplace(portion->IndexKeyStart(), TBorderPoint(LevelInfo)); + auto itStart = itStartInfo.first; + if (itStartInfo.second && itStart != Borders.begin()) { + auto itStartCopy = itStart; + --itStartCopy; + itStart->second.InitInternalPoint(itStartCopy->second); + } + auto itFinishInfo = Borders.emplace(portion->IndexKeyEnd(), TBorderPoint(LevelInfo)); + auto itFinish = itFinishInfo.first; + if (itFinishInfo.second) { + /* presumably holds because the start border was inserted above and sorts before a distinct finish key - TODO confirm IndexKeyStart() <= IndexKeyEnd() is guaranteed */ Y_VERIFY(itFinish != Borders.begin()); + auto itFinishCopy = itFinish; + --itFinishCopy; + itFinish->second.InitInternalPoint(itFinishCopy->second); + } + + itStart->second.AddStart(portion); + itFinish->second.AddFinish(portion); + if (itStart != itFinish) { + for (auto it = itStart; it != itFinish; ++it) { + it->second.AddMiddle(portion, 1); + } + } else { + itStart->second.AddMiddle(portion, 2); + } + } +}; + +/* One level of the optimizer: holds the border placement of its portions and an index of them by snapshot, and can hand portions over to an optional next level. */ class TLevel { +private: + YDB_READONLY(TDuration,
CriticalAge, TDuration::Zero()); + YDB_READONLY(ui64, CriticalSize, 0); + std::shared_ptr<TLevelInfo> LevelInfo; + TPortionsPlacement PortionsPlacement; + std::shared_ptr<TLevel> NextLevel; + /* NOTE(review): Borders is not referenced by any TLevel method visible here. */ std::map<NArrow::TReplaceKey, TBorderPoint> Borders; + /* portions grouped by RecordSnapshotMax(), ordered by snapshot */ std::map<TSnapshot, THashMap<ui64, std::shared_ptr<TPortionInfo>>> PortionByAge; + const ui64 PortionsSizeLimit = (ui64)250 * 1024 * 1024; + TCompactionLimits CompactionLimits; + /* NOTE(review): PortionIds is only read (in AddPortion) and never filled in the visible code. */ THashSet<ui64> PortionIds; + const std::shared_ptr<IStoragesManager> StoragesManager; + std::shared_ptr<arrow::Schema> PrimaryKeysSchema; +public: + /* PortionsPlacement is built from LevelInfo, which is declared (and therefore initialized) before it. GranuleSizeForOverloadPrevent is set to half of criticalSize. */ TLevel(const TDuration criticalAge, const ui64 criticalSize, std::shared_ptr<TLevel> nextLevel, const std::shared_ptr<IStoragesManager>& storagesManager, std::shared_ptr<TCounters> counters, + const std::shared_ptr<arrow::Schema>& primaryKeysSchema) + : CriticalAge(criticalAge) + , CriticalSize(criticalSize) + , LevelInfo(std::make_shared<TLevelInfo>(counters)) + , PortionsPlacement(LevelInfo) + , NextLevel(nextLevel) + , StoragesManager(storagesManager) + , PrimaryKeysSchema(primaryKeysSchema) + { + CompactionLimits.GranuleSizeForOverloadPrevent = CriticalSize * 0.5; + } + + /* The weight of the level is the critical weight accumulated in LevelInfo. */ ui64 GetWeight() const { + return LevelInfo->GetCriticalWeight(); + } + + /* Moves to NextLevel every portion whose snapshot plan step (taken as milliseconds) plus CriticalAge lies before currentInstant; does nothing without a next level. The scan stops at the first snapshot that is still young enough. NOTE(review): RemovePortion calls ProvidePortionsNextLevel again, so with a non-null NextLevel a nested call may already have moved later entries of portionsForProviding before this loop reaches them - verify before configuring a second level. */ void ProvidePortionsNextLevel(const TInstant currentInstant) { + if (!NextLevel) { + return; + } + std::vector<std::shared_ptr<TPortionInfo>> portionsForProviding; + for (auto&& i : PortionByAge) { + if (TInstant::MilliSeconds(i.first.GetPlanStep()) + CriticalAge < currentInstant) { + for (auto&& p : i.second) { + portionsForProviding.emplace_back(p.second); + } + } else { + break; + } + } + for (auto&& i : portionsForProviding) { + RemovePortion(i, currentInstant); + NextLevel->AddPortion(i, currentInstant); + } + } + + /* Adds a portion to this level, or forwards it to NextLevel when it is already older than CriticalAge at addInstant and a next level exists. */ void AddPortion(const std::shared_ptr<TPortionInfo>& portionInfo, const TInstant addInstant) { + if (TInstant::MilliSeconds(portionInfo->RecordSnapshotMax().GetPlanStep()) + CriticalAge < addInstant) { +
Y_VERIFY(!PortionIds.contains(portionInfo->GetPortion())); + if (NextLevel) { + return NextLevel->AddPortion(portionInfo, addInstant); + } + } + /* the portion stays in this level: index it by border and by snapshot, then push out whatever has aged */ PortionsPlacement.AddPortion(portionInfo); + Y_VERIFY(PortionByAge[portionInfo->RecordSnapshotMax()].emplace(portionInfo->GetPortion(), portionInfo).second); + ProvidePortionsNextLevel(addInstant); + } + + /* Removes the portion from the placement and from PortionByAge (an emptied snapshot bucket is erased), then re-runs ProvidePortionsNextLevel. */ void RemovePortion(const std::shared_ptr<TPortionInfo>& portionInfo, const TInstant removeInstant) { + PortionsPlacement.RemovePortion(portionInfo); + { + auto it = PortionByAge.find(portionInfo->RecordSnapshotMax()); + Y_VERIFY(it != PortionByAge.end()); + Y_VERIFY(it->second.erase(portionInfo->GetPortion())); + if (it->second.empty()) { + PortionByAge.erase(it); + } + } + ProvidePortionsNextLevel(removeInstant); + } + + /* Builds a general compaction over all portion groups returned by GetPortionsToCompact (limited by PortionsSizeLimit) and returns nullptr when there are none. For each group the greatest IndexKeyEnd() position is collected and added to the result as a check point. The limits and currentInstant arguments are not used; the member CompactionLimits is passed to the changes object instead. */ std::shared_ptr<TColumnEngineChanges> BuildOptimizationTask(const TCompactionLimits& /*limits*/, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions, const TInstant /*currentInstant*/) const { + std::vector<std::vector<std::shared_ptr<TPortionInfo>>> portionGroups = PortionsPlacement.GetPortionsToCompact(PortionsSizeLimit, busyPortions); + if (portionGroups.empty()) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "optimization_task_skipped"); + return nullptr; + } + std::vector<std::shared_ptr<TPortionInfo>> portions; + std::vector<NIndexedReader::TSortableBatchPosition> positions; + for (auto&& i : portionGroups) { + portions.insert(portions.end(), i.begin(), i.end()); + std::optional<NIndexedReader::TSortableBatchPosition> position; + for (auto&& p : i) { + NIndexedReader::TSortableBatchPosition pos(p->IndexKeyEnd().ToBatch(PrimaryKeysSchema), 0, PrimaryKeysSchema->field_names(), {}, false); + /* keep the position that compares greatest within the group */ if (!position || position->Compare(pos) == std::partial_ordering::less) { + position = pos; + } + } + Y_VERIFY(position); + positions.emplace_back(*position); + } + TSaverContext saverContext(StoragesManager->GetOperator(IStoragesManager::DefaultStorageId),
StoragesManager); + auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(CompactionLimits, granule, portions, saverContext); + for (auto&& i : positions) { + result->AddCheckPoint(i); + } + return result; + } + +}; + +/* IOptimizerPlanner implementation built on TLevel. L3, LMax and LStart are separate handles; the constructor points all three at the same level. */ class TLevelsOptimizerPlanner: public IOptimizerPlanner { +private: + using TBase = IOptimizerPlanner; + std::shared_ptr<TLevel> L3; + std::shared_ptr<TLevel> LMax; + std::shared_ptr<TLevel> LStart; + const std::shared_ptr<IStoragesManager> StoragesManager; + std::shared_ptr<TCounters> Counters; +protected: + /* Applies additions first, then removals, all with the same TInstant::Now(). Portions whose tier name is neither empty nor IStoragesManager::DefaultStorageId are skipped. A portion for which GetMeta().RecordSnapshotMax evaluates to false is routed to LMax, any other portion to LStart. */ virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) override { + const TInstant currentInstant = TInstant::Now(); + for (auto&& i : add) { + if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { + continue; + } + if (!i->GetMeta().RecordSnapshotMax) { + LMax->AddPortion(i, currentInstant); + } else { + LStart->AddPortion(i, currentInstant); + } + } + for (auto&& i : remove) { + if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { + continue; + } + if (!i->GetMeta().RecordSnapshotMax) { + LMax->RemovePortion(i, currentInstant); + } else { + LStart->RemovePortion(i, currentInstant); + } + } + } + /* Delegates to LStart only. */ virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const override { + return LStart->BuildOptimizationTask(limits, granule, busyPortions, TInstant::Now()); + + } + /* The priority is Critical with the weight of LStart. */ virtual TOptimizationPriority DoGetUsefulMetric() const override { + return TOptimizationPriority::Critical(LStart->GetWeight()); + } + virtual TString DoDebugString() const override { + return ""; + } + +public: + TLevelsOptimizerPlanner(const ui64 granuleId, const std::shared_ptr<IStoragesManager>& storagesManager, const
std::shared_ptr<arrow::Schema>& primaryKeysSchema) + : TBase(granuleId) + , StoragesManager(storagesManager) + , Counters(std::make_shared<TCounters>()) + { + /* A single level is created (critical age 120 seconds, critical size 24 << 20, no next level) and used as L3, LMax and LStart alike. */ L3 = std::make_shared<TLevel>(TDuration::Seconds(120), 24 << 20, nullptr, StoragesManager, Counters, primaryKeysSchema); + LMax = L3; + LStart = L3; + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make new file mode 100644 index 00000000000..3f96a571747 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + optimizer.cpp + counters.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/changes/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h deleted file mode 100644 index 9638b6836fe..00000000000 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h +++ /dev/null @@ -1,59 +0,0 @@ -#pragma once -#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> -#include <library/cpp/object_factory/object_factory.h> - -namespace NKikimr::NOlap { -struct TCompactionLimits; -class TGranuleMeta; -class TColumnEngineChanges; -} - -namespace NKikimr::NOlap::NStorageOptimizer { - -class IOptimizerPlanner { -private: - const ui64 GranuleId; -protected: - virtual void DoAddPortion(const std::shared_ptr<TPortionInfo>& info) = 0; - virtual void DoRemovePortion(const std::shared_ptr<TPortionInfo>& info) = 0; - virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const = 0; - virtual i64 DoGetUsefulMetric() const = 0; - virtual std::vector<std::shared_ptr<TPortionInfo>> DoGetPortionsOrderedByPK(const
TSnapshot& snapshot) const = 0; - virtual TString DoDebugString() const { - return ""; - } -public: - using TFactory = NObjectFactory::TObjectFactory<IOptimizerPlanner, TString>; - IOptimizerPlanner(const ui64 granuleId) - : GranuleId(granuleId) - { - - } - - - virtual ~IOptimizerPlanner() = default; - TString DebugString() const { - return DoDebugString(); - } - - std::vector<std::shared_ptr<TPortionInfo>> GetPortionsOrderedByPK(const TSnapshot& snapshot) const { - return DoGetPortionsOrderedByPK(snapshot); - } - - void AddPortion(const std::shared_ptr<TPortionInfo>& info) { - Y_VERIFY(info); - NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("granule_id", GranuleId)); - DoAddPortion(info); - } - void RemovePortion(const std::shared_ptr<TPortionInfo>& info) { - Y_VERIFY(info); - NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("granule_id", GranuleId)); - DoRemovePortion(info); - } - std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const; - i64 GetUsefulMetric() const { - return DoGetUsefulMetric(); - } -}; - -} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp index 1c21de24754..aafb4ce76dc 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp @@ -1,7 +1,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <ydb/core/tx/columnshard/splitter/rb_splitter.h> #include <ydb/core/tx/columnshard/counters/indexation.h> -#include <ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h> #include 
<ydb/core/formats/arrow/serializer/batch_only.h> #include <ydb/core/formats/arrow/simple_builder/batch.h> #include <ydb/core/formats/arrow/simple_builder/filler.h> diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make index 15879284dca..0e0afa93c2a 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make @@ -1,15 +1,9 @@ LIBRARY() -SRCS( - optimizer.cpp - intervals_optimizer.cpp -) - PEERDIR( - contrib/libs/apache/arrow - ydb/core/protos - ydb/core/formats/arrow - ydb/core/tx/columnshard/engines/changes/abstract + ydb/core/tx/columnshard/engines/storage/optimizer/abstract + ydb/core/tx/columnshard/engines/storage/optimizer/intervals + ydb/core/tx/columnshard/engines/storage/optimizer/levels ) END() diff --git a/ydb/core/tx/columnshard/engines/storage/storage.h b/ydb/core/tx/columnshard/engines/storage/storage.h index ed78f7ce3ec..c7c80a4908e 100644 --- a/ydb/core/tx/columnshard/engines/storage/storage.h +++ b/ydb/core/tx/columnshard/engines/storage/storage.h @@ -70,7 +70,8 @@ public: return {}; } for (auto it = GranuleCompactionPrioritySorting.rbegin(); it != GranuleCompactionPrioritySorting.rend(); ++it) { - if (it->first.GetWeight() == 0) { + if (it->first.GetWeight().IsZero()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "zero_granule_reached"); break; } Y_VERIFY(it->second.size()); diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index bdeface567a..d84f12dc8eb 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -305,6 +305,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, } changes->Blobs.insert(blobs.begin(), blobs.end()); + blobs.clear(); changes->StartEmergency(); NOlap::TConstructionContext 
context(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Indexation")); @@ -381,7 +382,7 @@ bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db, changes->StartEmergency(); - const bool result = engine.ApplyChanges(db, changes, TSnapshot(1,0)); + const bool result = engine.ApplyChanges(db, changes, TSnapshot(1,1)); changes->AbortEmergency(); return result; } @@ -527,12 +528,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // insert ui64 planStep = 1; - THashMap<TBlobRange, TString> blobs; ui64 numRows = 1000; ui64 rowPos = 0; for (ui64 txId = 1; txId <= 20; ++txId, rowPos += numRows) { TString testBlob = MakeTestBlob(rowPos, rowPos + numRows); auto blobRange = MakeBlobRange(++step, testBlob.size()); + THashMap<TBlobRange, TString> blobs; blobs[blobRange] = testBlob; // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] @@ -623,12 +624,13 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); - THashMap<TBlobRange, TString> blobs; ui64 numRows = 1000; ui64 rowPos = 0; + THashMap<TBlobRange, TString> blobsAll; for (ui64 txId = 1; txId <= 100; ++txId, rowPos += numRows) { TString testBlob = MakeTestBlob(rowPos, rowPos + numRows); auto blobRange = MakeBlobRange(++step, testBlob.size()); + THashMap<TBlobRange, TString> blobs; blobs[blobRange] = testBlob; // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] @@ -637,6 +639,9 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {})); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); + for (auto&& i : blobs) { + blobsAll[i.first] = i.second; + } UNIT_ASSERT(ok); } @@ -649,7 +654,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // compact planStep = 2; - bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {23, 5, 5}); + bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobsAll), step, {23, 5, 5}); 
UNIT_ASSERT(ok); // success write after compaction @@ -658,6 +663,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { for (ui64 txId = 1; txId <= 2; ++txId, rowPos += numRows) { TString testBlob = MakeTestBlob(rowPos, rowPos + numRows); auto blobRange = MakeBlobRange(++step, testBlob.size()); + THashMap<TBlobRange, TString> blobs; blobs[blobRange] = testBlob; // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] @@ -692,12 +698,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); engine.Load(db, lostBlobs); - THashMap<TBlobRange, TString> blobs; ui64 numRows = 1000; ui64 rowPos = 0; for (ui64 txId = 1; txId <= 20; ++txId, rowPos += numRows) { TString testBlob = MakeTestBlob(rowPos, rowPos + numRows); auto blobRange = MakeBlobRange(++step, testBlob.size()); + THashMap<TBlobRange, TString> blobs; blobs[blobRange] = testBlob; // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] |