aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-29 16:33:13 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-29 16:33:13 +0300
commit0e07043ae62bc543d90237be4bfaf04fab67a318 (patch)
treecdd361c43931acf0d17e458fe73d91e352f811d7
parent2eb5121615620a9b4347646a764a2028bcfd5ef3 (diff)
downloadydb-0e07043ae62bc543d90237be4bfaf04fab67a318.tar.gz
granules compaction priority usage
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp5
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp19
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h1
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h72
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp347
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h54
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp29
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h1
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.cpp67
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.h5
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp20
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h40
-rw-r--r--ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt21
-rw-r--r--ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt22
-rw-r--r--ydb/core/tx/columnshard/engines/storage/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt21
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp123
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h250
-rw-r--r--ydb/core/tx/columnshard/engines/storage/storage.cpp71
-rw-r--r--ydb/core/tx/columnshard/engines/storage/storage.h99
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp7
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp2
28 files changed, 936 insertions, 389 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 563b3957d3..62b978efd1 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -27,10 +27,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
LOG_S_INFO("Switched to work at " << TabletID() << " actor " << ctx.SelfID);
IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID, IndexationCounters));
- CompactionActor = ctx.Register(
- CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS),
- // Default mail-box and batch pool.
- TMailboxType::HTSwap, AppData(ctx)->BatchPoolId);
+ CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS));
EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID, EvictionCounters));
for (auto&& i : TablesManager.GetTables()) {
ActivateTiering(i.first, i.second.GetTieringUsage());
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index fab45d3bde..427259b209 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -259,7 +259,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
Self->ActiveCompaction--;
Y_VERIFY(changes->CompactionInfo);
- bool inGranule = changes->CompactionInfo->InGranule;
+ bool inGranule = changes->CompactionInfo->InGranule();
if (inGranule) {
Self->IncCounter(ok ? COUNTER_COMPACTION_SUCCESS : COUNTER_COMPACTION_FAIL);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 22564ded18..0cdc3955fb 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -681,6 +681,11 @@ bool TColumnShard::SetupIndexation() {
dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT);
THashMap<ui64, ui64> overloadedPathGranules;
for (auto& [pathId, committed] : InsertTable->GetCommitted()) {
+ auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(pathId);
+ if (pMap) {
+ overloadedPathGranules[pathId] = pMap->size();
+ }
+ InsertTable->SetOverloaded(pathId, !!pMap);
for (auto& data : committed) {
ui32 dataSize = data.BlobSize();
Y_VERIFY(dataSize);
@@ -689,13 +694,9 @@ bool TColumnShard::SetupIndexation() {
if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) {
continue;
}
- if (auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(data.PathId)) {
- overloadedPathGranules[pathId] = pMap->size();
- InsertTable->SetOverloaded(data.PathId, true);
+ if (pMap) {
++ignored;
continue;
- } else {
- InsertTable->SetOverloaded(data.PathId, false);
}
++blobs;
bytesToIndex += dataSize;
@@ -713,7 +714,7 @@ bool TColumnShard::SetupIndexation() {
LOG_S_DEBUG("Few data for indexation (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored "
<< ignored << ") at tablet " << TabletID());
- // Force small indexations simetimes to keep BatchCache smaller
+ // Force small indexations sometimes to keep BatchCache smaller
if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) {
++SkippedIndexations;
return false;
@@ -760,16 +761,14 @@ bool TColumnShard::SetupCompaction() {
while (ActiveCompaction < TSettings::MAX_ACTIVE_COMPACTIONS) {
auto limits = CompactionLimits.Get();
- auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(limits, LastCompactedGranule);
- if (!compactionInfo || compactionInfo->Empty()) {
+ auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(limits);
+ if (!compactionInfo) {
if (events.empty()) {
LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID());
}
break;
}
- Y_VERIFY(compactionInfo->Good());
-
LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID());
ui64 outdatedStep = GetOutdatedStep();
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 7bfc833709..0f91676649 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -349,7 +349,6 @@ private:
TWriteId LastWriteId = TWriteId{0};
ui64 LastPlannedStep = 0;
ui64 LastPlannedTxId = 0;
- ui64 LastCompactedGranule = 0;
ui64 LastExportNo = 0;
ui64 WritesInFly = 0;
ui64 OwnerPathId = 0;
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
index bc23d223a5..8a29dbda94 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(predicate)
add_subdirectory(reader)
+add_subdirectory(storage)
add_subdirectory(ut)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -32,6 +33,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
ydb-core-tablet_flat
columnshard-engines-reader
columnshard-engines-predicate
+ columnshard-engines-storage
formats-arrow-compression
udf-service-exception_policy
tools-enum_parser-enum_serialization_runtime
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
index 3dd6df8132..43a5fa4870 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
@@ -8,6 +8,7 @@
add_subdirectory(predicate)
add_subdirectory(reader)
+add_subdirectory(storage)
add_subdirectory(ut)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -33,6 +34,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
ydb-core-tablet_flat
columnshard-engines-reader
columnshard-engines-predicate
+ columnshard-engines-storage
formats-arrow-compression
udf-service-exception_policy
tools-enum_parser-enum_serialization_runtime
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
index 3dd6df8132..43a5fa4870 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(predicate)
add_subdirectory(reader)
+add_subdirectory(storage)
add_subdirectory(ut)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -33,6 +34,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
ydb-core-tablet_flat
columnshard-engines-reader
columnshard-engines-predicate
+ columnshard-engines-storage
formats-arrow-compression
udf-service-exception_policy
tools-enum_parser-enum_serialization_runtime
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
index bc23d223a5..8a29dbda94 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(predicate)
add_subdirectory(reader)
+add_subdirectory(storage)
add_subdirectory(ut)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -32,6 +33,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
ydb-core-tablet_flat
columnshard-engines-reader
columnshard-engines-predicate
+ columnshard-engines-storage
formats-arrow-compression
udf-service-exception_policy
tools-enum_parser-enum_serialization_runtime
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index de4b774b9c..49b49c68aa 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -113,20 +113,68 @@ private:
static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type);
};
+class ICompactionObjectCallback {
+public:
+ virtual ~ICompactionObjectCallback() = default;
+ virtual void OnCompactionStarted(const bool inGranule) = 0;
+ virtual void OnCompactionFinished() = 0;
+ virtual void OnCompactionFailed(const TString& reason) = 0;
+ virtual void OnCompactionCanceled(const TString& reason) = 0;
+ virtual TString DebugString() const = 0;
+};
+
struct TCompactionInfo {
- TSet<ui64> Granules;
- bool InGranule{false};
+private:
+ std::shared_ptr<ICompactionObjectCallback> CompactionObject;
+ mutable bool StatusProvided = false;
+ const bool InGranuleFlag = false;
+public:
+ TCompactionInfo(std::shared_ptr<ICompactionObjectCallback> compactionObject, const bool inGranule)
+ : CompactionObject(compactionObject)
+ , InGranuleFlag(inGranule)
+ {
+ Y_VERIFY(compactionObject);
+ CompactionObject->OnCompactionStarted(InGranuleFlag);
+ }
- bool Empty() const { return Granules.empty(); }
- bool Good() const { return Granules.size() == 1; }
+ bool InGranule() const {
+ return InGranuleFlag;
+ }
- friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) {
- if (info.Good() == 1) {
- ui64 granule = *info.Granules.begin();
- out << (info.InGranule ? "in granule" : "split granule") << " compaction of granule " << granule;
- } else {
- out << "wrong compaction of " << info.Granules.size() << " granules";
+ template <class T>
+ const T& GetObject() const {
+ auto result = dynamic_cast<const T*>(CompactionObject.get());
+ Y_VERIFY(result);
+ return *result;
+ }
+
+ void CompactionFinished() const {
+ Y_VERIFY(!StatusProvided);
+ StatusProvided = true;
+ CompactionObject->OnCompactionFinished();
+ }
+
+ void CompactionCanceled(const TString& reason) const {
+ Y_VERIFY(!StatusProvided);
+ StatusProvided = true;
+ CompactionObject->OnCompactionCanceled(reason);
+ }
+
+ void CompactionFailed(const TString& reason) const {
+ Y_VERIFY(!StatusProvided);
+ StatusProvided = true;
+ CompactionObject->OnCompactionFailed(reason);
+ }
+
+ ~TCompactionInfo() {
+ Y_VERIFY_DEBUG(StatusProvided);
+ if (!StatusProvided) {
+ CompactionObject->OnCompactionFailed("compaction unexpectedly finished");
}
+ }
+
+ friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) {
+ out << (info.InGranuleFlag ? "in granule" : "split granule") << " compaction of granule: " << info.CompactionObject->DebugString();
return out;
}
};
@@ -194,7 +242,7 @@ public:
return "insert";
case COMPACTION:
return CompactionInfo
- ? (CompactionInfo->InGranule ? "compaction in granule" : "compaction split granule" )
+ ? (CompactionInfo->InGranule() ? "compaction in granule" : "compaction split granule" )
: "compaction";
case CLEANUP:
return "cleanup";
@@ -593,7 +641,7 @@ public:
virtual std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot,
const THashSet<ui32>& columnIds,
const TPKRangesFilter& pkRangesFilter) const = 0;
- virtual std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits, ui64& lastCompactedGranule) = 0;
+ virtual std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo,
const TSnapshot& outdatedSnapshot, const TCompactionLimits& limits) = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 1646b6e8d5..5503cef08a 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/formats/arrow/one_batch_input_stream.h>
#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
+#include <ydb/library/conclusion/status.h>
#include <concepts>
@@ -12,7 +13,7 @@ namespace NKikimr::NOlap {
namespace {
-bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& portions, const TCompactionLimits& limits,
+TConclusionStatus InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& portions, const TCompactionLimits& limits,
const TSnapshot& snap, TColumnEngineForLogs::TMarksGranules& marksGranules) {
ui64 oldTimePlanStep = snap.GetPlanStep() - TDuration::Seconds(limits.InGranuleCompactSeconds).MilliSeconds();
ui32 insertedCount = 0;
@@ -83,12 +84,12 @@ bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& por
Y_VERIFY(insertedCount);
// Trigger compaction if we have lots of inserted or if all inserted are old enough
if (insertedNew && insertedCount < limits.InGranuleCompactInserts) {
- return false;
+ return TConclusionStatus::Fail("not enough inserted portions (" + ::ToString(insertedNew) + ")");
}
// Nothing to filter. Leave portions as is, no borders needed.
if (filtered.empty() && goodCompacted.empty()) {
- return true;
+ return TConclusionStatus::Success();
}
// It's a map for SliceIntoGranules(). We use fake granule ids here to slice batch with borders.
@@ -124,7 +125,7 @@ bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& por
}
marksGranules = TColumnEngineForLogs::TMarksGranules(std::move(borders));
- return true;
+ return TConclusionStatus::Success();
}
} // namespace
@@ -188,7 +189,7 @@ TColumnEngineForLogs::TMarksGranules::SliceIntoGranules(const std::shared_ptr<ar
TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits)
- : Limits(limits)
+ : GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, limits))
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0)
@@ -212,10 +213,7 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() {
Counters.Tables = PathGranules.size();
Counters.Granules = Granules.size();
Counters.EmptyGranules = EmptyGranules.size();
- Counters.OverloadedGranules = 0;
- for (const auto& [_, set] : PathsGranulesOverloaded) {
- Counters.OverloadedGranules += set.size();
- }
+ Counters.OverloadedGranules = GranulesStorage->GetOverloadedGranulesCount();
return Counters;
}
@@ -296,32 +294,18 @@ void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndex
bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop) {
ClearIndex();
-
- if (!LoadGranules(db)) {
- return false;
- }
- if (!LoadColumns(db, lostBlobs)) {
- return false;
- }
- if (!LoadCounters(db)) {
- return false;
- }
-
-#if 0 // Clear index data
- for (auto& [granule, meta] : Granules) {
- for (auto& [portion, portionInfo] : meta->Portions) {
- for (auto& rec : portionInfo.Records) {
- ColumnsTable->Erase(db, rec);
- }
+ {
+ auto guard = GranulesStorage->StartPackModification();
+ if (!LoadGranules(db)) {
+ return false;
+ }
+ if (!LoadColumns(db, lostBlobs)) {
+ return false;
+ }
+ if (!LoadCounters(db)) {
+ return false;
}
- GranulesTable->Erase(db, meta->Record);
}
- CountersTable->Write(db, LAST_PORTION, 0);
- CountersTable->Write(db, LAST_GRANULE, 0);
- CountersTable->Write(db, LAST_PLAN_STEP, 0);
- CountersTable->Write(db, LAST_TX_ID, 0);
- return true;
-#endif
THashSet<ui64> emptyGranulePaths;
for (const auto& [granule, spg] : Granules) {
@@ -329,10 +313,9 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
EmptyGranules.insert(granule);
emptyGranulePaths.insert(spg->PathId());
} else {
- CompactionGranules.insert(granule);
CleanupGranules.insert(granule);
}
- for (const auto& [_, portionInfo] : spg->Portions) {
+ for (const auto& [_, portionInfo] : spg->GetPortions()) {
UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD);
}
}
@@ -357,8 +340,6 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
}
}
- UpdateOverloaded(Granules, Limits);
-
Y_VERIFY(!(LastPortion >> 63), "near to int overflow");
Y_VERIFY(!(LastGranule >> 63), "near to int overflow");
return true;
@@ -366,8 +347,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) {
auto callback = [&](const TGranuleRecord& rec) {
- bool ok = SetGranule(rec, true);
- Y_VERIFY(ok);
+ Y_VERIFY(SetGranule(rec, true));
};
return GranulesTable->Load(db, callback);
@@ -381,15 +361,9 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>&
lostBlobs.erase(rec.BlobRange.BlobId);
// Locate granule and append the record.
if (const auto gi = Granules.find(rec.Granule); gi != Granules.end()) {
- gi->second->Portions[rec.Portion].AddRecord(indexInfo, rec);
+ gi->second->AddColumnRecord(indexInfo, rec);
} else {
-#if 0
- LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId);
- Granules.erase(rec.Granule);
- return;
-#else
Y_VERIFY(false);
-#endif
}
});
}
@@ -429,7 +403,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(const TC
if (PathGranules.contains(pathId)) {
// Abort inserting if the path has overloaded granules.
- if (PathsGranulesOverloaded.contains(pathId)) {
+ if (GranulesStorage->GetOverloaded(pathId)) {
return {};
}
@@ -455,44 +429,39 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
const TSnapshot& outdatedSnapshot,
const TCompactionLimits& limits) {
Y_VERIFY(info);
- Y_VERIFY(info->Granules.size() == 1);
auto changes = TChanges::BuildCompactionChanges(DefaultMark(), std::move(info), limits, LastSnapshot);
- const ui64 granule = *changes->CompactionInfo->Granules.begin();
- const auto gi = Granules.find(granule);
- // Check granule exists.
- Y_VERIFY(gi != Granules.end());
+ const auto& granuleInfo = changes->CompactionInfo->GetObject<TGranuleMeta>();
- changes->SwitchedPortions.reserve(gi->second->Portions.size());
+ changes->SwitchedPortions.reserve(granuleInfo.GetPortions().size());
// Collect active portions for the granule.
- for (const auto& [_, portionInfo] : gi->second->Portions) {
+ for (const auto& [_, portionInfo] : granuleInfo.GetPortions()) {
if (portionInfo.IsActive()) {
changes->SwitchedPortions.push_back(portionInfo);
- Y_VERIFY(portionInfo.Granule() == granule);
+ Y_VERIFY(portionInfo.Granule() == granuleInfo.GetGranuleId());
}
}
- const ui64 pathId = gi->second->Record.PathId;
+ const ui64 pathId = granuleInfo.Record.PathId;
Y_VERIFY(PathGranules.contains(pathId));
// Locate mark for the granule.
for (const auto& [mark, pathGranule] : PathGranules[pathId]) {
- if (pathGranule == granule) {
- changes->SrcGranule = TChanges::TSrcGranule(pathId, granule, mark);
+ if (pathGranule == granuleInfo.GetGranuleId()) {
+ changes->SrcGranule = TChanges::TSrcGranule(pathId, granuleInfo.GetGranuleId(), mark);
break;
}
}
Y_VERIFY(changes->SrcGranule);
- if (changes->CompactionInfo->InGranule) {
+ if (changes->CompactionInfo->InGranule()) {
const TSnapshot completedSnap = std::max(LastSnapshot, outdatedSnapshot);
- if (!InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, limits, completedSnap, changes->MergeBorders)) {
+ auto mergeInitResult = InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, limits, completedSnap, changes->MergeBorders);
+ if (!mergeInitResult) {
// Return granule to Compaction list. This is equal to single compaction worker behaviour.
- CompactionGranules.insert(granule);
+ changes->CompactionInfo->CompactionCanceled("cannot init in granule merge: " + mergeInitResult.GetErrorMessage());
return {};
}
- } else {
- GranulesInSplit.insert(granule);
}
Y_VERIFY(!changes->SwitchedPortions.empty());
@@ -519,7 +488,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
Y_VERIFY(Granules.contains(granule));
auto spg = Granules[granule];
Y_VERIFY(spg);
- for (auto& [portion, info] : spg->Portions) {
+ for (auto& [portion, info] : spg->GetPortions()) {
affectedRecords += info.NumRecords();
changes->PortionsToDrop.push_back(info);
dropPortions.insert(portion);
@@ -550,7 +519,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
Y_VERIFY(spg);
bool isClean = true;
- for (auto& [portion, info] : spg->Portions) {
+ for (auto& [portion, info] : spg->GetPortions()) {
if (info.IsActive() || dropPortions.contains(portion)) {
continue;
}
@@ -612,7 +581,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
auto spg = Granules[granule];
Y_VERIFY(spg);
- for (auto& [portion, info] : spg->Portions) {
+ for (auto& [portion, info] : spg->GetPortions()) {
if (!info.IsActive()) {
continue;
}
@@ -690,37 +659,6 @@ std::vector<std::vector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGran
return emptyGranules;
}
-void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const TCompactionLimits& limits) {
- for (const auto& [granule, spg] : granules) {
- const ui64 pathId = spg->Record.PathId;
-
- ui64 size = 0;
- // Calculate byte-size of active portions.
- for (const auto& [_, portionInfo] : spg->Portions) {
- if (portionInfo.IsActive()) {
- size += portionInfo.BlobsBytes();
- }
- }
-
- // Size exceeds the configured limit. Mark granule as overloaded.
- if (size >= limits.GranuleOverloadSize) {
- if (PathsGranulesOverloaded[pathId].emplace(granule).second) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "overloaded")("path_id", pathId)("granule", granule);
- }
- } else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) {
- // Size is under limit. Remove granule from the overloaded set.
- if (pi->second.erase(granule)) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "unoverloaded")("path_id", pathId)("granule", granule)("remained", pi->second.size());
- }
- // Remove entry for the pathId if there it has no overloaded granules any more.
- if (pi->second.empty()) {
- PathsGranulesOverloaded.erase(pi);
- }
- }
- }
- SignalCounters.OverloadGranules->Set(PathsGranulesOverloaded.size());
-}
-
bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) {
auto changes = std::static_pointer_cast<TChanges>(indexChanges);
@@ -737,8 +675,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
// If it's a split compaction with moves appended portions are INSERTED (could have overlaps with others)
if (changes->IsCompaction() && changes->PortionsToMove.empty()) {
Y_VERIFY(changes->CompactionInfo);
- produced = changes->CompactionInfo->InGranule ?
- TPortionMeta::COMPACTED : TPortionMeta::SPLIT_COMPACTED;
+ produced = changes->CompactionInfo->InGranule() ? TPortionMeta::COMPACTED : TPortionMeta::SPLIT_COMPACTED;
}
portionInfo.UpdateRecordsMeta(indexInfo, produced);
@@ -763,22 +700,15 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
}
if (!ApplyChanges(db, *changes, snapshot, false)) { // validate only
- if (changes->IsCompaction()) {
- // Return granule to Compaction list. This is equal to single compaction worker behaviour.
- for (const auto& portionInfo : changes->SwitchedPortions) {
- CompactionGranules.insert(portionInfo.Granule());
- }
+ if (changes->CompactionInfo) {
+ changes->CompactionInfo->CompactionFailed("cannot apply changes");
}
return false;
}
bool ok = ApplyChanges(db, *changes, snapshot, true);
Y_VERIFY(ok);
-
- // Save updated granules for comapaction
- if (changes->IsInsert()) {
- for (auto& portionInfo : changes->AppendedPortions) {
- CompactionGranules.insert(portionInfo.Granule());
- }
+ if (changes->CompactionInfo) {
+ changes->CompactionInfo->CompactionFinished();
}
// Save updated granules for cleanup
@@ -788,38 +718,6 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
CleanupGranules.insert(portionInfo.Granule());
}
}
-
- // Update overloaded granules (only if tx would be applyed)
- if (changes->IsInsert() || changes->IsCompaction() || changes->IsCleanup()) {
- THashMap<ui64, std::shared_ptr<TGranuleMeta>> granules;
-
- const auto emplace_granule = [&](const ui64 id) {
- // Lookup granule in the global table.
- const auto gi = Granules.find(id);
- // Granule should exists.
- Y_VERIFY(gi != Granules.end());
- // Emplace granule.
- granules.emplace(id, gi->second);
- };
-
- if (changes->IsCleanup()) {
- granules.reserve(changes->PortionsToDrop.size());
-
- for (const auto& portionInfo : changes->PortionsToDrop) {
- emplace_granule(portionInfo.Granule());
- }
- } else if (changes->IsCompaction() && !changes->CompactionInfo->InGranule) {
- emplace_granule(changes->SrcGranule->Granule);
- } else {
- granules.reserve(changes->AppendedPortions.size());
-
- for (const auto& portionInfo : changes->AppendedPortions) {
- emplace_granule(portionInfo.Granule());
- }
- }
-
- UpdateOverloaded(granules, indexChanges->Limits);
- }
return true;
}
@@ -829,14 +727,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
Y_VERIFY(changes.CompactionInfo);
switchedPortions = &changes.SwitchedPortions;
- if (changes.CompactionInfo->InGranule) {
-#if 0
- if (changes.SwitchedPortions.size() <= changes.AppendedPortions.size()) {
- LOG_S_ERROR("Cannot compact granule " << changes.SrcGranule->Granule << " at tablet " << TabletId);
- return false;
- }
-#endif
- } else {
+ if (!changes.CompactionInfo->InGranule()) {
if (changes.NewGranules.empty()) {
LOG_S_ERROR("Cannot split granule " << changes.SrcGranule->Granule << " at tablet " << TabletId);
return false;
@@ -876,7 +767,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
LOG_S_DEBUG("Cannot update unknown granule " << granule << " at tablet " << TabletId);
return false;
}
- if (!Granules[granule]->Portions.contains(portion)) {
+ if (!Granules[granule]->GetPortions().contains(portion)) {
LOG_S_ERROR("Cannot update unknown portion " << portionInfo << " at tablet " << TabletId);
return false;
}
@@ -916,14 +807,14 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
ui64 granule = portionInfo.Granule();
ui64 portion = portionInfo.Portion();
- if (!Granules.contains(granule) || !Granules[granule]->Portions.contains(portion)) {
+ if (!Granules.contains(granule) || !Granules[granule]->GetPortions().contains(portion)) {
LOG_S_ERROR("Cannot evict unknown portion " << portionInfo << " at tablet " << TabletId);
return false;
}
// In case of race with compaction portion could become inactive
// TODO: evict others instead of abort eviction
- auto& oldInfo = Granules[granule]->Portions[portion];
+ const TPortionInfo& oldInfo = Granules[granule]->GetPortionVerified(portion);
if (!oldInfo.IsActive()) {
LOG_S_WARN("Cannot evict inactive portion " << oldInfo << " at tablet " << TabletId);
return false;
@@ -1045,10 +936,8 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
void TColumnEngineForLogs::FreeLocks(std::shared_ptr<TColumnEngineChanges> indexChanges) {
auto changes = std::static_pointer_cast<TChanges>(indexChanges);
- if (changes->IsCompaction()) {
- // Set granule not in split. Do not block writes in it.
- Y_VERIFY(changes->SrcGranule);
- GranulesInSplit.erase(changes->SrcGranule->Granule);
+ if (changes->CompactionInfo) {
+ changes->CompactionInfo->GetObject<TGranuleMeta>().AllowedInsertion();
}
}
@@ -1060,7 +949,7 @@ bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) {
Y_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second);
// Allocate granule info and ensure that there is no granule with same id inserted before.
- Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec)).second);
+ Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage)).second);
} else {
// Granule with same id already exists.
if (Granules.contains(rec.Granule)) {
@@ -1078,11 +967,11 @@ bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) {
void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& mark) {
Y_VERIFY(PathGranules.contains(pathId));
- Y_VERIFY(Granules.contains(granule));
-
- Granules.erase(granule);
+ auto it = Granules.find(granule);
+ Y_VERIFY(it != Granules.end());
+ Y_VERIFY(it->second->IsErasable());
+ Granules.erase(it);
EmptyGranules.erase(granule);
- CompactionGranules.erase(granule);
PathGranules[pathId].erase(mark);
}
@@ -1099,13 +988,12 @@ bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool a
}
Y_VERIFY(portionInfo.Valid());
- ui64 portion = portionInfo.Portion();
auto& spg = Granules[granule];
Y_VERIFY(spg);
if (updateStats) {
UpdatePortionStats(portionInfo);
}
- spg->Portions[portion] = portionInfo;
+ spg->UpsertPortion(portionInfo);
return true; // It must return true if (apply == true)
}
@@ -1115,7 +1003,7 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap
ui64 portion = portionInfo.Portion();
if (!apply) {
- if (!Granules.contains(granule) || !Granules[granule]->Portions.contains(portion)) {
+ if (!Granules.contains(granule) || !Granules[granule]->GetPortions().contains(portion)) {
LOG_S_ERROR("Cannot erase unknown portion " << portionInfo << " at tablet " << TabletId);
return false;
}
@@ -1124,12 +1012,11 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap
auto& spg = Granules[granule];
Y_VERIFY(spg);
- Y_VERIFY(spg->Portions.contains(portion));
if (updateStats) {
- UpdatePortionStats(spg->Portions[portion], EStatsUpdateType::ERASE);
+ UpdatePortionStats(spg->GetPortionVerified(portion), EStatsUpdateType::ERASE);
}
- spg->Portions.erase(portion);
+ Y_VERIFY(spg->ErasePortion(portion));
return true; // It must return true if (apply == true)
}
@@ -1139,9 +1026,9 @@ bool TColumnEngineForLogs::CanInsert(const TChanges& changes, const TSnapshot& c
// Does insert have granule in split?
for (const auto& portionInfo : changes.AppendedPortions) {
Y_VERIFY(!portionInfo.Empty());
- ui64 granule = portionInfo.Granule();
- if (GranulesInSplit.contains(granule)) {
- LOG_S_NOTICE("Cannot insert into splitting granule " << granule << " at tablet " << TabletId);
+ auto g = GetGranuleOptional(portionInfo.Granule());
+ if (!!g && !g->IsInsertAllowed()) {
+ LOG_S_NOTICE("Cannot insert into splitting granule " << portionInfo.Granule() << " at tablet " << TabletId);
return false;
}
}
@@ -1238,7 +1125,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
Y_VERIFY(it != Granules.end());
auto& spg = it->second;
Y_VERIFY(spg);
- auto& portions = spg->Portions;
+ auto& portions = spg->GetPortions();
bool granuleHasDataForSnaphsot = false;
TMap<TSnapshot, std::vector<const TPortionInfo*>> orderedPortions = GroupPortionsBySnapshot(portions, snapshot);
@@ -1278,107 +1165,37 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
return out;
}
-static bool NeedSplit(const THashMap<ui64, TPortionInfo>& portions, const TCompactionLimits& limits, bool& inserted) {
- ui64 sumSize = 0;
- ui64 sumMaxSize = 0;
- size_t activeCount = 0;
- THashSet<NArrow::TReplaceKey> borders;
- bool differentBorders = false;
-
- for (const auto& [_, info] : portions) {
- // We need only actual portions here (with empty XPlanStep:XTxId)
- if (info.IsActive()) {
- ++activeCount;
- } else {
- continue;
- }
-
- // Find at least 2 unique borders
- if (!differentBorders) {
- borders.insert(info.IndexKeyStart());
- borders.insert(info.IndexKeyEnd());
- differentBorders = (borders.size() > 1);
- }
-
- auto sizes = info.BlobsSizes();
- sumSize += sizes.first;
- sumMaxSize += sizes.second;
- if (info.IsInserted()) {
- inserted = true;
+std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompactionLimits& limits) {
+ const auto filter = [&](const ui64 granuleId) {
+ std::shared_ptr<TGranuleMeta> compactGranule = GetGranulePtrVerified(granuleId);
+ if (!compactGranule->NeedCompaction(limits)) {
+ return false;
}
- }
-
- // Do nothing if count of active portions is less than two.
- if (activeCount < 2) {
- inserted = false;
- return false;
- }
-
- return differentBorders && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize);
-}
-
-std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompactionLimits& limits, ui64& lastCompactedGranule) {
- if (CompactionGranules.empty()) {
+ return true;
+ };
+ auto gCompaction = GranulesStorage->GetGranuleForCompaction(filter);
+ if (!gCompaction) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_granule_for_compaction");
+ SignalCounters.NoCompactGranulesSelection->Add(1);
return {};
}
+ std::shared_ptr<TGranuleMeta> compactGranule = GetGranulePtrVerified(*gCompaction);
- ui64 granule = 0;
- bool inGranule = true;
-
- if (PathsGranulesOverloaded.size()) {
- {
- auto it = PathsGranulesOverloaded.begin();
- Y_VERIFY(it->second.size());
- granule = *it->second.begin();
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_overload")("path_id", it->first)("granules_count", it->second.size())("granule", granule)("paths_count", PathsGranulesOverloaded.size());
- SignalCounters.CompactOverloadGranulesSelection->Add(1);
- }
-
- const auto gi = Granules.find(granule);
- Y_VERIFY(gi != Granules.end());
-
- bool inserted = false;
- Y_VERIFY(NeedSplit(gi->second->Portions, limits, inserted));
- inGranule = false;
- Y_VERIFY(CompactionGranules.erase(granule));
- SignalCounters.SplitCompactGranulesSelection->Add(1);
- } else {
- for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty() && !granule;) {
- // Start from the beginning if the end is reached.
- if (it == CompactionGranules.end()) {
- it = CompactionGranules.begin();
- }
-
- const auto gi = Granules.find(*it);
- // Check granule exists.
- Y_VERIFY(gi != Granules.end());
-
- bool inserted = false;
- if (NeedSplit(gi->second->Portions, limits, inserted)) {
- inGranule = false;
- granule = *it;
- SignalCounters.SplitCompactGranulesSelection->Add(1);
- } else if (inserted) {
- granule = *it;
- SignalCounters.InternalCompactGranulesSelection->Add(1);
- }
-
- // Nothing to compact in the current granule. Throw it.
- it = CompactionGranules.erase(it);
- }
- if (granule) {
- lastCompactedGranule = granule;
- }
+ if (compactGranule->IsOverloaded(limits)) {
+ SignalCounters.CompactOverloadGranulesSelection->Add(1);
}
- if (granule) {
- auto info = std::make_unique<TCompactionInfo>();
- info->Granules.insert(granule);
- info->InGranule = inGranule;
- return info;
+ if (compactGranule->NeedSplitCompaction(limits)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("granule", compactGranule->DebugString())("compaction", "split");
+ SignalCounters.SplitCompactGranulesSelection->Add(1);
+ } else if (compactGranule->NeedInternalCompaction(limits)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("granule", compactGranule->DebugString())("compaction", "internal");
+ SignalCounters.InternalCompactGranulesSelection->Add(1);
} else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("granule", compactGranule->DebugString())("compaction", "no_need");
SignalCounters.NoCompactGranulesSelection->Add(1);
+ return {};
}
- return {};
+ return std::make_unique<TCompactionInfo>(compactGranule, compactGranule->NeedInternalCompaction(limits));
}
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 409a6ae848..3130ebcae2 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -4,6 +4,8 @@
#include "column_engine.h"
#include "scalars.h"
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
+#include "storage/granule.h"
+#include "storage/storage.h"
namespace NKikimr::NArrow {
struct TSortDescription;
@@ -24,6 +26,7 @@ class TCountersTable;
class TColumnEngineForLogs : public IColumnEngine {
private:
const NColumnShard::TEngineLogsCounters SignalCounters;
+ std::shared_ptr<TGranulesStorage> GranulesStorage;
public:
class TMarksGranules {
public:
@@ -56,6 +59,14 @@ public:
};
public:
+ const TGranuleMeta* GetGranuleMeta() const {
+ if (CompactionInfo) {
+ return &CompactionInfo->GetObject<TGranuleMeta>();
+ } else {
+ return nullptr;
+ }
+ }
+
struct TSrcGranule {
ui64 PathId{0};
ui64 Granule{0};
@@ -187,13 +198,10 @@ public:
}
const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override {
- if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) {
- return &pi->second;
- }
- return nullptr;
+ return GranulesStorage->GetOverloaded(pathId);
}
- bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); }
+ bool HasOverloadedGranules() const override { return GranulesStorage->HasOverloadedGranules(); }
TString SerializeMark(const NArrow::TReplaceKey& key) const override {
if (UseCompositeMarks()) {
@@ -239,25 +247,32 @@ public:
std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot,
const THashSet<ui32>& columnIds,
const TPKRangesFilter& pkRangesFilter) const override;
- std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits, ui64& lastCompactedGranule) override;
+ std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits) override;
private:
using TMarksMap = std::map<TMark, ui64, TMark::TCompare>;
- struct TGranuleMeta {
- const TGranuleRecord Record;
- THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo
+ const TGranuleMeta& GetGranuleVerified(const ui64 granuleId) const {
+ auto it = Granules.find(granuleId);
+ Y_VERIFY(it != Granules.end());
+ return *it->second;
+ }
- explicit TGranuleMeta(const TGranuleRecord& rec)
- : Record(rec)
- {}
+ std::shared_ptr<TGranuleMeta> GetGranulePtrVerified(const ui64 granuleId) const {
+ auto result = GetGranuleOptional(granuleId);
+ Y_VERIFY(result);
+ return result;
+ }
- ui64 PathId() const noexcept { return Record.PathId; }
- bool Empty() const noexcept { return Portions.empty(); }
- };
+ std::shared_ptr<TGranuleMeta> GetGranuleOptional(const ui64 granuleId) const {
+ auto it = Granules.find(granuleId);
+ if (it == Granules.end()) {
+ return nullptr;
+ }
+ return it->second;
+ }
TVersionedIndex VersionedIndex;
- const TCompactionLimits Limits;
ui64 TabletId;
std::shared_ptr<TGranulesTable> GranulesTable;
std::shared_ptr<TColumnsTable> ColumnsTable;
@@ -265,12 +280,9 @@ private:
THashMap<ui64, std::shared_ptr<TGranuleMeta>> Granules; // granule -> meta
THashMap<ui64, TMarksMap> PathGranules; // path_id -> {mark, granule}
TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
- THashSet<ui64> GranulesInSplit;
/// Set of empty granules.
/// Just for providing count of empty granules.
THashSet<ui64> EmptyGranules;
- THashMap<ui64, THashSet<ui64>> PathsGranulesOverloaded;
- TSet<ui64> CompactionGranules;
THashSet<ui64> CleanupGranules;
TColumnEngineStats Counters;
ui64 LastPortion;
@@ -298,10 +310,7 @@ private:
Granules.clear();
PathGranules.clear();
PathStats.clear();
- GranulesInSplit.clear();
EmptyGranules.clear();
- PathsGranulesOverloaded.clear();
- CompactionGranules.clear();
CleanupGranules.clear();
Counters.Clear();
@@ -326,7 +335,6 @@ private:
EStatsUpdateType updateType) const;
bool CanInsert(const TChanges& changes, const TSnapshot& commitSnap) const;
- void UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const TCompactionLimits& limits);
/// Return lists of adjacent empty granules for the path.
std::vector<std::vector<std::pair<TMark, ui64>>> EmptyGranuleTracks(const ui64 pathId) const;
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 288d5883d1..69ce114e3a 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -104,6 +104,16 @@ TString TIndexInfo::GetColumnName(ui32 id, bool required) const {
}
}
+std::vector<ui32> TIndexInfo::GetColumnIds() const {
+ std::vector<ui32> result;
+ for (auto&& i : Columns) {
+ result.emplace_back(i.first);
+ }
+ result.emplace_back((ui32)ESpecialColumn::PLAN_STEP);
+ result.emplace_back((ui32)ESpecialColumn::TX_ID);
+ return result;
+}
+
std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) const {
std::vector<TString> out;
out.reserve(ids.size());
@@ -181,19 +191,18 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString&
}
std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema() const {
- if (Schema) {
- return Schema;
- }
+ if (!Schema) {
+ std::vector<ui32> ids;
+ ids.reserve(Columns.size());
+ for (const auto& [id, _] : Columns) {
+ ids.push_back(id);
+ }
- std::vector<ui32> ids;
- ids.reserve(Columns.size());
- for (const auto& [id, _] : Columns) {
- ids.push_back(id);
+ // The ids had a set type before so we keep them sorted.
+ std::sort(ids.begin(), ids.end());
+ Schema = MakeArrowSchema(Columns, ids);
}
- // The ids had a set type before so we keep them sorted.
- std::sort(ids.begin(), ids.end());
- Schema = MakeArrowSchema(Columns, ids);
return Schema;
}
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 881dd26fc9..449ff1b64e 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -215,6 +215,7 @@ public:
/// Returns names of columns defined by the specific ids.
std::vector<TString> GetColumnNames(const std::vector<ui32>& ids) const;
+ std::vector<ui32> GetColumnIds() const;
/// Returns info of columns defined by specific ids.
std::vector<TNameTypeInfo> GetColumns(const std::vector<ui32>& ids) const;
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index 9eadd403d6..7288ee2e49 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -50,7 +50,7 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo,
for (auto& rec : portionInfo.Records) {
auto pos = resultSchema->GetFieldIndex(rec.ColumnId);
Y_VERIFY(pos >= 0);
- auto field = resultSchema->GetField(pos);
+ auto field = resultSchema->GetFieldByIndex(pos);
auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext);
auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, columnSaver);
@@ -75,7 +75,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 granule,
const TSnapshot& snapshot,
- std::vector<TString>& blobs) const {
+ std::vector<TString>& blobs, const TGranuleMeta* granuleMeta) const {
Y_VERIFY(batch->num_rows());
auto resultSchema = SchemaVersions.GetSchema(snapshot);
@@ -101,34 +101,48 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
TPortionInfo portionInfo;
portionInfo.Records.reserve(resultSchema->GetSchema()->num_fields());
std::vector<TString> portionBlobs;
- portionBlobs.reserve(resultSchema->GetSchema()->num_fields());
+ portionBlobs.resize(resultSchema->GetSchema()->num_fields());
// Serialize portion's columns into blobs
bool ok = true;
- for (const auto& field : resultSchema->GetSchema()->fields()) {
- const auto& name = field->name();
- ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(name);
+
+ std::vector<TColumnSummary> sortedColumnIds;
+ if (granuleMeta) {
+ sortedColumnIds = granuleMeta->GetSummary().GetColumnIdsSortedBySizeDescending();
+ } else {
+ for (auto&& i : resultSchema->GetIndexInfo().GetColumnIds()) {
+ sortedColumnIds.emplace_back(TColumnSummary(i, 0));
+ }
+ }
+
+ ui32 fillCounter = 0;
+ for (const auto& columnSummary : sortedColumnIds) {
+ const TString& columnName = resultSchema->GetIndexInfo().GetColumnName(columnSummary.GetColumnId());
+ const int idx = resultSchema->GetFieldIndex(columnSummary.GetColumnId());
+ Y_VERIFY(idx >= 0);
+ auto field = resultSchema->GetFieldByIndex(idx);
+ Y_VERIFY(field);
+ auto array = portionBatch->GetColumnByName(columnName);
+ Y_VERIFY(array);
        /// @warning records are not valid because of empty BlobId and zero Portion
- TColumnRecord record = TColumnRecord::Make(granule, columnId, snapshot, 0);
- auto columnSaver = resultSchema->GetColumnSaver(name, saverContext);
- auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), columnSaver, Counters);
- if (!blob.size()) {
- Counters.TrashDataSerializationBytes->Add(blob.size());
- Counters.TrashDataSerialization->Add(1);
+ TColumnRecord record = TColumnRecord::Make(granule, columnSummary.GetColumnId(), snapshot, 0);
+ auto columnSaver = resultSchema->GetColumnSaver(columnSummary.GetColumnId(), saverContext);
+ TString blob = TPortionInfo::SerializeColumnWithLimit(array, field, columnSaver, Counters);
+ if (!blob) {
ok = false;
break;
- } else {
- Counters.CorrectDataSerializationBytes->Add(blob.size());
- Counters.CorrectDataSerialization->Add(1);
}
- // TODO: combine small columns in one blob
- portionBlobs.emplace_back(std::move(blob));
+ portionInfo.InsertOneChunkColumn(idx, std::move(record));
+
+ portionBlobs[idx] = std::move(blob);
+ ++fillCounter;
}
if (ok) {
+ Y_VERIFY(fillCounter == portionBlobs.size());
portionInfo.AddMetadata(*resultSchema, portionBatch, tierName);
out.emplace_back(std::move(portionInfo));
for (auto& blob : portionBlobs) {
@@ -268,9 +282,9 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
// We could merge data here cause tablet limits indexing data portions
#if 0
- auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortDescription()); // insert: no replace
- Y_VERIFY(merged);
- Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey()));
+ auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortDescription()); // insert: no replace
+ Y_VERIFY(merged);
+ Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey()));
#else
auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription());
Y_VERIFY(merged);
@@ -280,7 +294,7 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], resultSchema->GetIndexInfo());
for (auto& [granule, batch] : granuleBatches) {
- auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs);
+ auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta());
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -338,13 +352,13 @@ std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnE
if (!slice || slice->num_rows() == 0) {
continue;
}
- auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs);
+ auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, changes->GetGranuleMeta());
for (auto&& portionInfo : tmp) {
portions.emplace_back(std::move(portionInfo));
}
}
} else {
- portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs);
+ portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta());
}
Y_VERIFY(portions.size() > 0);
@@ -689,10 +703,9 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) {
ui64 tmpGranule = changes->SetTmpGranule(pathId, mark);
-
for (const auto& batch : idBatches[id]) {
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs);
+ auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, changes->GetGranuleMeta());
Y_VERIFY(newPortions.size() > 0);
for (auto& portion : newPortions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -708,7 +721,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
ui64 tmpGranule = changes->SetTmpGranule(pathId, ts);
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs);
+ auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, changes->GetGranuleMeta());
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -721,7 +734,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
bool TCompactionLogic::IsSplit(std::shared_ptr<TColumnEngineChanges> changes) {
auto castedChanges = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(changes);
- return !castedChanges->CompactionInfo->InGranule;
+ return !castedChanges->CompactionInfo->InGranule();
}
std::vector<TString> TCompactionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const {
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h
index dd5d896169..abaf034e86 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.h
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h
@@ -14,7 +14,8 @@ protected:
private:
const THashMap<ui64, NKikimr::NOlap::TTiering>* TieringMap = nullptr;
public:
- TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, const NColumnShard::TIndexationCounters& counters)
+ TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
+ const NColumnShard::TIndexationCounters& counters)
: SchemaVersions(indexInfo)
, Counters(counters)
, TieringMap(&tieringMap)
@@ -40,7 +41,7 @@ protected:
const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 granule,
const TSnapshot& minSnapshot,
- std::vector<TString>& blobs) const;
+ std::vector<TString>& blobs, const TGranuleMeta* granuleMeta) const;
static std::shared_ptr<arrow::RecordBatch> GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch,
const TIndexInfo& indexInfo);
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 0de422f5db..1d7f6f0fea 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -18,7 +18,7 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps
auto columnId = GetIndexInfo().GetColumnId(resultField->name());
auto oldColumnIndex = dataSchema.GetFieldIndex(columnId);
         if (oldColumnIndex >= 0) { // ColumnExists
- auto oldColumnInfo = dataSchema.GetField(oldColumnIndex);
+ auto oldColumnInfo = dataSchema.GetFieldByIndex(oldColumnIndex);
Y_VERIFY(oldColumnInfo);
auto columnData = batch->GetColumnByName(oldColumnInfo->name());
Y_VERIFY(columnData);
@@ -41,12 +41,9 @@ TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array
return saver.Apply(batch);
}
-TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array,
- const std::shared_ptr<arrow::Field>& field,
- TColumnRecord&& record,
- const TColumnSaver saver,
- const NColumnShard::TIndexationCounters& counters,
- const ui32 limitBytes) {
+TString TPortionInfo::SerializeColumnWithLimit(const std::shared_ptr<arrow::Array>& array,
+ const std::shared_ptr<arrow::Field>& field,
+ const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, const ui32 limitBytes) {
auto blob = SerializeColumn(array, field, saver);
if (blob.size() >= limitBytes) {
counters.TrashDataSerializationBytes->Add(blob.size());
@@ -56,10 +53,15 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr
counters.CorrectDataSerializationBytes->Add(blob.size());
counters.CorrectDataSerialization->Add(1);
}
+ return blob;
+}
+void TPortionInfo::InsertOneChunkColumn(const ui32 idx, TColumnRecord&& record) {
record.Chunk = 0;
- Records.emplace_back(std::move(record));
- return blob;
+ if (Records.size() <= idx) {
+ Records.resize(idx + 1);
+ }
+ Records[idx] = std::move(record);
}
void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) {
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 149a1496c0..96cac1c77c 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -34,7 +34,16 @@ public:
virtual ui32 GetColumnId(const std::string& columnName) const = 0;
virtual int GetFieldIndex(const ui32 columnId) const = 0;
- virtual std::shared_ptr<arrow::Field> GetField(const int index) const = 0;
+ std::shared_ptr<arrow::Field> GetFieldByIndex(const int index) const {
+ auto schema = GetSchema();
+ if (!schema || index < 0 || index >= schema->num_fields()) {
+ return nullptr;
+ }
+ return schema->field(index);
+ }
+ std::shared_ptr<arrow::Field> GetFieldByColumnId(const ui32 columnId) const {
+ return GetFieldByIndex(GetFieldIndex(columnId));
+ }
virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0;
virtual const TIndexInfo& GetIndexInfo() const = 0;
virtual const TSnapshot& GetSnapshot() const = 0;
@@ -68,7 +77,7 @@ public:
}
virtual int GetFieldIndex(const ui32 columnId) const override {
- TString columnName = IndexInfo.GetColumnName(columnId, false);
+ const TString& columnName = IndexInfo.GetColumnName(columnId, false);
if (!columnName) {
return -1;
}
@@ -76,10 +85,6 @@ public:
return Schema->GetFieldIndex(name);
}
- std::shared_ptr<arrow::Field> GetField(const int index) const override {
- return Schema->field(index);
- }
-
const std::shared_ptr<arrow::Schema>& GetSchema() const override {
return Schema;
}
@@ -142,10 +147,6 @@ public:
return Schema->GetFieldIndex(name);
}
- std::shared_ptr<arrow::Field> GetField(const int index) const override {
- return Schema->field(index);
- }
-
const std::shared_ptr<arrow::Schema>& GetSchema() const override {
return Schema;
}
@@ -550,8 +551,8 @@ public:
for (auto& [pos, orderedChunks] : columnChunks) {
Y_VERIFY(positionsMap.contains(pos));
size_t dataPos = positionsMap[pos];
- auto portionField = dataSchema.GetField(dataPos);
- auto resultField = resultSchema.GetField(pos);
+ auto portionField = dataSchema.GetFieldByIndex(dataPos);
+ auto resultField = resultSchema.GetFieldByIndex(pos);
Y_VERIFY(portionField->IsCompatibleWith(*resultField));
@@ -581,15 +582,13 @@ public:
}
static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
- const std::shared_ptr<arrow::Field>& field,
- const TColumnSaver saver);
+ const std::shared_ptr<arrow::Field>& field,
+ const TColumnSaver saver);
+ static TString SerializeColumnWithLimit(const std::shared_ptr<arrow::Array>& array,
+ const std::shared_ptr<arrow::Field>& field,
+ const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, const ui32 sizeLimit = BLOB_BYTES_LIMIT);
- TString AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array,
- const std::shared_ptr<arrow::Field>& field,
- TColumnRecord&& record,
- const TColumnSaver saver,
- const NColumnShard::TIndexationCounters& counters,
- ui32 limitBytes = BLOB_BYTES_LIMIT);
+ void InsertOneChunkColumn(const ui32 idx, TColumnRecord&& record);
friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) {
for (auto& rec : info.Records) {
@@ -597,6 +596,7 @@ public:
out << " (1 of " << info.Records.size() << " blobs shown)";
break;
}
+ out << ";activity=" << info.IsActive() << ";";
if (!info.TierName.empty()) {
out << " tier: " << info.TierName;
}
diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..566f967553
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-storage)
+target_link_libraries(columnshard-engines-storage PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(columnshard-engines-storage PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/storage.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..bafe85e672
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-storage)
+target_link_libraries(columnshard-engines-storage PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(columnshard-engines-storage PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/storage.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..bafe85e672
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-storage)
+target_link_libraries(columnshard-engines-storage PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(columnshard-engines-storage PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/storage.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..566f967553
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-storage)
+target_link_libraries(columnshard-engines-storage PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(columnshard-engines-storage PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/storage.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
new file mode 100644
index 0000000000..1a3ed38214
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -0,0 +1,123 @@
+#include "granule.h"
+#include "storage.h"
+
+namespace NKikimr::NOlap {
+
+// Decides whether the granule must be split (returns true) and reports via
+// `inserted` whether it has any inserted (not yet compacted) portions.
+bool TGranuleMeta::NeedSplit(const TCompactionLimits& limits, bool& inserted) const {
+    // A granule with fewer than two active portions can be neither split nor
+    // internally compacted, regardless of pending inserted portions.
+    if (GetSummary().GetActivePortionsCount() < 2) {
+        inserted = false;
+        return false;
+    }
+    // Explicit comparison instead of an implicit ui64 -> bool narrowing.
+    inserted = GetSummary().GetInserted().GetPortionsCount() > 0;
+    // Split only makes sense when portions do not all share a single border
+    // and the granule exceeds one of the configured size thresholds.
+    return GetSummary().GetDifferentBorders()
+        && (GetSummary().GetMaxColumnsSize() >= limits.GranuleBlobSplitSize
+            || GetSummary().GetGranuleSize() >= limits.GranuleOverloadSize);
+}
+
+// Total blob size of all active portions in this granule (from the cached summary).
+ui64 TGranuleMeta::Size() const {
+    return GetSummary().GetGranuleSize();
+}
+
+// Inserts or overwrites the portion, then invalidates caches and notifies the owner.
+void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
+    Portions[info.Portion()] = info;
+    OnAfterChangePortion(info.Portion());
+}
+
+// Removes the portion from this granule. Returns false when no such portion
+// exists; on success the summary cache is invalidated and the owner notified.
+bool TGranuleMeta::ErasePortion(const ui64 portion) {
+    if (const auto found = Portions.find(portion); found != Portions.end()) {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", found->second)("pathId", Record.PathId);
+        Portions.erase(found);
+        OnAfterChangePortion(portion);
+        return true;
+    }
+    return false;
+}
+
+// Appends a column record to the portion; operator[] creates the portion
+// entry on the first record for that portion id.
+void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) {
+    Portions[rec.Portion].AddRecord(indexInfo, rec);
+    OnAfterChangePortion(rec.Portion);
+}
+
+// Any portion change invalidates the cached summary and must be reflected in
+// the owner's compaction-priority / overload bookkeeping.
+void TGranuleMeta::OnAfterChangePortion(const ui64 /*portion*/) {
+    ResetCaches();
+    Owner->UpdateGranuleInfo(*this);
+}
+
+void TGranuleMeta::OnCompactionFinished() {
+    AllowInsertionFlag = false;
+    // Exactly one of the two compaction activities must have been in progress;
+    // short-circuit erase removes whichever is present.
+    Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction));
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnCompactionFinished")("info", DebugString());
+    CompactionPriorityInfo.OnCompactionFinished();
+    Owner->UpdateGranuleInfo(*this);
+}
+
+// Failure path: clears the activity flag, records the failure (back-off) and
+// re-publishes the granule's priority to the owner.
+void TGranuleMeta::OnCompactionFailed(const TString& reason) {
+    AllowInsertionFlag = false;
+    Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction));
+    AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "OnCompactionFailed")("reason", reason)("info", DebugString());
+    CompactionPriorityInfo.OnCompactionFailed();
+    Owner->UpdateGranuleInfo(*this);
+}
+
+// Cancellation path: like failure, but does not count toward the failure streak.
+void TGranuleMeta::OnCompactionCanceled(const TString& reason) {
+    AllowInsertionFlag = false;
+    Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction));
+    AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "OnCompactionCanceled")("reason", reason)("info", DebugString());
+    CompactionPriorityInfo.OnCompactionCanceled();
+    Owner->UpdateGranuleInfo(*this);
+}
+
+// Marks the granule as participating in a compaction. A granule may take part
+// in at most one compaction at a time (hence the empty-activity invariant).
+void TGranuleMeta::OnCompactionStarted(const bool inGranule) {
+    AllowInsertionFlag = false;
+    Y_VERIFY(Activity.empty());
+    if (inGranule) {
+        Activity.emplace(EActivity::InternalCompaction);
+    } else {
+        Activity.emplace(EActivity::SplitCompaction);
+    }
+}
+
+// Recomputes the granule summary (sizes, counts, per-column sizes, border
+// info) from scratch over all active portions and stores it in SummaryCache.
+void TGranuleMeta::RebuildMetrics() const {
+    TGranuleSummary result;
+    std::map<ui32, ui64> sizeByColumns;
+    bool differentBorders = false;
+    THashSet<NArrow::TReplaceKey> borders;
+
+    for (auto&& i : Portions) {
+        if (i.second.IsActive()) {
+            // Once two distinct borders are seen the flag is final; stop
+            // collecting keys to avoid needless hashing.
+            if (!differentBorders) {
+                borders.insert(i.second.IndexKeyStart());
+                borders.insert(i.second.IndexKeyEnd());
+                differentBorders = (borders.size() > 1);
+            }
+            for (auto&& c : i.second.Records) {
+                sizeByColumns[c.ColumnId] += c.BlobRange.Size;
+            }
+            auto sizes = i.second.BlobsSizes();
+            // NOTE(review): MaxColumnsSize is accumulated with += here, i.e. a
+            // sum of per-portion maxima rather than a max — confirm intended.
+            if (i.second.IsInserted()) {
+                result.Inserted.PortionsSize += sizes.first;
+                result.Inserted.MaxColumnsSize += sizes.second;
+                ++result.Inserted.PortionsCount;
+            } else {
+                result.Other.PortionsSize += sizes.first;
+                result.Other.MaxColumnsSize += sizes.second;
+                ++result.Other.PortionsCount;
+            }
+        }
+    }
+    // Invert sizeByColumns into size -> [column ids], then walk it in reverse
+    // to emit columns in descending total-size order.
+    std::map<ui64, std::vector<ui32>> transpSorted;
+    for (auto&& i : sizeByColumns) {
+        transpSorted[i.second].emplace_back(i.first);
+    }
+    result.ColumnIdsSortedBySizeDescending.reserve(sizeByColumns.size());
+    for (auto it = transpSorted.rbegin(); it != transpSorted.rend(); ++it) {
+        for (auto&& v : it->second) {
+            result.ColumnIdsSortedBySizeDescending.emplace_back(TColumnSummary(v, it->first));
+        }
+    }
+    result.DifferentBorders = differentBorders;
+
+    SummaryCache = result;
+}
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
new file mode 100644
index 0000000000..104b8baf8e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -0,0 +1,250 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/column_engine.h>
+#include <ydb/core/tx/columnshard/engines/portion_info.h>
+
+namespace NKikimr::NOlap {
+
+class TGranulesStorage;
+
+// Retry/back-off state for one granule's compaction attempts.
+class TCompactionPriorityInfo {
+protected:
+    // Number of consecutive failed compaction attempts.
+    ui32 ProblemSequenceLength = 0;
+    // Earliest moment the next attempt should be scheduled; zero = any time.
+    TInstant NextAttemptInstant;
+public:
+    TInstant GetNextAttemptInstant() const {
+        return NextAttemptInstant;
+    }
+
+    // Failure: extend the streak and back off for a second.
+    void OnCompactionFailed() {
+        ++ProblemSequenceLength;
+        NextAttemptInstant = TInstant::Now() + TDuration::Seconds(1);
+    }
+
+    // Success resets both the streak and the back-off.
+    void OnCompactionFinished() {
+        ProblemSequenceLength = 0;
+        NextAttemptInstant = TInstant::Zero();
+    }
+
+    // Cancellation backs off but does not count as a failure.
+    void OnCompactionCanceled() {
+        NextAttemptInstant = TInstant::Now() + TDuration::Seconds(1);
+    }
+};
+
+// Aggregated size/count statistics for one class of portions (inserted or
+// compacted). Fields are filled directly by TGranuleMeta::RebuildMetrics.
+class TDataClassSummary {
+private:
+    ui64 PortionsSize = 0;
+    // NOTE(review): despite the name, this is accumulated as a sum of the
+    // per-portion values in RebuildMetrics/operator+ — confirm semantics.
+    ui64 MaxColumnsSize = 0;
+    ui64 PortionsCount = 0;
+    friend class TGranuleMeta;
+public:
+    ui64 GetPortionsSize() const {
+        return PortionsSize;
+    }
+    ui64 GetMaxColumnsSize() const {
+        return MaxColumnsSize;
+    }
+    ui64 GetPortionsCount() const {
+        return PortionsCount;
+    }
+
+    // Component-wise sum; used to combine the inserted and other classes.
+    TDataClassSummary operator+(const TDataClassSummary& item) const {
+        TDataClassSummary result;
+        result.PortionsSize = PortionsSize + item.PortionsSize;
+        result.MaxColumnsSize = MaxColumnsSize + item.MaxColumnsSize;
+        result.PortionsCount = PortionsCount + item.PortionsCount;
+        return result;
+    }
+};
+
+// Pair of (column id, total blob size of that column within a granule).
+class TColumnSummary {
+private:
+    ui32 ColumnId;
+    ui64 BlobsSize;
+public:
+    TColumnSummary(const ui32 columnId, const ui64 blobsSize)
+        : ColumnId(columnId)
+        , BlobsSize(blobsSize)
+    {
+
+    }
+
+    ui32 GetColumnId() const {
+        return ColumnId;
+    }
+
+    ui64 GetBlobsSize() const {
+        return BlobsSize;
+    }
+};
+
+// Cached per-granule statistics, split into inserted vs. other (compacted)
+// portions; built by TGranuleMeta::RebuildMetrics.
+class TGranuleSummary {
+private:
+    TDataClassSummary Inserted;
+    TDataClassSummary Other;
+    friend class TGranuleMeta;
+    std::vector<TColumnSummary> ColumnIdsSortedBySizeDescending;
+    // True when active portions do not all share a single index-key border.
+    bool DifferentBorders = false;
+public:
+    const std::vector<TColumnSummary>& GetColumnIdsSortedBySizeDescending() const {
+        return ColumnIdsSortedBySizeDescending;
+    }
+    const TDataClassSummary& GetInserted() const {
+        return Inserted;
+    }
+    const TDataClassSummary& GetOther() const {
+        return Other;
+    }
+    bool GetDifferentBorders() const {
+        return DifferentBorders;
+    }
+    // The following three aggregate both classes via TDataClassSummary::operator+.
+    ui64 GetGranuleSize() const {
+        return (Inserted + Other).GetPortionsSize();
+    }
+    ui64 GetActivePortionsCount() const {
+        return (Inserted + Other).GetPortionsCount();
+    }
+    ui64 GetMaxColumnsSize() const {
+        return (Inserted + Other).GetMaxColumnsSize();
+    }
+};
+
+// Ordering key for choosing the next granule to compact: combines the retry
+// state with a weight derived from the granule summary.
+class TCompactionPriority: public TCompactionPriorityInfo {
+private:
+    using TBase = TCompactionPriorityInfo;
+    TGranuleSummary GranuleSummary;
+    // Weight ~ (granule size in KiB) * active portion count; zero when the
+    // granule is not compactable (single border or <= 1 active portion).
+    // Computed in double then truncated to ui64.
+    ui64 GetWeightCorrected() const {
+        if (!GranuleSummary.GetDifferentBorders()) {
+            return 0;
+        }
+        if (GranuleSummary.GetActivePortionsCount() <= 1) {
+            return 0;
+        }
+        const ui64 weightedSize = (1.0 * GranuleSummary.GetGranuleSize() / 1024) * GranuleSummary.GetActivePortionsCount();
+        return weightedSize;
+    }
+public:
+    TCompactionPriority(const TCompactionPriorityInfo& data, const TGranuleSummary& granuleSummary)
+        : TBase(data)
+        , GranuleSummary(granuleSummary)
+    {
+
+    }
+    // Higher weight / more portions sorts greater; NextAttemptInstant is
+    // deliberately swapped so that, ties aside, an earlier next-attempt time
+    // ranks the granule higher.
+    bool operator<(const TCompactionPriority& item) const {
+        return std::tuple(GetWeightCorrected(), GranuleSummary.GetActivePortionsCount(), item.NextAttemptInstant)
+            < std::tuple(item.GetWeightCorrected(), item.GranuleSummary.GetActivePortionsCount(), NextAttemptInstant);
+    }
+
+};
+
+// Per-granule metadata: the set of portions, a lazily cached summary, and the
+// compaction activity/priority state. Portion changes are propagated to the
+// owning TGranulesStorage via OnAfterChangePortion -> UpdateGranuleInfo.
+class TGranuleMeta: public ICompactionObjectCallback, TNonCopyable {
+private:
+    THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo
+    // Rebuilt on demand by GetSummary(); invalidated on every portion change.
+    mutable std::optional<TGranuleSummary> SummaryCache;
+    bool NeedSplit(const TCompactionLimits& limits, bool& inserted) const;
+    void RebuildMetrics() const;
+
+    void ResetCaches() {
+        SummaryCache = {};
+    }
+
+    enum class EActivity {
+        SplitCompaction,
+        InternalCompaction,
+    };
+
+    // Current compaction activities; empty when the granule is idle.
+    std::set<EActivity> Activity;
+    TCompactionPriorityInfo CompactionPriorityInfo;
+    mutable bool AllowInsertionFlag = false;
+    std::shared_ptr<TGranulesStorage> Owner;
+
+    void OnAfterChangePortion(const ui64 portion);
+public:
+    const TGranuleSummary& GetSummary() const {
+        if (!SummaryCache) {
+            RebuildMetrics();
+        }
+        return *SummaryCache;
+    }
+    TCompactionPriority GetCompactionPriority() const {
+        return TCompactionPriority(CompactionPriorityInfo, GetSummary());
+    }
+
+    bool NeedCompaction(const TCompactionLimits& limits) const {
+        if (InCompaction() || Empty()) {
+            return false;
+        }
+        return NeedSplitCompaction(limits) || NeedInternalCompaction(limits);
+    }
+
+    bool InCompaction() const {
+        return Activity.contains(EActivity::SplitCompaction) || Activity.contains(EActivity::InternalCompaction);
+    }
+
+    // Permit insertions into a granule that is currently being compacted.
+    void AllowedInsertion() const {
+        if (InCompaction()) {
+            AllowInsertionFlag = true;
+        }
+    }
+
+    // Insertion is blocked only by an ongoing split compaction, unless
+    // explicitly allowed above.
+    bool IsInsertAllowed() const {
+        return AllowInsertionFlag || !Activity.contains(EActivity::SplitCompaction);
+    }
+
+    bool IsErasable() const {
+        return Activity.empty();
+    }
+
+    virtual void OnCompactionStarted(const bool inGranule) override;
+
+    virtual void OnCompactionCanceled(const TString& reason) override;
+    virtual void OnCompactionFailed(const TString& reason) override;
+    virtual void OnCompactionFinished() override;
+
+    void UpsertPortion(const TPortionInfo& info);
+
+    virtual TString DebugString() const override {
+        return TStringBuilder() << "granule:" << GetGranuleId() << ";path_id:" << Record.PathId << ";";
+    }
+
+    const TGranuleRecord Record;
+
+    void AddColumnRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec);
+
+    const THashMap<ui64, TPortionInfo>& GetPortions() const {
+        return Portions;
+    }
+
+    // Verified accessor: aborts (Y_VERIFY) when the portion is absent.
+    const TPortionInfo& GetPortionVerified(const ui64 portion) const {
+        auto it = Portions.find(portion);
+        Y_VERIFY(it != Portions.end());
+        return it->second;
+    }
+
+    bool ErasePortion(const ui64 portion);
+
+    explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner)
+        : Owner(owner)
+        , Record(rec) {
+    }
+
+    ui64 GetGranuleId() const {
+        return Record.Granule;
+    }
+    ui64 PathId() const noexcept { return Record.PathId; }
+    bool Empty() const noexcept { return Portions.empty(); }
+
+    ui64 Size() const;
+    bool IsOverloaded(const TCompactionLimits& limits) const {
+        return Size() >= limits.GranuleOverloadSize;
+    }
+    bool NeedSplitCompaction(const TCompactionLimits& limits) const {
+        bool inserted = false;
+        return NeedSplit(limits, inserted);
+    }
+    // Internal compaction: no split required, but inserted portions are pending.
+    bool NeedInternalCompaction(const TCompactionLimits& limits) const {
+        bool inserted = false;
+        return !NeedSplit(limits, inserted) && inserted;
+    }
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/storage.cpp b/ydb/core/tx/columnshard/engines/storage/storage.cpp
new file mode 100644
index 0000000000..01c66c8bc8
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/storage.cpp
@@ -0,0 +1,71 @@
+#include "storage.h"
+
+namespace NKikimr::NOlap {
+
+// Returns the set of overloaded granule ids for the path, or nullptr when the
+// path currently has no overloaded granules.
+const THashSet<ui64>* TGranulesStorage::GetOverloaded(ui64 pathId) const {
+    const auto it = PathsGranulesOverloaded.find(pathId);
+    return it == PathsGranulesOverloaded.end() ? nullptr : &it->second;
+}
+
+// Re-publishes one granule's compaction priority and overload status. While a
+// pack modification is active the update is deferred to FinishModificationImpl.
+void TGranulesStorage::UpdateGranuleInfo(const TGranuleMeta& granule) {
+    if (PackModificationFlag) {
+        PackModifiedGranules[granule.GetGranuleId()] = &granule;
+        return;
+    }
+    {
+        // Re-index: remove the granule from its old sorting bucket (if any)
+        // and insert it under the freshly computed priority. The Y_VERIFYs
+        // assert that the two maps stay mutually consistent.
+        auto it = GranulesCompactionPriority.find(granule.GetGranuleId());
+        auto gPriority = granule.GetCompactionPriority();
+        if (it == GranulesCompactionPriority.end()) {
+            it = GranulesCompactionPriority.emplace(granule.GetGranuleId(), gPriority).first;
+            Y_VERIFY(GranuleCompactionPrioritySorting[gPriority].emplace(granule.GetGranuleId()).second);
+        } else {
+            auto itSorting = GranuleCompactionPrioritySorting.find(it->second);
+            Y_VERIFY(itSorting != GranuleCompactionPrioritySorting.end());
+            Y_VERIFY(itSorting->second.erase(granule.GetGranuleId()));
+            if (itSorting->second.empty()) {
+                GranuleCompactionPrioritySorting.erase(itSorting);
+            }
+            it->second = gPriority;
+            Y_VERIFY(GranuleCompactionPrioritySorting[gPriority].emplace(granule.GetGranuleId()).second);
+        }
+    }
+    const ui64 pathId = granule.Record.PathId;
+
+    // Size exceeds the configured limit. Mark granule as overloaded.
+    if (granule.Size() >= Limits.GranuleOverloadSize) {
+        if (PathsGranulesOverloaded[pathId].emplace(granule.GetGranuleId()).second) {
+            AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "overloaded")("path_id", pathId)("granule", granule.GetGranuleId());
+        }
+    } else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) {
+        // Size is under limit. Remove granule from the overloaded set.
+        if (pi->second.erase(granule.GetGranuleId())) {
+            AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "unoverloaded")("path_id", pathId)("granule", granule.GetGranuleId())("remained", pi->second.size());
+        }
+        // Remove entry for the pathId if there it has no overloaded granules any more.
+        if (pi->second.empty()) {
+            PathsGranulesOverloaded.erase(pi);
+        }
+    }
+    Counters.OverloadGranules->Set(PathsGranulesOverloaded.size());
+    // Debug dump of the whole overload map, built only when debug logging is on.
+    if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
+        TStringBuilder sb;
+        for (auto&& i : PathsGranulesOverloaded) {
+            sb << i.first << ":";
+            bool isFirst = true;
+            for (auto&& g : i.second) {
+                if (!isFirst) {
+                    sb << ",";
+                } else {
+                    isFirst = false;
+                }
+                sb << g;
+            }
+            sb << ";";
+        }
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("overload_granules", sb);
+    }
+}
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/storage.h b/ydb/core/tx/columnshard/engines/storage/storage.h
new file mode 100644
index 0000000000..14785f2c27
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/storage.h
@@ -0,0 +1,99 @@
+#pragma once
+#include "granule.h"
+#include <ydb/core/tx/columnshard/counters/engine_logs.h>
+
+namespace NKikimr::NOlap {
+
+// Registry of granule compaction state: keeps per-path overload sets and a
+// priority-sorted index used to pick the next compaction victim.
+class TGranulesStorage {
+private:
+    const TCompactionLimits Limits;
+    // pathId -> ids of granules whose size exceeds Limits.GranuleOverloadSize.
+    THashMap<ui64, THashSet<ui64>> PathsGranulesOverloaded;
+    const NColumnShard::TEngineLogsCounters Counters;
+    // granuleId -> its current priority (kept in sync with the sorting map).
+    THashMap<ui64, TCompactionPriority> GranulesCompactionPriority;
+    // priority -> granule ids; rbegin() is the highest-priority bucket.
+    std::map<TCompactionPriority, std::set<ui64>> GranuleCompactionPrioritySorting;
+    // While true, UpdateGranuleInfo calls are deferred into PackModifiedGranules.
+    bool PackModificationFlag = false;
+    THashMap<ui64, const TGranuleMeta*> PackModifiedGranules;
+    void StartModificationImpl() {
+        Y_VERIFY(!PackModificationFlag);
+        PackModificationFlag = true;
+    }
+
+    // Flushes all updates deferred while the pack flag was set.
+    void FinishModificationImpl() {
+        Y_VERIFY(PackModificationFlag);
+        PackModificationFlag = false;
+        for (auto&& i : PackModifiedGranules) {
+            UpdateGranuleInfo(*i.second);
+        }
+        PackModifiedGranules.clear();
+    }
+
+public:
+    // Counters are taken by const reference (the member stores its own copy).
+    TGranulesStorage(const NColumnShard::TEngineLogsCounters& counters, const TCompactionLimits& limits)
+        : Limits(limits)
+        , Counters(counters) {
+
+    }
+
+    // RAII guard batching granule updates between construction and destruction.
+    class TModificationGuard {
+    private:
+        TGranulesStorage& Owner;
+    public:
+        TModificationGuard(TGranulesStorage& storage)
+            : Owner(storage) {
+            Owner.StartModificationImpl();
+        }
+
+        ~TModificationGuard() {
+            Owner.FinishModificationImpl();
+        }
+    };
+
+    ui32 GetOverloadedGranulesCount() const {
+        ui32 result = 0;
+        for (auto&& i : PathsGranulesOverloaded) {
+            result += i.second.size();
+        }
+        return result;
+    }
+
+    bool HasOverloadedGranules() const {
+        return !PathsGranulesOverloaded.empty();
+    }
+
+    TModificationGuard StartPackModification() {
+        return TModificationGuard(*this);
+    }
+
+    const THashSet<ui64>* GetOverloaded(ui64 pathId) const;
+
+    // Picks a granule accepted by `filter`, scanning priority buckets from
+    // highest to lowest. Granules whose back-off (NextAttemptInstant) has not
+    // elapsed are remembered; the one with the earliest next attempt is
+    // returned as a fallback when no granule is immediately eligible.
+    template <class TFilter>
+    std::optional<ui64> GetGranuleForCompaction(const TFilter& filter) const {
+        if (GranuleCompactionPrioritySorting.empty()) {
+            return {};
+        }
+        const TInstant now = TInstant::Now();
+        std::optional<ui64> reserve;
+        TInstant reserveInstant;
+        for (auto it = GranuleCompactionPrioritySorting.rbegin(); it != GranuleCompactionPrioritySorting.rend(); ++it) {
+            // BUGFIX: the inner loop previously re-read rbegin() on every pass,
+            // so it always scanned the top bucket's granule ids while pairing
+            // them with the current bucket's NextAttemptInstant. Use `it`.
+            Y_VERIFY(it->second.size());
+            for (auto&& granuleId : it->second) {
+                if (!filter(granuleId)) {
+                    continue;
+                }
+                if (it->first.GetNextAttemptInstant() > now) {
+                    if (!reserve || reserveInstant > it->first.GetNextAttemptInstant()) {
+                        reserveInstant = it->first.GetNextAttemptInstant();
+                        reserve = granuleId;
+                    }
+                } else {
+                    return granuleId;
+                }
+            }
+        }
+        return reserve;
+    }
+
+    // Recomputes priority and overload bookkeeping for one granule.
+    void UpdateGranuleInfo(const TGranuleMeta& granule);
+
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 35e1347a0f..6dbb7f3465 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -297,10 +297,9 @@ struct TExpected {
bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, TString>&& blobs, ui32& step,
const TExpected& expected) {
- ui64 lastCompactedGranule = 0;
- auto compactionInfo = engine.Compact(TestLimits(), lastCompactedGranule);
- UNIT_ASSERT_VALUES_EQUAL(compactionInfo->Granules.size(), 1);
- UNIT_ASSERT(!compactionInfo->InGranule);
+ auto compactionInfo = engine.Compact(TestLimits());
+ UNIT_ASSERT(!!compactionInfo);
+ UNIT_ASSERT(!compactionInfo->InGranule());
std::shared_ptr<TColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TSnapshot::Zero(), TestLimits());
UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions);
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 299020b6bb..ad1cb024f4 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2568,7 +2568,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
if (msg->IndexChanges->CompactionInfo) {
++compactionsHappened;
- Cerr << "Compaction at snaphsot "<< msg->IndexChanges->InitSnapshot
+ Cerr << "Compaction at snapshot "<< msg->IndexChanges->InitSnapshot
<< " old portions:";
ui64 srcGranule{0};
for (const auto& portionInfo : msg->IndexChanges->SwitchedPortions) {