aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-27 09:00:32 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-27 09:00:32 +0300
commit7f0c50f5391d6bb23e61f598c1332c05bd2f9439 (patch)
tree5ceef2d5a1701288bf15d36d6833efe0f43db4a0
parent92529fe7ede3280622c6a21f3f50b303bd0ca765 (diff)
downloadydb-7f0c50f5391d6bb23e61f598c1332c05bd2f9439.tar.gz
additional signals
fix overload list filling overload priority temporary for experiments
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h3
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp44
-rw-r--r--ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/counters/engine_logs.cpp21
-rw-r--r--ydb/core/tx/columnshard/counters/engine_logs.h19
-rw-r--r--ydb/core/tx/columnshard/counters/indexation.cpp29
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp78
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h3
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.cpp23
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.h1
14 files changed, 163 insertions, 64 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index dfb3f88835..563b3957d3 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -28,7 +28,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID, IndexationCounters));
CompactionActor = ctx.Register(
- CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS, CompactionCounters),
+ CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS),
// Default mail-box and batch pool.
TMailboxType::HTSwap, AppData(ctx)->BatchPoolId);
EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID, EvictionCounters));
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 7c3788993c..7bfc833709 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -25,7 +25,7 @@ namespace NKikimr::NColumnShard {
extern bool gAllowLogBatchingDefaultValue;
IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters);
-IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers, const TIndexationCounters& counters);
+IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers);
IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters);
IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
@@ -384,7 +384,6 @@ private:
const TScanCounters ReadCounters;
const TScanCounters ScanCounters;
const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation");
- const TIndexationCounters CompactionCounters = TIndexationCounters("Compaction");
const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction");
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index ea6cc4d366..b08984a1b1 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -11,15 +11,15 @@ namespace {
class TCompactionActor: public TActorBootstrapped<TCompactionActor> {
private:
- const TIndexationCounters Counters;
+ const TIndexationCounters InternalCounters = TIndexationCounters("InternalCompaction");
+ const TIndexationCounters SplitCounters = TIndexationCounters("SplitCompaction");
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::TX_COLUMNSHARD_COMPACTION_ACTOR;
}
- TCompactionActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters)
- : Counters(counters)
- , TabletId(tabletId)
+ TCompactionActor(ui64 tabletId, const TActorId& parent)
+ : TabletId(tabletId)
, Parent(parent)
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) {
}
@@ -27,10 +27,10 @@ public:
void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& /*ctx*/) {
Y_VERIFY(!TxEvent);
Y_VERIFY(Blobs.empty() && !NumRead);
-
LastActivationTime = TAppData::TimeProvider->Now();
auto& event = *ev->Get();
TxEvent = std::move(event.TxEvent);
+ IsSplitCurrently = NOlap::TCompactionLogic::IsSplit(TxEvent->IndexChanges);
auto& indexChanges = TxEvent->IndexChanges;
Y_VERIFY(indexChanges);
@@ -44,7 +44,7 @@ public:
for (const auto& blobRange : ranges) {
Y_VERIFY(blobId == blobRange.BlobId);
Blobs[blobRange] = {};
- Counters.ReadBytes->Add(blobRange.Size);
+ GetCurrentCounters().ReadBytes->Add(blobRange.Size);
}
SendReadRequest(std::move(ranges), event.Externals.contains(blobId));
}
@@ -67,6 +67,11 @@ public:
TString blobData = event.Data;
Y_VERIFY(blobData.size() == blobId.Size, "%u vs %u", (ui32)blobData.size(), blobId.Size);
Blobs[blobId] = blobData;
+ } else if (event.Status == NKikimrProto::EReplyStatus::NODATA) {
+ Y_ASSERT(false);
+ LOG_S_WARN("TEvReadBlobRangeResult cannot get blob "
+ << blobId.ToString() << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet "
+ << TabletId << " (compaction)");
} else {
LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob "
<< blobId.ToString() << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet "
@@ -77,8 +82,7 @@ public:
}
}
- ++NumRead;
- if (NumRead == Blobs.size()) {
+ if (++NumRead == Blobs.size()) {
CompactGranules(ctx);
Clear();
}
@@ -99,6 +103,7 @@ public:
}
private:
+ bool IsSplitCurrently = false;
ui64 TabletId;
TActorId Parent;
TActorId BlobCacheActorId;
@@ -113,6 +118,14 @@ private:
NumRead = 0;
}
+ const TIndexationCounters& GetCurrentCounters() const {
+ if (IsSplitCurrently) {
+ return SplitCounters;
+ } else {
+ return InternalCounters;
+ }
+ }
+
void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) {
Y_VERIFY(!ranges.empty());
@@ -134,7 +147,7 @@ private:
TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
- NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
+ NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, GetCurrentCounters());
TxEvent->Blobs = compactionLogic.Apply(TxEvent->IndexChanges);
if (TxEvent->Blobs.empty()) {
TxEvent->PutStatus = NKikimrProto::OK; // nothing to write, commit
@@ -150,16 +163,13 @@ private:
};
class TCompactionGroupActor: public TActorBootstrapped<TCompactionGroupActor> {
-private:
- const TIndexationCounters Counters;
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::TX_COLUMNSHARD_COMPACTION_ACTOR;
}
- TCompactionGroupActor(ui64 tabletId, const TActorId& parent, const ui64 size, const TIndexationCounters& counters)
- : Counters(counters)
- , TabletId(tabletId)
+ TCompactionGroupActor(ui64 tabletId, const TActorId& parent, const ui64 size)
+ : TabletId(tabletId)
, Parent(parent)
, Idle(size == 0 ? 1 : size)
{
@@ -169,7 +179,7 @@ public:
Become(&TThis::StateWait);
for (auto& worker : Idle) {
- worker = ctx.Register(new TCompactionActor(TabletId, ctx.SelfID, Counters));
+ worker = ctx.Register(new TCompactionActor(TabletId, ctx.SelfID));
}
}
@@ -225,8 +235,8 @@ private:
} // namespace
-IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers, const TIndexationCounters& counters) {
- return new TCompactionGroupActor(tabletId, parent, workers, counters);
+IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers) {
+ return new TCompactionGroupActor(tabletId, parent, workers);
}
} // namespace NKikimr::NColumnShard
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt
index 9632aa025c..10c00092fd 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt
@@ -17,4 +17,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC
target_sources(tx-columnshard-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt
index 6efe68a424..0502f0daa8 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt
@@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC
target_sources(tx-columnshard-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt
index 6efe68a424..0502f0daa8 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt
@@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC
target_sources(tx-columnshard-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt
index 9632aa025c..10c00092fd 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt
@@ -17,4 +17,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC
target_sources(tx-columnshard-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.cpp b/ydb/core/tx/columnshard/counters/engine_logs.cpp
new file mode 100644
index 0000000000..b855fcff8c
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/engine_logs.cpp
@@ -0,0 +1,21 @@
+#include "engine_logs.h"
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/counters.h>
+
+namespace NKikimr::NColumnShard {
+
+TEngineLogsCounters::TEngineLogsCounters() {
+ const TString module = "EngineLogs";
+ if (NActors::TlsActivationContext) {
+ SubGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard");
+ } else {
+ SubGroup = new NMonitoring::TDynamicCounters();
+ }
+ OverloadGranules = SubGroup->GetCounter(module + "/Granules/Overload", false);
+ CompactOverloadGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/Overload/Count", true);
+ NoCompactGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/No/Count", true);
+ SplitCompactGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/Split/Count", true);
+ InternalCompactGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/Internal/Count", true);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h
new file mode 100644
index 0000000000..7774f3c3aa
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/engine_logs.h
@@ -0,0 +1,19 @@
+#pragma once
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NKikimr::NColumnShard {
+
+class TEngineLogsCounters {
+private:
+ ::NMonitoring::TDynamicCounterPtr SubGroup;
+public:
+ NMonitoring::TDynamicCounters::TCounterPtr OverloadGranules;
+ NMonitoring::TDynamicCounters::TCounterPtr CompactOverloadGranulesSelection;
+ NMonitoring::TDynamicCounters::TCounterPtr NoCompactGranulesSelection;
+ NMonitoring::TDynamicCounters::TCounterPtr SplitCompactGranulesSelection;
+ NMonitoring::TDynamicCounters::TCounterPtr InternalCompactGranulesSelection;
+
+ TEngineLogsCounters();
+};
+
+}
diff --git a/ydb/core/tx/columnshard/counters/indexation.cpp b/ydb/core/tx/columnshard/counters/indexation.cpp
index f2de68ebee..8eb7450ee9 100644
--- a/ydb/core/tx/columnshard/counters/indexation.cpp
+++ b/ydb/core/tx/columnshard/counters/indexation.cpp
@@ -10,21 +10,22 @@ TIndexationCounters::TIndexationCounters(const TString& module) {
} else {
SubGroup = new NMonitoring::TDynamicCounters();
}
- ReadBytes = SubGroup->GetCounter(module + "/ReadBytes", true);
- AnalizeCompactedPortions = SubGroup->GetCounter(module + "/AnalizeCompactedPortions", true);
- AnalizeInsertedPortions = SubGroup->GetCounter(module + "/AnalizeInsertedPortions", true);
- RepackedInsertedPortions = SubGroup->GetCounter(module + "/RepackedInsertedPortions", true);
- RepackedInsertedPortionBytes = SubGroup->GetCounter(module + "/RepackedInsertedPortionBytes", true);
- SkipPortionsMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipPortionsMoveThroughIntersection", true);
- SkipPortionBytesMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipPortionBytesMoveThroughIntersection", true);
- RepackedCompactedPortions = SubGroup->GetCounter(module + "/RepackedCompactedPortions", true);
- MovedPortions = SubGroup->GetCounter(module + "/MovedPortions", true);
- MovedPortionBytes = SubGroup->GetCounter(module + "/MovedPortionBytes", true);
+ ReadBytes = SubGroup->GetCounter(module + "/Read/Bytes", true);
+ AnalizeInsertedPortions = SubGroup->GetCounter(module + "/AnalizeInsertion/Portions", true);
+ RepackedInsertedPortions = SubGroup->GetCounter(module + "/RepackedInsertion/Portions", true);
+ RepackedInsertedPortionBytes = SubGroup->GetCounter(module + "/RepackedInsertion/Bytes", true);
- TrashDataSerializationBytes = SubGroup->GetCounter(module + "/TrashDataSerializationBytes", true);
- TrashDataSerialization = SubGroup->GetCounter(module + "/TrashDataSerialization", true);
- CorrectDataSerializationBytes = SubGroup->GetCounter(module + "/CorrectDataSerializationBytes", true);
- CorrectDataSerialization = SubGroup->GetCounter(module + "/CorrectDataSerialization", true);
+ AnalizeCompactedPortions = SubGroup->GetCounter(module + "/AnalizeCompaction/Portions", true);
+ SkipPortionsMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Portions", true);
+ SkipPortionBytesMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Bytes", true);
+ RepackedCompactedPortions = SubGroup->GetCounter(module + "/RepackedCompaction/Portions", true);
+ MovedPortions = SubGroup->GetCounter(module + "/Moved/Portions", true);
+ MovedPortionBytes = SubGroup->GetCounter(module + "/Moved/Bytes", true);
+
+ TrashDataSerializationBytes = SubGroup->GetCounter(module + "/TrashDataSerialization/Bytes", true);
+ TrashDataSerialization = SubGroup->GetCounter(module + "/TrashDataSerialization/Count", true);
+ CorrectDataSerializationBytes = SubGroup->GetCounter(module + "/CorrectDataSerialization/Bytes", true);
+ CorrectDataSerialization = SubGroup->GetCounter(module + "/CorrectDataSerialization/Count", true);
}
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 02099110aa..1646b6e8d5 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -469,6 +469,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
for (const auto& [_, portionInfo] : gi->second->Portions) {
if (portionInfo.IsActive()) {
changes->SwitchedPortions.push_back(portionInfo);
+ Y_VERIFY(portionInfo.Granule() == granule);
}
}
@@ -703,16 +704,21 @@ void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr
// Size exceeds the configured limit. Mark granule as overloaded.
if (size >= limits.GranuleOverloadSize) {
- PathsGranulesOverloaded.emplace(pathId, granule);
+ if (PathsGranulesOverloaded[pathId].emplace(granule).second) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "overloaded")("path_id", pathId)("granule", granule);
+ }
} else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) {
// Size is under limit. Remove granule from the overloaded set.
- pi->second.erase(granule);
+ if (pi->second.erase(granule)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "unoverloaded")("path_id", pathId)("granule", granule)("remained", pi->second.size());
+ }
// Remove entry for the pathId if there it has no overloaded granules any more.
if (pi->second.empty()) {
PathsGranulesOverloaded.erase(pi);
}
}
}
+ SignalCounters.OverloadGranules->Set(PathsGranulesOverloaded.size());
}
bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
@@ -758,7 +764,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
if (!ApplyChanges(db, *changes, snapshot, false)) { // validate only
if (changes->IsCompaction()) {
- // Return granule to Compation list. This is equal to single compaction worker behaviour.
+ // Return granule to Compaction list. This is equal to single compaction worker behaviour.
for (const auto& portionInfo : changes->SwitchedPortions) {
CompactionGranules.insert(portionInfo.Granule());
}
@@ -866,7 +872,11 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
ui64 granule = portionInfo.Granule();
ui64 portion = portionInfo.Portion();
- if (!Granules.contains(granule) || !Granules[granule]->Portions.contains(portion)) {
+ if (!Granules.contains(granule)) {
+ LOG_S_DEBUG("Cannot update unknown granule " << granule << " at tablet " << TabletId);
+ return false;
+ }
+ if (!Granules[granule]->Portions.contains(portion)) {
LOG_S_ERROR("Cannot update unknown portion " << portionInfo << " at tablet " << TabletId);
return false;
}
@@ -1315,38 +1325,58 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompaction
ui64 granule = 0;
bool inGranule = true;
- for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty();) {
- // Start from the beginning if the end is reached.
- if (it == CompactionGranules.end()) {
- it = CompactionGranules.begin();
+ if (PathsGranulesOverloaded.size()) {
+ {
+ auto it = PathsGranulesOverloaded.begin();
+ Y_VERIFY(it->second.size());
+ granule = *it->second.begin();
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_overload")("path_id", it->first)("granules_count", it->second.size())("granule", granule)("paths_count", PathsGranulesOverloaded.size());
+ SignalCounters.CompactOverloadGranulesSelection->Add(1);
}
- const auto gi = Granules.find(*it);
- // Check granule exists.
+ const auto gi = Granules.find(granule);
Y_VERIFY(gi != Granules.end());
bool inserted = false;
- if (NeedSplit(gi->second->Portions, limits, inserted)) {
- inGranule = false;
- granule = *it;
- CompactionGranules.erase(it);
- break;
- } else if (inserted) {
- granule = *it;
- CompactionGranules.erase(it);
- break;
- }
+ Y_VERIFY(NeedSplit(gi->second->Portions, limits, inserted));
+ inGranule = false;
+ Y_VERIFY(CompactionGranules.erase(granule));
+ SignalCounters.SplitCompactGranulesSelection->Add(1);
+ } else {
+ for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty() && !granule;) {
+ // Start from the beginning if the end is reached.
+ if (it == CompactionGranules.end()) {
+ it = CompactionGranules.begin();
+ }
- // Nothing to compact in the current granule. Throw it.
- it = CompactionGranules.erase(it);
- }
+ const auto gi = Granules.find(*it);
+ // Check granule exists.
+ Y_VERIFY(gi != Granules.end());
+
+ bool inserted = false;
+ if (NeedSplit(gi->second->Portions, limits, inserted)) {
+ inGranule = false;
+ granule = *it;
+ SignalCounters.SplitCompactGranulesSelection->Add(1);
+ } else if (inserted) {
+ granule = *it;
+ SignalCounters.InternalCompactGranulesSelection->Add(1);
+ }
+ // Nothing to compact in the current granule. Throw it.
+ it = CompactionGranules.erase(it);
+ }
+ if (granule) {
+ lastCompactedGranule = granule;
+ }
+ }
if (granule) {
auto info = std::make_unique<TCompactionInfo>();
info->Granules.insert(granule);
info->InGranule = inGranule;
- lastCompactedGranule = granule;
return info;
+ } else {
+ SignalCounters.NoCompactGranulesSelection->Add(1);
}
return {};
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 6736a395c5..409a6ae848 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -3,6 +3,7 @@
#include "defs.h"
#include "column_engine.h"
#include "scalars.h"
+#include <ydb/core/tx/columnshard/counters/engine_logs.h>
namespace NKikimr::NArrow {
struct TSortDescription;
@@ -21,6 +22,8 @@ class TCountersTable;
///
/// @note One instance per tablet.
class TColumnEngineForLogs : public IColumnEngine {
+private:
+ const NColumnShard::TEngineLogsCounters SignalCounters;
public:
class TMarksGranules {
public:
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index a3c42fd078..9eadd403d6 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -115,8 +115,13 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
auto columnSaver = resultSchema->GetColumnSaver(name, saverContext);
auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), columnSaver, Counters);
if (!blob.size()) {
+ Counters.TrashDataSerializationBytes->Add(blob.size());
+ Counters.TrashDataSerialization->Add(1);
ok = false;
break;
+ } else {
+ Counters.CorrectDataSerializationBytes->Add(blob.size());
+ Counters.CorrectDataSerialization->Add(1);
}
// TODO: combine small columns in one blob
@@ -316,11 +321,11 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TCompactionLogic::Comp
std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnEngineForLogs::TChanges> changes) const {
const ui64 pathId = changes->SrcGranule->PathId;
std::vector<TString> blobs;
- auto& switchedProtions = changes->SwitchedPortions;
- Y_VERIFY(switchedProtions.size());
+ auto& switchedPortions = changes->SwitchedPortions;
+ Y_VERIFY(switchedPortions.size());
- ui64 granule = switchedProtions[0].Granule();
- auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedProtions, changes->Blobs);
+ ui64 granule = switchedPortions[0].Granule();
+ auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedPortions, changes->Blobs);
auto resultSchema = SchemaVersions.GetLastSchema();
std::vector<TPortionInfo> portions;
@@ -714,6 +719,11 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
return blobs;
}
+bool TCompactionLogic::IsSplit(std::shared_ptr<TColumnEngineChanges> changes) {
+ auto castedChanges = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(changes);
+ return !castedChanges->CompactionInfo->InGranule;
+}
+
std::vector<TString> TCompactionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const {
Y_VERIFY(changes);
Y_VERIFY(changes->CompactionInfo);
@@ -723,10 +733,11 @@ std::vector<TString> TCompactionLogic::Apply(std::shared_ptr<TColumnEngineChange
Y_VERIFY(changes->AppendedPortions.empty()); // dst meta
auto castedChanges = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(changes);
- if (castedChanges->CompactionInfo->InGranule) {
+ if (!IsSplit(castedChanges)) {
return CompactInGranule(castedChanges);
+ } else {
+ return CompactSplitGranule(castedChanges);
}
- return CompactSplitGranule(castedChanges);
}
std::vector<TString> TEvictionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const {
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h
index 46ef19696b..dd5d896169 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.h
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h
@@ -71,6 +71,7 @@ public:
using TIndexLogicBase::TIndexLogicBase;
std::vector<TString> Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const override;
+ static bool IsSplit(std::shared_ptr<TColumnEngineChanges> changes);
private:
std::vector<TString> CompactSplitGranule(const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes) const;