diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2024-12-03 18:31:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-03 18:31:04 +0300 |
commit | 14a54ab15843ef7b15c3c45bf424294b2444607a (patch) | |
tree | fd1795951afb2eb39b662ceb1f6934d58f49fb60 | |
parent | e0898efe0a808c2ec2c0d14b00dd5a0ea28b84fc (diff) | |
download | ydb-14a54ab15843ef7b15c3c45bf424294b2444607a.tar.gz |
fix cleanup volume limits and speed up versions index copy (#12249)
6 files changed, 24 insertions, 9 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 2d8e108dd1..2fa20b684b 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -717,7 +717,7 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TCommittedData*>&& da auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data)); Y_ABORT_UNLESS(indexChanges); - auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); indexChanges->Start(*this); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, indexChanges, Settings.CacheDataAfterIndexing); @@ -875,7 +875,7 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo compaction->SetQueueGuard(guard); compaction->Start(*this); - auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); auto request = compaction->ExtractDataAccessorsRequest(); const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) + indexChanges->CalcMemoryForUsage(); @@ -973,7 +973,7 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) { return false; } - auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); for (auto&& i : indexChanges) { i->Start(*this); auto request = i->ExtractDataAccessorsRequest(); @@ -1033,7 +1033,7 @@ void TColumnShard::SetupCleanupPortions() { changes->Start(*this); auto request = changes->ExtractDataAccessorsRequest(); - auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()); const auto subscriber = std::make_shared<TCleanupPortionsDataAccessorsSubscriber>(SelfId(), changes, actualIndexInfo); @@ -1064,7 +1064,7 @@ void TColumnShard::SetupCleanupTables() { } ACFL_DEBUG("background", "cleanup")("changes_info", changes->DebugString()); - auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, changes, false); ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 58581bab51..29dc37d9f7 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -329,6 +329,7 @@ public: virtual std::vector<TCSMetadataRequest> CollectMetadataRequests() const = 0; virtual const TVersionedIndex& GetVersionedIndex() const = 0; + virtual const std::shared_ptr<TVersionedIndex>& GetVersionedIndexReadonlyCopy() = 0; virtual std::shared_ptr<TVersionedIndex> CopyVersionedIndexPtr() const = 0; virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 8fd014582c..4e05f1846d 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -333,7 +333,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start ui32 skipLocked = 0; ui32 portionsFromDrop = 0; bool limitExceeded = false; - const ui32 maxChunksCount = 100000; + const ui32 maxChunksCount = 500000; const ui32 maxPortionsCount = 1000; for (ui64 pathId : pathsToDrop) { auto g = GranulesStorage->GetGranuleOptional(pathId); @@ -401,7 +401,8 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start } } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())( - "portions_prepared", changes->GetPortionsToDrop().size())("drop", portionsFromDrop)("skip", skipLocked); + "portions_prepared", changes->GetPortionsToDrop().size())("drop", portionsFromDrop)("skip", skipLocked)("portions_counter", portionsCount)( + "chunks", chunksCount)("limit", limitExceeded)("max_portions", maxPortionsCount)("max_chunks", maxChunksCount); if (changes->GetPortionsToDrop().empty()) { return nullptr; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 92f1a4ed5e..5c6d5b988e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -63,8 +63,17 @@ private: std::shared_ptr<NActualizer::TController> ActualizationController; std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>(); + TVersionedIndex VersionedIndex; + std::shared_ptr<TVersionedIndex> VersionedIndexCopy; public: + virtual const std::shared_ptr<TVersionedIndex>& GetVersionedIndexReadonlyCopy() override { + if (!VersionedIndexCopy || !VersionedIndexCopy->IsEqualTo(VersionedIndex)) { + VersionedIndexCopy = std::make_shared<TVersionedIndex>(VersionedIndex); + } + return VersionedIndexCopy; + } + const std::shared_ptr<NActualizer::TController>& GetActualizationController() const { return ActualizationController; } @@ -224,7 +233,6 @@ public: void AppendPortion(const TPortionDataAccessor& portionInfo, const bool addAsAccessor = true); private: - TVersionedIndex VersionedIndex; ui64 TabletId; TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id std::map<TInstant, std::vector<TPortionInfo::TConstPtr>> CleanupPortions; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 8cc9c4e3be..96d307390e 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -51,7 +51,7 @@ ui64 TPortionInfo::GetMetadataMemorySize() const { } ui64 TPortionInfo::GetApproxChunksCount(const ui32 schemaColumnsCount) const { - return schemaColumnsCount * 256 * (GetRecordsCount() / 10000 + 1); + return schemaColumnsCount * (GetRecordsCount() / 10000 + 1); } void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h index e320089f85..c5fc018e7e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h @@ -33,6 +33,11 @@ class TVersionedIndex { ISnapshotSchema::TPtr SchemeForActualization; public: + bool IsEqualTo(const TVersionedIndex& vIndex) { + return LastSchemaVersion == vIndex.LastSchemaVersion && SnapshotByVersion.size() == vIndex.SnapshotByVersion.size() && + ShardingInfo.size() == vIndex.ShardingInfo.size() && SchemeVersionForActualization == vIndex.SchemeVersionForActualization; + } + ISnapshotSchema::TPtr GetLastCriticalSchema() const { return SchemeForActualization; } |