about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: ivanmorozov333 <ivanmorozov@ydb.tech> 2024-12-03 18:31:04 +0300
committer: GitHub <noreply@github.com> 2024-12-03 18:31:04 +0300
commit: 14a54ab15843ef7b15c3c45bf424294b2444607a (patch)
tree: fd1795951afb2eb39b662ceb1f6934d58f49fb60
parent: e0898efe0a808c2ec2c0d14b00dd5a0ea28b84fc (diff)
download: ydb-14a54ab15843ef7b15c3c45bf424294b2444607a.tar.gz
fix cleanup volume limits and speed up versions index copy (#12249)
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 10
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.h 10
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h 5
6 files changed, 24 insertions, 9 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 2d8e108dd1..2fa20b684b 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -717,7 +717,7 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TCommittedData*>&& da
auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data));
Y_ABORT_UNLESS(indexChanges);
- auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, indexChanges, Settings.CacheDataAfterIndexing);
@@ -875,7 +875,7 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
compaction->SetQueueGuard(guard);
compaction->Start(*this);
- auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
auto request = compaction->ExtractDataAccessorsRequest();
const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) +
indexChanges->CalcMemoryForUsage();
@@ -973,7 +973,7 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
return false;
}
- auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
for (auto&& i : indexChanges) {
i->Start(*this);
auto request = i->ExtractDataAccessorsRequest();
@@ -1033,7 +1033,7 @@ void TColumnShard::SetupCleanupPortions() {
changes->Start(*this);
auto request = changes->ExtractDataAccessorsRequest();
- auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema());
const auto subscriber = std::make_shared<TCleanupPortionsDataAccessorsSubscriber>(SelfId(), changes, actualIndexInfo);
@@ -1064,7 +1064,7 @@ void TColumnShard::SetupCleanupTables() {
}
ACFL_DEBUG("background", "cleanup")("changes_info", changes->DebugString());
- auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, changes, false);
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 58581bab51..29dc37d9f7 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -329,6 +329,7 @@ public:
virtual std::vector<TCSMetadataRequest> CollectMetadataRequests() const = 0;
virtual const TVersionedIndex& GetVersionedIndex() const = 0;
+ virtual const std::shared_ptr<TVersionedIndex>& GetVersionedIndexReadonlyCopy() = 0; // non-const on purpose: an implementation may refresh a cached copy before returning a reference to it
virtual std::shared_ptr<TVersionedIndex> CopyVersionedIndexPtr() const = 0;
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 8fd014582c..4e05f1846d 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -333,7 +333,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
ui32 skipLocked = 0;
ui32 portionsFromDrop = 0;
bool limitExceeded = false;
- const ui32 maxChunksCount = 100000;
+ const ui32 maxChunksCount = 500000;
const ui32 maxPortionsCount = 1000;
for (ui64 pathId : pathsToDrop) {
auto g = GranulesStorage->GetGranuleOptional(pathId);
@@ -401,7 +401,8 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
}
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())(
- "portions_prepared", changes->GetPortionsToDrop().size())("drop", portionsFromDrop)("skip", skipLocked);
+ "portions_prepared", changes->GetPortionsToDrop().size())("drop", portionsFromDrop)("skip", skipLocked)("portions_counter", portionsCount)(
+ "chunks", chunksCount)("limit", limitExceeded)("max_portions", maxPortionsCount)("max_chunks", maxChunksCount);
if (changes->GetPortionsToDrop().empty()) {
return nullptr;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 92f1a4ed5e..5c6d5b988e 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -63,8 +63,17 @@ private:
std::shared_ptr<NActualizer::TController> ActualizationController;
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>();
+ TVersionedIndex VersionedIndex;
+ std::shared_ptr<TVersionedIndex> VersionedIndexCopy;
public:
+ // Returns a shared snapshot of VersionedIndex, reusing the previously built copy
+ // while IsEqualTo() still reports it as matching the live index.
+ // The returned reference points at the VersionedIndexCopy member; callers that
+ // need to keep the snapshot must copy the shared_ptr.
+ virtual const std::shared_ptr<TVersionedIndex>& GetVersionedIndexReadonlyCopy() override {
+     // Short-circuit keeps the null check ahead of the dereference.
+     const bool cachedCopyIsCurrent = VersionedIndexCopy && VersionedIndexCopy->IsEqualTo(VersionedIndex);
+     if (!cachedCopyIsCurrent) {
+         VersionedIndexCopy = std::make_shared<TVersionedIndex>(VersionedIndex);
+     }
+     return VersionedIndexCopy;
+ }
+
// Read accessor for the ActualizationController member; hands out a const reference
// to the stored shared_ptr rather than a copy.
const std::shared_ptr<NActualizer::TController>& GetActualizationController() const {
return ActualizationController;
}
@@ -224,7 +233,6 @@ public:
void AppendPortion(const TPortionDataAccessor& portionInfo, const bool addAsAccessor = true);
private:
- TVersionedIndex VersionedIndex;
ui64 TabletId;
TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
std::map<TInstant, std::vector<TPortionInfo::TConstPtr>> CleanupPortions;
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 8cc9c4e3be..96d307390e 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -51,7 +51,7 @@ ui64 TPortionInfo::GetMetadataMemorySize() const {
}
// Approximates the portion's chunk count as schemaColumnsCount * (records / 10000 + 1),
// using integer division. This hunk drops the former extra factor of 256 from the estimate.
// NOTE(review): if GetRecordsCount() returns a 32-bit type, the product is computed in
// 32 bits before being widened to ui64 - confirm the return type.
ui64 TPortionInfo::GetApproxChunksCount(const ui32 schemaColumnsCount) const {
- return schemaColumnsCount * 256 * (GetRecordsCount() / 10000 + 1);
+ return schemaColumnsCount * (GetRecordsCount() / 10000 + 1);
}
void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const {
diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h
index e320089f85..c5fc018e7e 100644
--- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h
+++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h
@@ -33,6 +33,11 @@ class TVersionedIndex {
ISnapshotSchema::TPtr SchemeForActualization;
public:
+ // Cheap equality heuristic between this index and `vIndex`.
+ // Compares only LastSchemaVersion, SchemeVersionForActualization and the element
+ // counts of SnapshotByVersion and ShardingInfo - the container contents are not inspected.
+ // NOTE(review): two indexes holding equally sized but different containers compare
+ // equal here; presumably acceptable if entries are only ever appended - confirm
+ // against the mutating methods.
+ // Const-qualified because the comparison reads state only, so it is also callable
+ // through a const TVersionedIndex&.
+ bool IsEqualTo(const TVersionedIndex& vIndex) const {
+     return LastSchemaVersion == vIndex.LastSchemaVersion && SnapshotByVersion.size() == vIndex.SnapshotByVersion.size() &&
+            ShardingInfo.size() == vIndex.ShardingInfo.size() && SchemeVersionForActualization == vIndex.SchemeVersionForActualization;
+ }
+
// Returns the SchemeForActualization pointer by value.
// NOTE(review): presumably null when no actualization schema has been set - confirm.
ISnapshotSchema::TPtr GetLastCriticalSchema() const {
return SchemeForActualization;
}