diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-29 16:33:13 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-29 16:33:13 +0300 |
commit | 0e07043ae62bc543d90237be4bfaf04fab67a318 (patch) | |
tree | cdd361c43931acf0d17e458fe73d91e352f811d7 | |
parent | 2eb5121615620a9b4347646a764a2028bcfd5ef3 (diff) | |
download | ydb-0e07043ae62bc543d90237be4bfaf04fab67a318.tar.gz |
granules compaction priority usage
28 files changed, 936 insertions, 389 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 563b3957d3..62b978efd1 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -27,10 +27,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { LOG_S_INFO("Switched to work at " << TabletID() << " actor " << ctx.SelfID); IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID, IndexationCounters)); - CompactionActor = ctx.Register( - CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS), - // Default mail-box and batch pool. - TMailboxType::HTSwap, AppData(ctx)->BatchPoolId); + CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS)); EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID, EvictionCounters)); for (auto&& i : TablesManager.GetTables()) { ActivateTiering(i.first, i.second.GetTieringUsage()); diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index fab45d3bde..427259b209 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -259,7 +259,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) Self->ActiveCompaction--; Y_VERIFY(changes->CompactionInfo); - bool inGranule = changes->CompactionInfo->InGranule; + bool inGranule = changes->CompactionInfo->InGranule(); if (inGranule) { Self->IncCounter(ok ? 
COUNTER_COMPACTION_SUCCESS : COUNTER_COMPACTION_FAIL); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 22564ded18..0cdc3955fb 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -681,6 +681,11 @@ bool TColumnShard::SetupIndexation() { dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT); THashMap<ui64, ui64> overloadedPathGranules; for (auto& [pathId, committed] : InsertTable->GetCommitted()) { + auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(pathId); + if (pMap) { + overloadedPathGranules[pathId] = pMap->size(); + } + InsertTable->SetOverloaded(pathId, !!pMap); for (auto& data : committed) { ui32 dataSize = data.BlobSize(); Y_VERIFY(dataSize); @@ -689,13 +694,9 @@ bool TColumnShard::SetupIndexation() { if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) { continue; } - if (auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(data.PathId)) { - overloadedPathGranules[pathId] = pMap->size(); - InsertTable->SetOverloaded(data.PathId, true); + if (pMap) { ++ignored; continue; - } else { - InsertTable->SetOverloaded(data.PathId, false); } ++blobs; bytesToIndex += dataSize; @@ -713,7 +714,7 @@ bool TColumnShard::SetupIndexation() { LOG_S_DEBUG("Few data for indexation (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored " << ignored << ") at tablet " << TabletID()); - // Force small indexations simetimes to keep BatchCache smaller + // Force small indexations sometimes to keep BatchCache smaller if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) { ++SkippedIndexations; return false; @@ -760,16 +761,14 @@ bool TColumnShard::SetupCompaction() { while (ActiveCompaction < TSettings::MAX_ACTIVE_COMPACTIONS) { auto limits = CompactionLimits.Get(); - auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(limits, LastCompactedGranule); - if 
(!compactionInfo || compactionInfo->Empty()) { + auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(limits); + if (!compactionInfo) { if (events.empty()) { LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID()); } break; } - Y_VERIFY(compactionInfo->Good()); - LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID()); ui64 outdatedStep = GetOutdatedStep(); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 7bfc833709..0f91676649 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -349,7 +349,6 @@ private: TWriteId LastWriteId = TWriteId{0}; ui64 LastPlannedStep = 0; ui64 LastPlannedTxId = 0; - ui64 LastCompactedGranule = 0; ui64 LastExportNo = 0; ui64 WritesInFly = 0; ui64 OwnerPathId = 0; diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt index bc23d223a5..8a29dbda94 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(predicate) add_subdirectory(reader) +add_subdirectory(storage) add_subdirectory(ut) get_built_tool_path( TOOL_enum_parser_bin @@ -32,6 +33,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC ydb-core-tablet_flat columnshard-engines-reader columnshard-engines-predicate + columnshard-engines-storage formats-arrow-compression udf-service-exception_policy tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt index 3dd6df8132..43a5fa4870 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt @@ -8,6 +8,7 @@ add_subdirectory(predicate) add_subdirectory(reader) 
+add_subdirectory(storage) add_subdirectory(ut) get_built_tool_path( TOOL_enum_parser_bin @@ -33,6 +34,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC ydb-core-tablet_flat columnshard-engines-reader columnshard-engines-predicate + columnshard-engines-storage formats-arrow-compression udf-service-exception_policy tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt index 3dd6df8132..43a5fa4870 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(predicate) add_subdirectory(reader) +add_subdirectory(storage) add_subdirectory(ut) get_built_tool_path( TOOL_enum_parser_bin @@ -33,6 +34,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC ydb-core-tablet_flat columnshard-engines-reader columnshard-engines-predicate + columnshard-engines-storage formats-arrow-compression udf-service-exception_policy tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt index bc23d223a5..8a29dbda94 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(predicate) add_subdirectory(reader) +add_subdirectory(storage) add_subdirectory(ut) get_built_tool_path( TOOL_enum_parser_bin @@ -32,6 +33,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC ydb-core-tablet_flat columnshard-engines-reader columnshard-engines-predicate + columnshard-engines-storage formats-arrow-compression udf-service-exception_policy tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index de4b774b9c..49b49c68aa 
100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -113,20 +113,68 @@ private: static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type); }; +class ICompactionObjectCallback { +public: + virtual ~ICompactionObjectCallback() = default; + virtual void OnCompactionStarted(const bool inGranule) = 0; + virtual void OnCompactionFinished() = 0; + virtual void OnCompactionFailed(const TString& reason) = 0; + virtual void OnCompactionCanceled(const TString& reason) = 0; + virtual TString DebugString() const = 0; +}; + struct TCompactionInfo { - TSet<ui64> Granules; - bool InGranule{false}; +private: + std::shared_ptr<ICompactionObjectCallback> CompactionObject; + mutable bool StatusProvided = false; + const bool InGranuleFlag = false; +public: + TCompactionInfo(std::shared_ptr<ICompactionObjectCallback> compactionObject, const bool inGranule) + : CompactionObject(compactionObject) + , InGranuleFlag(inGranule) + { + Y_VERIFY(compactionObject); + CompactionObject->OnCompactionStarted(InGranuleFlag); + } - bool Empty() const { return Granules.empty(); } - bool Good() const { return Granules.size() == 1; } + bool InGranule() const { + return InGranuleFlag; + } - friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) { - if (info.Good() == 1) { - ui64 granule = *info.Granules.begin(); - out << (info.InGranule ? 
"in granule" : "split granule") << " compaction of granule " << granule; - } else { - out << "wrong compaction of " << info.Granules.size() << " granules"; + template <class T> + const T& GetObject() const { + auto result = dynamic_cast<const T*>(CompactionObject.get()); + Y_VERIFY(result); + return *result; + } + + void CompactionFinished() const { + Y_VERIFY(!StatusProvided); + StatusProvided = true; + CompactionObject->OnCompactionFinished(); + } + + void CompactionCanceled(const TString& reason) const { + Y_VERIFY(!StatusProvided); + StatusProvided = true; + CompactionObject->OnCompactionCanceled(reason); + } + + void CompactionFailed(const TString& reason) const { + Y_VERIFY(!StatusProvided); + StatusProvided = true; + CompactionObject->OnCompactionFailed(reason); + } + + ~TCompactionInfo() { + Y_VERIFY_DEBUG(StatusProvided); + if (!StatusProvided) { + CompactionObject->OnCompactionFailed("compaction unexpectedly finished"); } + } + + friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) { + out << (info.InGranuleFlag ? "in granule" : "split granule") << " compaction of granule: " << info.CompactionObject->DebugString(); return out; } }; @@ -194,7 +242,7 @@ public: return "insert"; case COMPACTION: return CompactionInfo - ? (CompactionInfo->InGranule ? "compaction in granule" : "compaction split granule" ) + ? (CompactionInfo->InGranule() ? 
"compaction in granule" : "compaction split granule" ) : "compaction"; case CLEANUP: return "cleanup"; @@ -593,7 +641,7 @@ public: virtual std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, const THashSet<ui32>& columnIds, const TPKRangesFilter& pkRangesFilter) const = 0; - virtual std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits, ui64& lastCompactedGranule) = 0; + virtual std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, const TSnapshot& outdatedSnapshot, const TCompactionLimits& limits) = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 1646b6e8d5..5503cef08a 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -5,6 +5,7 @@ #include <ydb/core/formats/arrow/one_batch_input_stream.h> #include <ydb/core/formats/arrow/merging_sorted_input_stream.h> +#include <ydb/library/conclusion/status.h> #include <concepts> @@ -12,7 +13,7 @@ namespace NKikimr::NOlap { namespace { -bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& portions, const TCompactionLimits& limits, +TConclusionStatus InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& portions, const TCompactionLimits& limits, const TSnapshot& snap, TColumnEngineForLogs::TMarksGranules& marksGranules) { ui64 oldTimePlanStep = snap.GetPlanStep() - TDuration::Seconds(limits.InGranuleCompactSeconds).MilliSeconds(); ui32 insertedCount = 0; @@ -83,12 +84,12 @@ bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& por Y_VERIFY(insertedCount); // Trigger compaction if we have lots of 
inserted or if all inserted are old enough if (insertedNew && insertedCount < limits.InGranuleCompactInserts) { - return false; + return TConclusionStatus::Fail("not enough inserted portions (" + ::ToString(insertedNew) + ")"); } // Nothing to filter. Leave portions as is, no borders needed. if (filtered.empty() && goodCompacted.empty()) { - return true; + return TConclusionStatus::Success(); } // It's a map for SliceIntoGranules(). We use fake granule ids here to slice batch with borders. @@ -124,7 +125,7 @@ bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& por } marksGranules = TColumnEngineForLogs::TMarksGranules(std::move(borders)); - return true; + return TConclusionStatus::Success(); } } // namespace @@ -188,7 +189,7 @@ TColumnEngineForLogs::TMarksGranules::SliceIntoGranules(const std::shared_ptr<ar TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits) - : Limits(limits) + : GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, limits)) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) @@ -212,10 +213,7 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() { Counters.Tables = PathGranules.size(); Counters.Granules = Granules.size(); Counters.EmptyGranules = EmptyGranules.size(); - Counters.OverloadedGranules = 0; - for (const auto& [_, set] : PathsGranulesOverloaded) { - Counters.OverloadedGranules += set.size(); - } + Counters.OverloadedGranules = GranulesStorage->GetOverloadedGranulesCount(); return Counters; } @@ -296,32 +294,18 @@ void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndex bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop) { ClearIndex(); - - if (!LoadGranules(db)) { - return false; - } - if (!LoadColumns(db, lostBlobs)) { - return false; - } - if (!LoadCounters(db)) { - return false; - } - -#if 0 // Clear index data - for (auto& [granule, meta] : Granules) 
{ - for (auto& [portion, portionInfo] : meta->Portions) { - for (auto& rec : portionInfo.Records) { - ColumnsTable->Erase(db, rec); - } + { + auto guard = GranulesStorage->StartPackModification(); + if (!LoadGranules(db)) { + return false; + } + if (!LoadColumns(db, lostBlobs)) { + return false; + } + if (!LoadCounters(db)) { + return false; } - GranulesTable->Erase(db, meta->Record); } - CountersTable->Write(db, LAST_PORTION, 0); - CountersTable->Write(db, LAST_GRANULE, 0); - CountersTable->Write(db, LAST_PLAN_STEP, 0); - CountersTable->Write(db, LAST_TX_ID, 0); - return true; -#endif THashSet<ui64> emptyGranulePaths; for (const auto& [granule, spg] : Granules) { @@ -329,10 +313,9 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl EmptyGranules.insert(granule); emptyGranulePaths.insert(spg->PathId()); } else { - CompactionGranules.insert(granule); CleanupGranules.insert(granule); } - for (const auto& [_, portionInfo] : spg->Portions) { + for (const auto& [_, portionInfo] : spg->GetPortions()) { UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD); } } @@ -357,8 +340,6 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl } } - UpdateOverloaded(Granules, Limits); - Y_VERIFY(!(LastPortion >> 63), "near to int overflow"); Y_VERIFY(!(LastGranule >> 63), "near to int overflow"); return true; @@ -366,8 +347,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) { auto callback = [&](const TGranuleRecord& rec) { - bool ok = SetGranule(rec, true); - Y_VERIFY(ok); + Y_VERIFY(SetGranule(rec, true)); }; return GranulesTable->Load(db, callback); @@ -381,15 +361,9 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs.erase(rec.BlobRange.BlobId); // Locate granule and append the record. 
if (const auto gi = Granules.find(rec.Granule); gi != Granules.end()) { - gi->second->Portions[rec.Portion].AddRecord(indexInfo, rec); + gi->second->AddColumnRecord(indexInfo, rec); } else { -#if 0 - LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId); - Granules.erase(rec.Granule); - return; -#else Y_VERIFY(false); -#endif } }); } @@ -429,7 +403,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(const TC if (PathGranules.contains(pathId)) { // Abort inserting if the path has overloaded granules. - if (PathsGranulesOverloaded.contains(pathId)) { + if (GranulesStorage->GetOverloaded(pathId)) { return {}; } @@ -455,44 +429,39 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: const TSnapshot& outdatedSnapshot, const TCompactionLimits& limits) { Y_VERIFY(info); - Y_VERIFY(info->Granules.size() == 1); auto changes = TChanges::BuildCompactionChanges(DefaultMark(), std::move(info), limits, LastSnapshot); - const ui64 granule = *changes->CompactionInfo->Granules.begin(); - const auto gi = Granules.find(granule); - // Check granule exists. - Y_VERIFY(gi != Granules.end()); + const auto& granuleInfo = changes->CompactionInfo->GetObject<TGranuleMeta>(); - changes->SwitchedPortions.reserve(gi->second->Portions.size()); + changes->SwitchedPortions.reserve(granuleInfo.GetPortions().size()); // Collect active portions for the granule. - for (const auto& [_, portionInfo] : gi->second->Portions) { + for (const auto& [_, portionInfo] : granuleInfo.GetPortions()) { if (portionInfo.IsActive()) { changes->SwitchedPortions.push_back(portionInfo); - Y_VERIFY(portionInfo.Granule() == granule); + Y_VERIFY(portionInfo.Granule() == granuleInfo.GetGranuleId()); } } - const ui64 pathId = gi->second->Record.PathId; + const ui64 pathId = granuleInfo.Record.PathId; Y_VERIFY(PathGranules.contains(pathId)); // Locate mark for the granule. 
for (const auto& [mark, pathGranule] : PathGranules[pathId]) { - if (pathGranule == granule) { - changes->SrcGranule = TChanges::TSrcGranule(pathId, granule, mark); + if (pathGranule == granuleInfo.GetGranuleId()) { + changes->SrcGranule = TChanges::TSrcGranule(pathId, granuleInfo.GetGranuleId(), mark); break; } } Y_VERIFY(changes->SrcGranule); - if (changes->CompactionInfo->InGranule) { + if (changes->CompactionInfo->InGranule()) { const TSnapshot completedSnap = std::max(LastSnapshot, outdatedSnapshot); - if (!InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, limits, completedSnap, changes->MergeBorders)) { + auto mergeInitResult = InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, limits, completedSnap, changes->MergeBorders); + if (!mergeInitResult) { // Return granule to Compaction list. This is equal to single compaction worker behaviour. - CompactionGranules.insert(granule); + changes->CompactionInfo->CompactionCanceled("cannot init in granule merge: " + mergeInitResult.GetErrorMessage()); return {}; } - } else { - GranulesInSplit.insert(granule); } Y_VERIFY(!changes->SwitchedPortions.empty()); @@ -519,7 +488,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T Y_VERIFY(Granules.contains(granule)); auto spg = Granules[granule]; Y_VERIFY(spg); - for (auto& [portion, info] : spg->Portions) { + for (auto& [portion, info] : spg->GetPortions()) { affectedRecords += info.NumRecords(); changes->PortionsToDrop.push_back(info); dropPortions.insert(portion); @@ -550,7 +519,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T Y_VERIFY(spg); bool isClean = true; - for (auto& [portion, info] : spg->Portions) { + for (auto& [portion, info] : spg->GetPortions()) { if (info.IsActive() || dropPortions.contains(portion)) { continue; } @@ -612,7 +581,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash auto spg = Granules[granule]; 
Y_VERIFY(spg); - for (auto& [portion, info] : spg->Portions) { + for (auto& [portion, info] : spg->GetPortions()) { if (!info.IsActive()) { continue; } @@ -690,37 +659,6 @@ std::vector<std::vector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGran return emptyGranules; } -void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const TCompactionLimits& limits) { - for (const auto& [granule, spg] : granules) { - const ui64 pathId = spg->Record.PathId; - - ui64 size = 0; - // Calculate byte-size of active portions. - for (const auto& [_, portionInfo] : spg->Portions) { - if (portionInfo.IsActive()) { - size += portionInfo.BlobsBytes(); - } - } - - // Size exceeds the configured limit. Mark granule as overloaded. - if (size >= limits.GranuleOverloadSize) { - if (PathsGranulesOverloaded[pathId].emplace(granule).second) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "overloaded")("path_id", pathId)("granule", granule); - } - } else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) { - // Size is under limit. Remove granule from the overloaded set. - if (pi->second.erase(granule)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "unoverloaded")("path_id", pathId)("granule", granule)("remained", pi->second.size()); - } - // Remove entry for the pathId if there it has no overloaded granules any more. 
- if (pi->second.empty()) { - PathsGranulesOverloaded.erase(pi); - } - } - } - SignalCounters.OverloadGranules->Set(PathsGranulesOverloaded.size()); -} - bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) { auto changes = std::static_pointer_cast<TChanges>(indexChanges); @@ -737,8 +675,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE // If it's a split compaction with moves appended portions are INSERTED (could have overlaps with others) if (changes->IsCompaction() && changes->PortionsToMove.empty()) { Y_VERIFY(changes->CompactionInfo); - produced = changes->CompactionInfo->InGranule ? - TPortionMeta::COMPACTED : TPortionMeta::SPLIT_COMPACTED; + produced = changes->CompactionInfo->InGranule() ? TPortionMeta::COMPACTED : TPortionMeta::SPLIT_COMPACTED; } portionInfo.UpdateRecordsMeta(indexInfo, produced); @@ -763,22 +700,15 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE } if (!ApplyChanges(db, *changes, snapshot, false)) { // validate only - if (changes->IsCompaction()) { - // Return granule to Compaction list. This is equal to single compaction worker behaviour. 
- for (const auto& portionInfo : changes->SwitchedPortions) { - CompactionGranules.insert(portionInfo.Granule()); - } + if (changes->CompactionInfo) { + changes->CompactionInfo->CompactionFailed("cannot apply changes"); } return false; } bool ok = ApplyChanges(db, *changes, snapshot, true); Y_VERIFY(ok); - - // Save updated granules for comapaction - if (changes->IsInsert()) { - for (auto& portionInfo : changes->AppendedPortions) { - CompactionGranules.insert(portionInfo.Granule()); - } + if (changes->CompactionInfo) { + changes->CompactionInfo->CompactionFinished(); } // Save updated granules for cleanup @@ -788,38 +718,6 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE CleanupGranules.insert(portionInfo.Granule()); } } - - // Update overloaded granules (only if tx would be applyed) - if (changes->IsInsert() || changes->IsCompaction() || changes->IsCleanup()) { - THashMap<ui64, std::shared_ptr<TGranuleMeta>> granules; - - const auto emplace_granule = [&](const ui64 id) { - // Lookup granule in the global table. - const auto gi = Granules.find(id); - // Granule should exists. - Y_VERIFY(gi != Granules.end()); - // Emplace granule. 
- granules.emplace(id, gi->second); - }; - - if (changes->IsCleanup()) { - granules.reserve(changes->PortionsToDrop.size()); - - for (const auto& portionInfo : changes->PortionsToDrop) { - emplace_granule(portionInfo.Granule()); - } - } else if (changes->IsCompaction() && !changes->CompactionInfo->InGranule) { - emplace_granule(changes->SrcGranule->Granule); - } else { - granules.reserve(changes->AppendedPortions.size()); - - for (const auto& portionInfo : changes->AppendedPortions) { - emplace_granule(portionInfo.Granule()); - } - } - - UpdateOverloaded(granules, indexChanges->Limits); - } return true; } @@ -829,14 +727,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, Y_VERIFY(changes.CompactionInfo); switchedPortions = &changes.SwitchedPortions; - if (changes.CompactionInfo->InGranule) { -#if 0 - if (changes.SwitchedPortions.size() <= changes.AppendedPortions.size()) { - LOG_S_ERROR("Cannot compact granule " << changes.SrcGranule->Granule << " at tablet " << TabletId); - return false; - } -#endif - } else { + if (!changes.CompactionInfo->InGranule()) { if (changes.NewGranules.empty()) { LOG_S_ERROR("Cannot split granule " << changes.SrcGranule->Granule << " at tablet " << TabletId); return false; @@ -876,7 +767,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, LOG_S_DEBUG("Cannot update unknown granule " << granule << " at tablet " << TabletId); return false; } - if (!Granules[granule]->Portions.contains(portion)) { + if (!Granules[granule]->GetPortions().contains(portion)) { LOG_S_ERROR("Cannot update unknown portion " << portionInfo << " at tablet " << TabletId); return false; } @@ -916,14 +807,14 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, ui64 granule = portionInfo.Granule(); ui64 portion = portionInfo.Portion(); - if (!Granules.contains(granule) || !Granules[granule]->Portions.contains(portion)) { + if (!Granules.contains(granule) || 
!Granules[granule]->GetPortions().contains(portion)) { LOG_S_ERROR("Cannot evict unknown portion " << portionInfo << " at tablet " << TabletId); return false; } // In case of race with compaction portion could become inactive // TODO: evict others instead of abort eviction - auto& oldInfo = Granules[granule]->Portions[portion]; + const TPortionInfo& oldInfo = Granules[granule]->GetPortionVerified(portion); if (!oldInfo.IsActive()) { LOG_S_WARN("Cannot evict inactive portion " << oldInfo << " at tablet " << TabletId); return false; @@ -1045,10 +936,8 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, void TColumnEngineForLogs::FreeLocks(std::shared_ptr<TColumnEngineChanges> indexChanges) { auto changes = std::static_pointer_cast<TChanges>(indexChanges); - if (changes->IsCompaction()) { - // Set granule not in split. Do not block writes in it. - Y_VERIFY(changes->SrcGranule); - GranulesInSplit.erase(changes->SrcGranule->Granule); + if (changes->CompactionInfo) { + changes->CompactionInfo->GetObject<TGranuleMeta>().AllowedInsertion(); } } @@ -1060,7 +949,7 @@ bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) { Y_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second); // Allocate granule info and ensure that there is no granule with same id inserted before. - Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec)).second); + Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage)).second); } else { // Granule with same id already exists. 
if (Granules.contains(rec.Granule)) { @@ -1078,11 +967,11 @@ bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) { void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& mark) { Y_VERIFY(PathGranules.contains(pathId)); - Y_VERIFY(Granules.contains(granule)); - - Granules.erase(granule); + auto it = Granules.find(granule); + Y_VERIFY(it != Granules.end()); + Y_VERIFY(it->second->IsErasable()); + Granules.erase(it); EmptyGranules.erase(granule); - CompactionGranules.erase(granule); PathGranules[pathId].erase(mark); } @@ -1099,13 +988,12 @@ bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool a } Y_VERIFY(portionInfo.Valid()); - ui64 portion = portionInfo.Portion(); auto& spg = Granules[granule]; Y_VERIFY(spg); if (updateStats) { UpdatePortionStats(portionInfo); } - spg->Portions[portion] = portionInfo; + spg->UpsertPortion(portionInfo); return true; // It must return true if (apply == true) } @@ -1115,7 +1003,7 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap ui64 portion = portionInfo.Portion(); if (!apply) { - if (!Granules.contains(granule) || !Granules[granule]->Portions.contains(portion)) { + if (!Granules.contains(granule) || !Granules[granule]->GetPortions().contains(portion)) { LOG_S_ERROR("Cannot erase unknown portion " << portionInfo << " at tablet " << TabletId); return false; } @@ -1124,12 +1012,11 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap auto& spg = Granules[granule]; Y_VERIFY(spg); - Y_VERIFY(spg->Portions.contains(portion)); if (updateStats) { - UpdatePortionStats(spg->Portions[portion], EStatsUpdateType::ERASE); + UpdatePortionStats(spg->GetPortionVerified(portion), EStatsUpdateType::ERASE); } - spg->Portions.erase(portion); + Y_VERIFY(spg->ErasePortion(portion)); return true; // It must return true if (apply == true) } @@ -1139,9 +1026,9 @@ bool TColumnEngineForLogs::CanInsert(const TChanges& 
changes, const TSnapshot& c // Does insert have granule in split? for (const auto& portionInfo : changes.AppendedPortions) { Y_VERIFY(!portionInfo.Empty()); - ui64 granule = portionInfo.Granule(); - if (GranulesInSplit.contains(granule)) { - LOG_S_NOTICE("Cannot insert into splitting granule " << granule << " at tablet " << TabletId); + auto g = GetGranuleOptional(portionInfo.Granule()); + if (!!g && !g->IsInsertAllowed()) { + LOG_S_NOTICE("Cannot insert into splitting granule " << portionInfo.Granule() << " at tablet " << TabletId); return false; } } @@ -1238,7 +1125,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot Y_VERIFY(it != Granules.end()); auto& spg = it->second; Y_VERIFY(spg); - auto& portions = spg->Portions; + auto& portions = spg->GetPortions(); bool granuleHasDataForSnaphsot = false; TMap<TSnapshot, std::vector<const TPortionInfo*>> orderedPortions = GroupPortionsBySnapshot(portions, snapshot); @@ -1278,107 +1165,37 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot return out; } -static bool NeedSplit(const THashMap<ui64, TPortionInfo>& portions, const TCompactionLimits& limits, bool& inserted) { - ui64 sumSize = 0; - ui64 sumMaxSize = 0; - size_t activeCount = 0; - THashSet<NArrow::TReplaceKey> borders; - bool differentBorders = false; - - for (const auto& [_, info] : portions) { - // We need only actual portions here (with empty XPlanStep:XTxId) - if (info.IsActive()) { - ++activeCount; - } else { - continue; - } - - // Find at least 2 unique borders - if (!differentBorders) { - borders.insert(info.IndexKeyStart()); - borders.insert(info.IndexKeyEnd()); - differentBorders = (borders.size() > 1); - } - - auto sizes = info.BlobsSizes(); - sumSize += sizes.first; - sumMaxSize += sizes.second; - if (info.IsInserted()) { - inserted = true; +std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompactionLimits& limits) { + const auto filter = [&](const ui64 granuleId) { 
+ std::shared_ptr<TGranuleMeta> compactGranule = GetGranulePtrVerified(granuleId); + if (!compactGranule->NeedCompaction(limits)) { + return false; } - } - - // Do nothing if count of active portions is less than two. - if (activeCount < 2) { - inserted = false; - return false; - } - - return differentBorders && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize); -} - -std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompactionLimits& limits, ui64& lastCompactedGranule) { - if (CompactionGranules.empty()) { + return true; + }; + auto gCompaction = GranulesStorage->GetGranuleForCompaction(filter); + if (!gCompaction) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_granule_for_compaction"); + SignalCounters.NoCompactGranulesSelection->Add(1); return {}; } + std::shared_ptr<TGranuleMeta> compactGranule = GetGranulePtrVerified(*gCompaction); - ui64 granule = 0; - bool inGranule = true; - - if (PathsGranulesOverloaded.size()) { - { - auto it = PathsGranulesOverloaded.begin(); - Y_VERIFY(it->second.size()); - granule = *it->second.begin(); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_overload")("path_id", it->first)("granules_count", it->second.size())("granule", granule)("paths_count", PathsGranulesOverloaded.size()); - SignalCounters.CompactOverloadGranulesSelection->Add(1); - } - - const auto gi = Granules.find(granule); - Y_VERIFY(gi != Granules.end()); - - bool inserted = false; - Y_VERIFY(NeedSplit(gi->second->Portions, limits, inserted)); - inGranule = false; - Y_VERIFY(CompactionGranules.erase(granule)); - SignalCounters.SplitCompactGranulesSelection->Add(1); - } else { - for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty() && !granule;) { - // Start from the beginning if the end is reached. - if (it == CompactionGranules.end()) { - it = CompactionGranules.begin(); - } - - const auto gi = Granules.find(*it); - // Check granule exists. 
- Y_VERIFY(gi != Granules.end()); - - bool inserted = false; - if (NeedSplit(gi->second->Portions, limits, inserted)) { - inGranule = false; - granule = *it; - SignalCounters.SplitCompactGranulesSelection->Add(1); - } else if (inserted) { - granule = *it; - SignalCounters.InternalCompactGranulesSelection->Add(1); - } - - // Nothing to compact in the current granule. Throw it. - it = CompactionGranules.erase(it); - } - if (granule) { - lastCompactedGranule = granule; - } + if (compactGranule->IsOverloaded(limits)) { + SignalCounters.CompactOverloadGranulesSelection->Add(1); } - if (granule) { - auto info = std::make_unique<TCompactionInfo>(); - info->Granules.insert(granule); - info->InGranule = inGranule; - return info; + if (compactGranule->NeedSplitCompaction(limits)) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("granule", compactGranule->DebugString())("compaction", "split"); + SignalCounters.SplitCompactGranulesSelection->Add(1); + } else if (compactGranule->NeedInternalCompaction(limits)) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("granule", compactGranule->DebugString())("compaction", "internal"); + SignalCounters.InternalCompactGranulesSelection->Add(1); } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("granule", compactGranule->DebugString())("compaction", "no_need"); SignalCounters.NoCompactGranulesSelection->Add(1); + return {}; } - return {}; + return std::make_unique<TCompactionInfo>(compactGranule, compactGranule->NeedInternalCompaction(limits)); } } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 409a6ae848..3130ebcae2 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -4,6 +4,8 @@ #include "column_engine.h" #include "scalars.h" #include <ydb/core/tx/columnshard/counters/engine_logs.h> 
+#include "storage/granule.h" +#include "storage/storage.h" namespace NKikimr::NArrow { struct TSortDescription; @@ -24,6 +26,7 @@ class TCountersTable; class TColumnEngineForLogs : public IColumnEngine { private: const NColumnShard::TEngineLogsCounters SignalCounters; + std::shared_ptr<TGranulesStorage> GranulesStorage; public: class TMarksGranules { public: @@ -56,6 +59,14 @@ public: }; public: + const TGranuleMeta* GetGranuleMeta() const { + if (CompactionInfo) { + return &CompactionInfo->GetObject<TGranuleMeta>(); + } else { + return nullptr; + } + } + struct TSrcGranule { ui64 PathId{0}; ui64 Granule{0}; @@ -187,13 +198,10 @@ public: } const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override { - if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) { - return &pi->second; - } - return nullptr; + return GranulesStorage->GetOverloaded(pathId); } - bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); } + bool HasOverloadedGranules() const override { return GranulesStorage->HasOverloadedGranules(); } TString SerializeMark(const NArrow::TReplaceKey& key) const override { if (UseCompositeMarks()) { @@ -239,25 +247,32 @@ public: std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, const THashSet<ui32>& columnIds, const TPKRangesFilter& pkRangesFilter) const override; - std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits, ui64& lastCompactedGranule) override; + std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits) override; private: using TMarksMap = std::map<TMark, ui64, TMark::TCompare>; - struct TGranuleMeta { - const TGranuleRecord Record; - THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo + const TGranuleMeta& GetGranuleVerified(const ui64 granuleId) const { + auto it = Granules.find(granuleId); + Y_VERIFY(it != Granules.end()); + return *it->second; + } - explicit TGranuleMeta(const TGranuleRecord& rec) - : 
Record(rec) - {} + std::shared_ptr<TGranuleMeta> GetGranulePtrVerified(const ui64 granuleId) const { + auto result = GetGranuleOptional(granuleId); + Y_VERIFY(result); + return result; + } - ui64 PathId() const noexcept { return Record.PathId; } - bool Empty() const noexcept { return Portions.empty(); } - }; + std::shared_ptr<TGranuleMeta> GetGranuleOptional(const ui64 granuleId) const { + auto it = Granules.find(granuleId); + if (it == Granules.end()) { + return nullptr; + } + return it->second; + } TVersionedIndex VersionedIndex; - const TCompactionLimits Limits; ui64 TabletId; std::shared_ptr<TGranulesTable> GranulesTable; std::shared_ptr<TColumnsTable> ColumnsTable; @@ -265,12 +280,9 @@ private: THashMap<ui64, std::shared_ptr<TGranuleMeta>> Granules; // granule -> meta THashMap<ui64, TMarksMap> PathGranules; // path_id -> {mark, granule} TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id - THashSet<ui64> GranulesInSplit; /// Set of empty granules. /// Just for providing count of empty granules. THashSet<ui64> EmptyGranules; - THashMap<ui64, THashSet<ui64>> PathsGranulesOverloaded; - TSet<ui64> CompactionGranules; THashSet<ui64> CleanupGranules; TColumnEngineStats Counters; ui64 LastPortion; @@ -298,10 +310,7 @@ private: Granules.clear(); PathGranules.clear(); PathStats.clear(); - GranulesInSplit.clear(); EmptyGranules.clear(); - PathsGranulesOverloaded.clear(); - CompactionGranules.clear(); CleanupGranules.clear(); Counters.Clear(); @@ -326,7 +335,6 @@ private: EStatsUpdateType updateType) const; bool CanInsert(const TChanges& changes, const TSnapshot& commitSnap) const; - void UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const TCompactionLimits& limits); /// Return lists of adjacent empty granules for the path. 
std::vector<std::vector<std::pair<TMark, ui64>>> EmptyGranuleTracks(const ui64 pathId) const; diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 288d5883d1..69ce114e3a 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -104,6 +104,16 @@ TString TIndexInfo::GetColumnName(ui32 id, bool required) const { } } +std::vector<ui32> TIndexInfo::GetColumnIds() const { + std::vector<ui32> result; + for (auto&& i : Columns) { + result.emplace_back(i.first); + } + result.emplace_back((ui32)ESpecialColumn::PLAN_STEP); + result.emplace_back((ui32)ESpecialColumn::TX_ID); + return result; +} + std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) const { std::vector<TString> out; out.reserve(ids.size()); @@ -181,19 +191,18 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString& } std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema() const { - if (Schema) { - return Schema; - } + if (!Schema) { + std::vector<ui32> ids; + ids.reserve(Columns.size()); + for (const auto& [id, _] : Columns) { + ids.push_back(id); + } - std::vector<ui32> ids; - ids.reserve(Columns.size()); - for (const auto& [id, _] : Columns) { - ids.push_back(id); + // The ids had a set type before so we keep them sorted. + std::sort(ids.begin(), ids.end()); + Schema = MakeArrowSchema(Columns, ids); } - // The ids had a set type before so we keep them sorted. - std::sort(ids.begin(), ids.end()); - Schema = MakeArrowSchema(Columns, ids); return Schema; } diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 881dd26fc9..449ff1b64e 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -215,6 +215,7 @@ public: /// Returns names of columns defined by the specific ids. 
std::vector<TString> GetColumnNames(const std::vector<ui32>& ids) const; + std::vector<ui32> GetColumnIds() const; /// Returns info of columns defined by specific ids. std::vector<TNameTypeInfo> GetColumns(const std::vector<ui32>& ids) const; diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index 9eadd403d6..7288ee2e49 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -50,7 +50,7 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo, for (auto& rec : portionInfo.Records) { auto pos = resultSchema->GetFieldIndex(rec.ColumnId); Y_VERIFY(pos >= 0); - auto field = resultSchema->GetField(pos); + auto field = resultSchema->GetFieldByIndex(pos); auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext); auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, columnSaver); @@ -75,7 +75,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, - std::vector<TString>& blobs) const { + std::vector<TString>& blobs, const TGranuleMeta* granuleMeta) const { Y_VERIFY(batch->num_rows()); auto resultSchema = SchemaVersions.GetSchema(snapshot); @@ -101,34 +101,48 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI TPortionInfo portionInfo; portionInfo.Records.reserve(resultSchema->GetSchema()->num_fields()); std::vector<TString> portionBlobs; - portionBlobs.reserve(resultSchema->GetSchema()->num_fields()); + portionBlobs.resize(resultSchema->GetSchema()->num_fields()); // Serialize portion's columns into blobs bool ok = true; - for (const auto& field : resultSchema->GetSchema()->fields()) { - const auto& name = field->name(); - ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(name); + + 
std::vector<TColumnSummary> sortedColumnIds; + if (granuleMeta) { + sortedColumnIds = granuleMeta->GetSummary().GetColumnIdsSortedBySizeDescending(); + } else { + for (auto&& i : resultSchema->GetIndexInfo().GetColumnIds()) { + sortedColumnIds.emplace_back(TColumnSummary(i, 0)); + } + } + + ui32 fillCounter = 0; + for (const auto& columnSummary : sortedColumnIds) { + const TString& columnName = resultSchema->GetIndexInfo().GetColumnName(columnSummary.GetColumnId()); + const int idx = resultSchema->GetFieldIndex(columnSummary.GetColumnId()); + Y_VERIFY(idx >= 0); + auto field = resultSchema->GetFieldByIndex(idx); + Y_VERIFY(field); + auto array = portionBatch->GetColumnByName(columnName); + Y_VERIFY(array); /// @warnign records are not valid cause of empty BlobId and zero Portion - TColumnRecord record = TColumnRecord::Make(granule, columnId, snapshot, 0); - auto columnSaver = resultSchema->GetColumnSaver(name, saverContext); - auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), columnSaver, Counters); - if (!blob.size()) { - Counters.TrashDataSerializationBytes->Add(blob.size()); - Counters.TrashDataSerialization->Add(1); + TColumnRecord record = TColumnRecord::Make(granule, columnSummary.GetColumnId(), snapshot, 0); + auto columnSaver = resultSchema->GetColumnSaver(columnSummary.GetColumnId(), saverContext); + TString blob = TPortionInfo::SerializeColumnWithLimit(array, field, columnSaver, Counters); + if (!blob) { ok = false; break; - } else { - Counters.CorrectDataSerializationBytes->Add(blob.size()); - Counters.CorrectDataSerialization->Add(1); } - // TODO: combine small columns in one blob - portionBlobs.emplace_back(std::move(blob)); + portionInfo.InsertOneChunkColumn(idx, std::move(record)); + + portionBlobs[idx] = std::move(blob); + ++fillCounter; } if (ok) { + Y_VERIFY(fillCounter == portionBlobs.size()); portionInfo.AddMetadata(*resultSchema, portionBatch, tierName); 
out.emplace_back(std::move(portionInfo)); for (auto& blob : portionBlobs) { @@ -268,9 +282,9 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange // We could merge data here cause tablet limits indexing data portions #if 0 - auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortDescription()); // insert: no replace - Y_VERIFY(merged); - Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey())); + auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortDescription()); // insert: no replace + Y_VERIFY(merged); + Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey())); #else auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription()); Y_VERIFY(merged); @@ -280,7 +294,7 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], resultSchema->GetIndexInfo()); for (auto& [granule, batch] : granuleBatches) { - auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs); + auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta()); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -338,13 +352,13 @@ std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnE if (!slice || slice->num_rows() == 0) { continue; } - auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs); + auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, changes->GetGranuleMeta()); for (auto&& portionInfo : tmp) { portions.emplace_back(std::move(portionInfo)); } } } else { - portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs); + portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta()); } 
Y_VERIFY(portions.size() > 0); @@ -689,10 +703,9 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) { ui64 tmpGranule = changes->SetTmpGranule(pathId, mark); - for (const auto& batch : idBatches[id]) { // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs); + auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, changes->GetGranuleMeta()); Y_VERIFY(newPortions.size() > 0); for (auto& portion : newPortions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -708,7 +721,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr ui64 tmpGranule = changes->SetTmpGranule(pathId, ts); // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs); + auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, changes->GetGranuleMeta()); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -721,7 +734,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr bool TCompactionLogic::IsSplit(std::shared_ptr<TColumnEngineChanges> changes) { auto castedChanges = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(changes); - return !castedChanges->CompactionInfo->InGranule; + return !castedChanges->CompactionInfo->InGranule(); } std::vector<TString> TCompactionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const { diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h index dd5d896169..abaf034e86 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.h +++ 
b/ydb/core/tx/columnshard/engines/index_logic_logs.h @@ -14,7 +14,8 @@ protected: private: const THashMap<ui64, NKikimr::NOlap::TTiering>* TieringMap = nullptr; public: - TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, const NColumnShard::TIndexationCounters& counters) + TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, + const NColumnShard::TIndexationCounters& counters) : SchemaVersions(indexInfo) , Counters(counters) , TieringMap(&tieringMap) @@ -40,7 +41,7 @@ protected: const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& minSnapshot, - std::vector<TString>& blobs) const; + std::vector<TString>& blobs, const TGranuleMeta* granuleMeta) const; static std::shared_ptr<arrow::RecordBatch> GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch, const TIndexInfo& indexInfo); diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index 0de422f5db..1d7f6f0fea 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -18,7 +18,7 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps auto columnId = GetIndexInfo().GetColumnId(resultField->name()); auto oldColumnIndex = dataSchema.GetFieldIndex(columnId); if (oldColumnIndex >= 0) { // ClumnExists - auto oldColumnInfo = dataSchema.GetField(oldColumnIndex); + auto oldColumnInfo = dataSchema.GetFieldByIndex(oldColumnIndex); Y_VERIFY(oldColumnInfo); auto columnData = batch->GetColumnByName(oldColumnInfo->name()); Y_VERIFY(columnData); @@ -41,12 +41,9 @@ TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array return saver.Apply(batch); } -TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array, - const std::shared_ptr<arrow::Field>& field, - TColumnRecord&& record, - 
const TColumnSaver saver, - const NColumnShard::TIndexationCounters& counters, - const ui32 limitBytes) { +TString TPortionInfo::SerializeColumnWithLimit(const std::shared_ptr<arrow::Array>& array, + const std::shared_ptr<arrow::Field>& field, + const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, const ui32 limitBytes) { auto blob = SerializeColumn(array, field, saver); if (blob.size() >= limitBytes) { counters.TrashDataSerializationBytes->Add(blob.size()); @@ -56,10 +53,15 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr counters.CorrectDataSerializationBytes->Add(blob.size()); counters.CorrectDataSerialization->Add(1); } + return blob; +} +void TPortionInfo::InsertOneChunkColumn(const ui32 idx, TColumnRecord&& record) { record.Chunk = 0; - Records.emplace_back(std::move(record)); - return blob; + if (Records.size() <= idx) { + Records.resize(idx + 1); + } + Records[idx] = std::move(record); } void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) { diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 149a1496c0..96cac1c77c 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -34,7 +34,16 @@ public: virtual ui32 GetColumnId(const std::string& columnName) const = 0; virtual int GetFieldIndex(const ui32 columnId) const = 0; - virtual std::shared_ptr<arrow::Field> GetField(const int index) const = 0; + std::shared_ptr<arrow::Field> GetFieldByIndex(const int index) const { + auto schema = GetSchema(); + if (!schema || index < 0 || index >= schema->num_fields()) { + return nullptr; + } + return schema->field(index); + } + std::shared_ptr<arrow::Field> GetFieldByColumnId(const ui32 columnId) const { + return GetFieldByIndex(GetFieldIndex(columnId)); + } virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0; virtual const 
TIndexInfo& GetIndexInfo() const = 0; virtual const TSnapshot& GetSnapshot() const = 0; @@ -68,7 +77,7 @@ public: } virtual int GetFieldIndex(const ui32 columnId) const override { - TString columnName = IndexInfo.GetColumnName(columnId, false); + const TString& columnName = IndexInfo.GetColumnName(columnId, false); if (!columnName) { return -1; } @@ -76,10 +85,6 @@ public: return Schema->GetFieldIndex(name); } - std::shared_ptr<arrow::Field> GetField(const int index) const override { - return Schema->field(index); - } - const std::shared_ptr<arrow::Schema>& GetSchema() const override { return Schema; } @@ -142,10 +147,6 @@ public: return Schema->GetFieldIndex(name); } - std::shared_ptr<arrow::Field> GetField(const int index) const override { - return Schema->field(index); - } - const std::shared_ptr<arrow::Schema>& GetSchema() const override { return Schema; } @@ -550,8 +551,8 @@ public: for (auto& [pos, orderedChunks] : columnChunks) { Y_VERIFY(positionsMap.contains(pos)); size_t dataPos = positionsMap[pos]; - auto portionField = dataSchema.GetField(dataPos); - auto resultField = resultSchema.GetField(pos); + auto portionField = dataSchema.GetFieldByIndex(dataPos); + auto resultField = resultSchema.GetFieldByIndex(pos); Y_VERIFY(portionField->IsCompatibleWith(*resultField)); @@ -581,15 +582,13 @@ public: } static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, - const std::shared_ptr<arrow::Field>& field, - const TColumnSaver saver); + const std::shared_ptr<arrow::Field>& field, + const TColumnSaver saver); + static TString SerializeColumnWithLimit(const std::shared_ptr<arrow::Array>& array, + const std::shared_ptr<arrow::Field>& field, + const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, const ui32 sizeLimit = BLOB_BYTES_LIMIT); - TString AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array, - const std::shared_ptr<arrow::Field>& field, - TColumnRecord&& record, - const TColumnSaver saver, - const 
NColumnShard::TIndexationCounters& counters, - ui32 limitBytes = BLOB_BYTES_LIMIT); + void InsertOneChunkColumn(const ui32 idx, TColumnRecord&& record); friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) { for (auto& rec : info.Records) { @@ -597,6 +596,7 @@ public: out << " (1 of " << info.Records.size() << " blobs shown)"; break; } + out << ";activity=" << info.IsActive() << ";"; if (!info.TierName.empty()) { out << " tier: " << info.TierName; } diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..566f967553 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(columnshard-engines-storage) +target_link_libraries(columnshard-engines-storage PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(columnshard-engines-storage PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/granule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/storage.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..bafe85e672 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-engines-storage) +target_link_libraries(columnshard-engines-storage PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(columnshard-engines-storage PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/granule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/storage.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..bafe85e672 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. 
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-engines-storage) +target_link_libraries(columnshard-engines-storage PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(columnshard-engines-storage PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/granule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/storage.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..566f967553 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(columnshard-engines-storage) +target_link_libraries(columnshard-engines-storage PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(columnshard-engines-storage PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/granule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/storage.cpp +) diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp new file mode 100644 index 0000000000..1a3ed38214 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -0,0 +1,123 @@ +#include "granule.h" +#include "storage.h" + +namespace NKikimr::NOlap { + +bool TGranuleMeta::NeedSplit(const TCompactionLimits& limits, bool& inserted) const { + inserted = GetSummary().GetInserted().GetPortionsCount(); + bool differentBorders = GetSummary().GetDifferentBorders(); + if (GetSummary().GetActivePortionsCount() < 2) { + inserted = false; + return false; + } + return differentBorders && (GetSummary().GetMaxColumnsSize() >= limits.GranuleBlobSplitSize || GetSummary().GetGranuleSize() >= limits.GranuleOverloadSize); +} + +ui64 TGranuleMeta::Size() const { + return GetSummary().GetGranuleSize(); +} + +void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { + Portions[info.Portion()] = info; + OnAfterChangePortion(info.Portion()); +} + +bool TGranuleMeta::ErasePortion(const ui64 portion) { + auto it = Portions.find(portion); + if (it == Portions.end()) { + return false; + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second)("pathId", Record.PathId); + Portions.erase(it); + OnAfterChangePortion(portion); + return true; +} + +void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) { + Portions[rec.Portion].AddRecord(indexInfo, rec); + OnAfterChangePortion(rec.Portion); +} + +void TGranuleMeta::OnAfterChangePortion(const 
ui64 /*portion*/) { + ResetCaches(); + Owner->UpdateGranuleInfo(*this); +} + +void TGranuleMeta::OnCompactionFinished() { + AllowInsertionFlag = false; + Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnCompactionFinished")("info", DebugString()); + CompactionPriorityInfo.OnCompactionFinished(); + Owner->UpdateGranuleInfo(*this); +} + +void TGranuleMeta::OnCompactionFailed(const TString& reason) { + AllowInsertionFlag = false; + Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction)); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "OnCompactionFailed")("reason", reason)("info", DebugString()); + CompactionPriorityInfo.OnCompactionFailed(); + Owner->UpdateGranuleInfo(*this); +} + +void TGranuleMeta::OnCompactionCanceled(const TString& reason) { + AllowInsertionFlag = false; + Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction)); + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "OnCompactionCanceled")("reason", reason)("info", DebugString()); + CompactionPriorityInfo.OnCompactionCanceled(); + Owner->UpdateGranuleInfo(*this); +} + +void TGranuleMeta::OnCompactionStarted(const bool inGranule) { + AllowInsertionFlag = false; + Y_VERIFY(Activity.empty()); + if (inGranule) { + Activity.emplace(EActivity::InternalCompaction); + } else { + Activity.emplace(EActivity::SplitCompaction); + } +} + +void TGranuleMeta::RebuildMetrics() const { + TGranuleSummary result; + std::map<ui32, ui64> sizeByColumns; + bool differentBorders = false; + THashSet<NArrow::TReplaceKey> borders; + + for (auto&& i : Portions) { + if (i.second.IsActive()) { + if (!differentBorders) { + borders.insert(i.second.IndexKeyStart()); + borders.insert(i.second.IndexKeyEnd()); + differentBorders = (borders.size() > 1); + } + for (auto&& c : i.second.Records) { + sizeByColumns[c.ColumnId] += 
c.BlobRange.Size; + } + auto sizes = i.second.BlobsSizes(); + /* Accumulate the portion's sizes into the Inserted or the Other class. */ if (i.second.IsInserted()) { + result.Inserted.PortionsSize += sizes.first; + result.Inserted.MaxColumnsSize += sizes.second; + ++result.Inserted.PortionsCount; + } else { + result.Other.PortionsSize += sizes.first; + result.Other.MaxColumnsSize += sizes.second; + ++result.Other.PortionsCount; + } + } + } + /* Transpose column->size into size->columns so that reverse iteration yields columns by descending size. */ std::map<ui64, std::vector<ui32>> transpSorted; + for (auto&& i : sizeByColumns) { + transpSorted[i.second].emplace_back(i.first); + } + result.ColumnIdsSortedBySizeDescending.reserve(sizeByColumns.size()); + for (auto it = transpSorted.rbegin(); it != transpSorted.rend(); ++it) { + for (auto&& v : it->second) { + result.ColumnIdsSortedBySizeDescending.emplace_back(TColumnSummary(v, it->first)); + } + } + result.DifferentBorders = differentBorders; + + SummaryCache = result; +} + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h new file mode 100644 index 0000000000..104b8baf8e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -0,0 +1,250 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/column_engine.h> +#include <ydb/core/tx/columnshard/engines/portion_info.h> + +namespace NKikimr::NOlap { + +class TGranulesStorage; + /* Retry state of a granule's compaction: a counter of consecutive failures and the instant of the next attempt. */ +class TCompactionPriorityInfo { +protected: + ui32 ProblemSequenceLength = 0; + TInstant NextAttemptInstant; +public: + TInstant GetNextAttemptInstant() const { + return NextAttemptInstant; + } + + void OnCompactionFailed() { + /* Count the failure and postpone the next attempt by one second. */ ++ProblemSequenceLength; + NextAttemptInstant = TInstant::Now() + TDuration::Seconds(1); + } + + void OnCompactionFinished() { + /* Success clears the failure counter and the postponement. */ ProblemSequenceLength = 0; + NextAttemptInstant = TInstant::Zero(); + } + + void OnCompactionCanceled() { + /* Postpone by one second without touching the failure counter. */ NextAttemptInstant = TInstant::Now() + TDuration::Seconds(1); + } +}; + /* Aggregate size/count figures for one class of portions; the fields are written by TGranuleMeta (friend). */ +class TDataClassSummary { +private: + ui64 PortionsSize = 0; + ui64 MaxColumnsSize = 0; + ui64 PortionsCount = 0; + friend class 
TGranuleMeta; +public: + ui64 GetPortionsSize() const { + return PortionsSize; + } + ui64 GetMaxColumnsSize() const { + return MaxColumnsSize; + } + ui64 GetPortionsCount() const { + return PortionsCount; + } + + /* Field-wise sum of two summaries. */ TDataClassSummary operator+(const TDataClassSummary& item) const { + TDataClassSummary result; + result.PortionsSize = PortionsSize + item.PortionsSize; + result.MaxColumnsSize = MaxColumnsSize + item.MaxColumnsSize; + result.PortionsCount = PortionsCount + item.PortionsCount; + return result; + } +}; + /* Immutable pair of a column id and a blobs size value. */ +class TColumnSummary { +private: + ui32 ColumnId; + ui64 BlobsSize; +public: + TColumnSummary(const ui32 columnId, const ui64 blobsSize) + : ColumnId(columnId) + , BlobsSize(blobsSize) + { + + } + + ui32 GetColumnId() const { + return ColumnId; + } + + ui64 GetBlobsSize() const { + return BlobsSize; + } +}; + /* Metrics of a granule: separate totals for the Inserted and Other portion classes, a column list ordered by size, and the border-difference flag. Fields are written by TGranuleMeta (friend). */ +class TGranuleSummary { +private: + TDataClassSummary Inserted; + TDataClassSummary Other; + friend class TGranuleMeta; + std::vector<TColumnSummary> ColumnIdsSortedBySizeDescending; + bool DifferentBorders = false; +public: + const std::vector<TColumnSummary>& GetColumnIdsSortedBySizeDescending() const { + return ColumnIdsSortedBySizeDescending; + } + const TDataClassSummary& GetInserted() const { + return Inserted; + } + const TDataClassSummary& GetOther() const { + return Other; + } + bool GetDifferentBorders() const { + return DifferentBorders; + } + /* The three totals below combine the Inserted and Other classes. */ ui64 GetGranuleSize() const { + return (Inserted + Other).GetPortionsSize(); + } + ui64 GetActivePortionsCount() const { + return (Inserted + Other).GetPortionsCount(); + } + ui64 GetMaxColumnsSize() const { + return (Inserted + Other).GetMaxColumnsSize(); + } +}; + /* Comparable compaction key: the retry state plus a copy of the granule summary. */ +class TCompactionPriority: public TCompactionPriorityInfo { +private: + using TBase = TCompactionPriorityInfo; + TGranuleSummary GranuleSummary; + /* Weight is zero unless borders differ and there is more than one active portion. */ ui64 GetWeightCorrected() const { + if (!GranuleSummary.GetDifferentBorders()) { + return 0; + } + if (GranuleSummary.GetActivePortionsCount() <= 1) { + return 0; + } + const ui64 weightedSize 
= (1.0 * GranuleSummary.GetGranuleSize() / 1024) * GranuleSummary.GetActivePortionsCount(); + /* granule size divided by 1024, times the active portion count, converted to ui64 */ return weightedSize; + } +public: + TCompactionPriority(const TCompactionPriorityInfo& data, const TGranuleSummary& granuleSummary) + : TBase(data) + , GranuleSummary(granuleSummary) + { + + } + /* Orders by corrected weight, then by active portion count. The NextAttemptInstant operands are swapped between the two tuples, so among otherwise equal keys the one with the earlier next attempt compares greater. */ bool operator<(const TCompactionPriority& item) const { + return std::tuple(GetWeightCorrected(), GranuleSummary.GetActivePortionsCount(), item.NextAttemptInstant) + < std::tuple(item.GetWeightCorrected(), item.GranuleSummary.GetActivePortionsCount(), NextAttemptInstant); + } + +}; + /* Metadata of a single granule: its portions, a lazily rebuilt summary cache, the current compaction activity and a shared pointer to the owning TGranulesStorage. */ +class TGranuleMeta: public ICompactionObjectCallback, TNonCopyable { +private: + THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo + mutable std::optional<TGranuleSummary> SummaryCache; + bool NeedSplit(const TCompactionLimits& limits, bool& inserted) const; + void RebuildMetrics() const; + + void ResetCaches() { + SummaryCache = {}; + } + + enum class EActivity { + SplitCompaction, + InternalCompaction, + }; + + std::set<EActivity> Activity; + TCompactionPriorityInfo CompactionPriorityInfo; + mutable bool AllowInsertionFlag = false; + std::shared_ptr<TGranulesStorage> Owner; + + void OnAfterChangePortion(const ui64 portion); +public: + /* Rebuilds the summary on first access after a cache reset. */ const TGranuleSummary& GetSummary() const { + if (!SummaryCache) { + RebuildMetrics(); + } + return *SummaryCache; + } + TCompactionPriority GetCompactionPriority() const { + return TCompactionPriority(CompactionPriorityInfo, GetSummary()); + } + + /* Never true for an empty granule or one already in compaction. */ bool NeedCompaction(const TCompactionLimits& limits) const { + if (InCompaction() || Empty()) { + return false; + } + return NeedSplitCompaction(limits) || NeedInternalCompaction(limits); + } + + bool InCompaction() const { + return Activity.contains(EActivity::SplitCompaction) || Activity.contains(EActivity::InternalCompaction); + } + + /* Sets the insertion override, but only while a compaction is in progress. */ void AllowedInsertion() const { + if (InCompaction()) { + AllowInsertionFlag = true; + } + } + + /* Inserts are allowed when the override is set or no split compaction is registered. */ bool IsInsertAllowed() const { + return AllowInsertionFlag || 
!Activity.contains(EActivity::SplitCompaction); + } + + /* Erasable only when no compaction activity is registered. */ bool IsErasable() const { + return Activity.empty(); + } + + virtual void OnCompactionStarted(const bool inGranule) override; + + virtual void OnCompactionCanceled(const TString& reason) override; + virtual void OnCompactionFailed(const TString& reason) override; + virtual void OnCompactionFinished() override; + + void UpsertPortion(const TPortionInfo& info); + + virtual TString DebugString() const override { + return TStringBuilder() << "granule:" << GetGranuleId() << ";path_id:" << Record.PathId << ";"; + } + + const TGranuleRecord Record; + + void AddColumnRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec); + + const THashMap<ui64, TPortionInfo>& GetPortions() const { + return Portions; + } + + /* Y_VERIFY-checked lookup: the portion must exist. */ const TPortionInfo& GetPortionVerified(const ui64 portion) const { + auto it = Portions.find(portion); + Y_VERIFY(it != Portions.end()); + return it->second; + } + + bool ErasePortion(const ui64 portion); + + explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner) + : Owner(owner) + , Record(rec) { + } + + ui64 GetGranuleId() const { + return Record.Granule; + } + ui64 PathId() const noexcept { return Record.PathId; } + bool Empty() const noexcept { return Portions.empty(); } + + ui64 Size() const; + bool IsOverloaded(const TCompactionLimits& limits) const { + return Size() >= limits.GranuleOverloadSize; + } + /* Split compaction is needed exactly when NeedSplit says so; the `inserted` output is ignored here. */ bool NeedSplitCompaction(const TCompactionLimits& limits) const { + bool inserted = false; + return NeedSplit(limits, inserted); + } + /* Internal compaction is needed when no split is needed but NeedSplit reported inserted portions. */ bool NeedInternalCompaction(const TCompactionLimits& limits) const { + bool inserted = false; + return !NeedSplit(limits, inserted) && inserted; + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/storage.cpp b/ydb/core/tx/columnshard/engines/storage/storage.cpp new file mode 100644 index 0000000000..01c66c8bc8 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/storage.cpp @@ -0,0 
+1,71 @@ +#include "storage.h" + +namespace NKikimr::NOlap { + /* Returns the set of overloaded granule ids recorded for pathId, or nullptr when the path has no entry. */ +const THashSet<ui64>* TGranulesStorage::GetOverloaded(ui64 pathId) const { + if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) { + return &pi->second; + } + return nullptr; +} + /* Re-registers the granule's compaction priority and refreshes its overloaded status. While PackModificationFlag is set, the granule is only remembered by pointer and nothing else is updated. */ +void TGranulesStorage::UpdateGranuleInfo(const TGranuleMeta& granule) { + if (PackModificationFlag) { + PackModifiedGranules[granule.GetGranuleId()] = &granule; + return; + } + { + auto it = GranulesCompactionPriority.find(granule.GetGranuleId()); + auto gPriority = granule.GetCompactionPriority(); + if (it == GranulesCompactionPriority.end()) { + /* First registration: record the priority and add the id to its bucket. */ it = GranulesCompactionPriority.emplace(granule.GetGranuleId(), gPriority).first; + Y_VERIFY(GranuleCompactionPrioritySorting[gPriority].emplace(granule.GetGranuleId()).second); + } else { + /* Known granule: remove the id from the bucket of its previous priority (dropping the bucket if it became empty) before inserting it under the new key. */ auto itSorting = GranuleCompactionPrioritySorting.find(it->second); + Y_VERIFY(itSorting != GranuleCompactionPrioritySorting.end()); + Y_VERIFY(itSorting->second.erase(granule.GetGranuleId())); + if (itSorting->second.empty()) { + GranuleCompactionPrioritySorting.erase(itSorting); + } + it->second = gPriority; + Y_VERIFY(GranuleCompactionPrioritySorting[gPriority].emplace(granule.GetGranuleId()).second); + } + } + const ui64 pathId = granule.Record.PathId; + + // Size exceeds the configured limit. Mark granule as overloaded. + if (granule.Size() >= Limits.GranuleOverloadSize) { + if (PathsGranulesOverloaded[pathId].emplace(granule.GetGranuleId()).second) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "overloaded")("path_id", pathId)("granule", granule.GetGranuleId()); + } + } else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) { + // Size is under limit. Remove granule from the overloaded set. 
+ if (pi->second.erase(granule.GetGranuleId())) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "unoverloaded")("path_id", pathId)("granule", granule.GetGranuleId())("remained", pi->second.size()); + } + // Remove the entry for the pathId if it has no overloaded granules any more. + if (pi->second.empty()) { + PathsGranulesOverloaded.erase(pi); + } + } + /* NOTE(review): this publishes the number of paths that have overloaded granules, not the number of granules - confirm this is intended. */ Counters.OverloadGranules->Set(PathsGranulesOverloaded.size()); + if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { + /* Builds "pathId:g1,g2;..." for the debug log. */ TStringBuilder sb; + for (auto&& i : PathsGranulesOverloaded) { + sb << i.first << ":"; + bool isFirst = true; + for (auto&& g : i.second) { + if (!isFirst) { + sb << ","; + } else { + isFirst = false; + } + sb << g; + } + sb << ";"; + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("overload_granules", sb); + } +} + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/storage.h b/ydb/core/tx/columnshard/engines/storage/storage.h new file mode 100644 index 0000000000..14785f2c27 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/storage.h @@ -0,0 +1,99 @@ +#pragma once +#include "granule.h" +#include <ydb/core/tx/columnshard/counters/engine_logs.h> + +namespace NKikimr::NOlap { + /* Tracks the compaction priority of every registered granule and, per path id, the set of overloaded granules. */ +class TGranulesStorage { +private: + const TCompactionLimits Limits; + THashMap<ui64, THashSet<ui64>> PathsGranulesOverloaded; + const NColumnShard::TEngineLogsCounters Counters; + THashMap<ui64, TCompactionPriority> GranulesCompactionPriority; + std::map<TCompactionPriority, std::set<ui64>> GranuleCompactionPrioritySorting; + bool PackModificationFlag = false; + /* NOTE(review): raw pointers are kept until FinishModificationImpl runs; granules presumably must stay alive until then - confirm. */ THashMap<ui64, const TGranuleMeta*> PackModifiedGranules; + void StartModificationImpl() { + Y_VERIFY(!PackModificationFlag); + PackModificationFlag = true; + } + + void FinishModificationImpl() { + Y_VERIFY(PackModificationFlag); + /* The flag is cleared first so that the replayed UpdateGranuleInfo calls are applied instead of being deferred again. */ PackModificationFlag = false; + for (auto&& i : PackModifiedGranules) { + UpdateGranuleInfo(*i.second); + } + PackModifiedGranules.clear(); + } + +public: + 
TGranulesStorage(const NColumnShard::TEngineLogsCounters counters, const TCompactionLimits& limits) + : Limits(limits) + , Counters(counters) { + + } + + /* Scope guard: enters pack-modification mode on construction and replays the deferred granule updates on destruction. */ class TModificationGuard { + private: + TGranulesStorage& Owner; + public: + TModificationGuard(TGranulesStorage& storage) + : Owner(storage) { + Owner.StartModificationImpl(); + } + + ~TModificationGuard() { + Owner.FinishModificationImpl(); + } + }; + + /* Total number of overloaded granules over all paths. */ ui32 GetOverloadedGranulesCount() const { + ui32 result = 0; + for (auto&& i : PathsGranulesOverloaded) { + result += i.second.size(); + } + return result; + } + + bool HasOverloadedGranules() const { + return PathsGranulesOverloaded.size(); + } + + TModificationGuard StartPackModification() { + return TModificationGuard(*this); + } + + const THashSet<ui64>* GetOverloaded(ui64 pathId) const; + /* Walks the priority buckets from the greatest key to the smallest and returns the first granule id accepted by filter whose next-attempt instant is not in the future. If only postponed granules are accepted, returns the one with the earliest next-attempt instant; returns an empty optional when nothing is accepted. */ template <class TFilter> + std::optional<ui64> GetGranuleForCompaction(const TFilter& filter) const { + if (!GranuleCompactionPrioritySorting.size()) { + return {}; + } + const TInstant now = TInstant::Now(); + std::optional<ui64> reserve; + TInstant reserveInstant; + for (auto it = GranuleCompactionPrioritySorting.rbegin(); it != GranuleCompactionPrioritySorting.rend(); ++it) { + /* Scan the bucket of the CURRENT priority; the previous code took rbegin() here and rescanned the top bucket on every iteration. */ const auto& granuleIds = it->second; + Y_VERIFY(granuleIds.size()); + for (auto&& i : granuleIds) { + if (filter(i)) { + if (it->first.GetNextAttemptInstant() > now) { + /* Postponed granule: remember the one that becomes eligible soonest. */ if (!reserve || reserveInstant > it->first.GetNextAttemptInstant()) { + reserveInstant = it->first.GetNextAttemptInstant(); + reserve = i; + } + } else { + return i; + } + } + } + } + return reserve; + } + + void UpdateGranuleInfo(const TGranuleMeta& granule); + +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 35e1347a0f..6dbb7f3465 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -297,10 +297,9 @@ struct 
TExpected { bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, TString>&& blobs, ui32& step, const TExpected& expected) { - ui64 lastCompactedGranule = 0; - auto compactionInfo = engine.Compact(TestLimits(), lastCompactedGranule); - UNIT_ASSERT_VALUES_EQUAL(compactionInfo->Granules.size(), 1); - UNIT_ASSERT(!compactionInfo->InGranule); + auto compactionInfo = engine.Compact(TestLimits()); + UNIT_ASSERT(!!compactionInfo); + UNIT_ASSERT(!compactionInfo->InGranule()); std::shared_ptr<TColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TSnapshot::Zero(), TestLimits()); UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 299020b6bb..ad1cb024f4 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -2568,7 +2568,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } if (msg->IndexChanges->CompactionInfo) { ++compactionsHappened; - Cerr << "Compaction at snaphsot "<< msg->IndexChanges->InitSnapshot + Cerr << "Compaction at snapshot "<< msg->IndexChanges->InitSnapshot << " old portions:"; ui64 srcGranule{0}; for (const auto& portionInfo : msg->IndexChanges->SwitchedPortions) { |