From 7f0c50f5391d6bb23e61f598c1332c05bd2f9439 Mon Sep 17 00:00:00 2001
From: ivanmorozov <ivanmorozov@yandex-team.com>
Date: Sat, 27 May 2023 09:00:32 +0300
Subject: additional signals

fix overload list filling
overload priority temporary for experiments
---
 ydb/core/tx/columnshard/columnshard.cpp            |  2 +-
 ydb/core/tx/columnshard/columnshard_impl.h         |  3 +-
 ydb/core/tx/columnshard/compaction_actor.cpp       | 44 +++++++-----
 .../counters/CMakeLists.darwin-x86_64.txt          |  1 +
 .../counters/CMakeLists.linux-aarch64.txt          |  1 +
 .../counters/CMakeLists.linux-x86_64.txt           |  1 +
 .../counters/CMakeLists.windows-x86_64.txt         |  1 +
 ydb/core/tx/columnshard/counters/engine_logs.cpp   | 21 ++++++
 ydb/core/tx/columnshard/counters/engine_logs.h     | 19 ++++++
 ydb/core/tx/columnshard/counters/indexation.cpp    | 29 ++++----
 .../tx/columnshard/engines/column_engine_logs.cpp  | 78 +++++++++++++++-------
 .../tx/columnshard/engines/column_engine_logs.h    |  3 +
 .../tx/columnshard/engines/index_logic_logs.cpp    | 23 +++++--
 ydb/core/tx/columnshard/engines/index_logic_logs.h |  1 +
 14 files changed, 163 insertions(+), 64 deletions(-)
 create mode 100644 ydb/core/tx/columnshard/counters/engine_logs.cpp
 create mode 100644 ydb/core/tx/columnshard/counters/engine_logs.h

diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index dfb3f88835..563b3957d3 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -28,7 +28,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
 
     IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID, IndexationCounters));
     CompactionActor = ctx.Register(
-        CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS, CompactionCounters),
+        CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS),
         // Default mail-box and batch pool.
         TMailboxType::HTSwap, AppData(ctx)->BatchPoolId);
     EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID, EvictionCounters));
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 7c3788993c..7bfc833709 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -25,7 +25,7 @@ namespace NKikimr::NColumnShard {
 extern bool gAllowLogBatchingDefaultValue;
 
 IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters);
-IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers, const TIndexationCounters& counters);
+IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers);
 IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters);
 IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
                          const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
@@ -384,7 +384,6 @@ private:
     const TScanCounters ReadCounters;
     const TScanCounters ScanCounters;
     const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation");
-    const TIndexationCounters CompactionCounters = TIndexationCounters("Compaction");
     const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction");
     
 
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index ea6cc4d366..b08984a1b1 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -11,15 +11,15 @@ namespace {
 
 class TCompactionActor: public TActorBootstrapped<TCompactionActor> {
 private:
-    const TIndexationCounters Counters;
+    const TIndexationCounters InternalCounters = TIndexationCounters("InternalCompaction");
+    const TIndexationCounters SplitCounters = TIndexationCounters("SplitCompaction");
 public:
     static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
         return NKikimrServices::TActivity::TX_COLUMNSHARD_COMPACTION_ACTOR;
     }
 
-    TCompactionActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters)
-        : Counters(counters)
-        , TabletId(tabletId)
+    TCompactionActor(ui64 tabletId, const TActorId& parent)
+        : TabletId(tabletId)
         , Parent(parent)
         , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) {
     }
@@ -27,10 +27,10 @@ public:
     void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& /*ctx*/) {
         Y_VERIFY(!TxEvent);
         Y_VERIFY(Blobs.empty() && !NumRead);
-
         LastActivationTime = TAppData::TimeProvider->Now();
         auto& event = *ev->Get();
         TxEvent = std::move(event.TxEvent);
+        IsSplitCurrently = NOlap::TCompactionLogic::IsSplit(TxEvent->IndexChanges);
 
         auto& indexChanges = TxEvent->IndexChanges;
         Y_VERIFY(indexChanges);
@@ -44,7 +44,7 @@ public:
             for (const auto& blobRange : ranges) {
                 Y_VERIFY(blobId == blobRange.BlobId);
                 Blobs[blobRange] = {};
-                Counters.ReadBytes->Add(blobRange.Size);
+                GetCurrentCounters().ReadBytes->Add(blobRange.Size);
             }
             SendReadRequest(std::move(ranges), event.Externals.contains(blobId));
         }
@@ -67,6 +67,11 @@ public:
             TString blobData = event.Data;
             Y_VERIFY(blobData.size() == blobId.Size, "%u vs %u", (ui32)blobData.size(), blobId.Size);
             Blobs[blobId] = blobData;
+        } else if (event.Status == NKikimrProto::EReplyStatus::NODATA) {
+            Y_ASSERT(false);
+            LOG_S_WARN("TEvReadBlobRangeResult cannot get blob "
+                << blobId.ToString() << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet "
+                << TabletId << " (compaction)");
         } else {
             LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob "
                         << blobId.ToString() << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet "
@@ -77,8 +82,7 @@ public:
             }
         }
 
-        ++NumRead;
-        if (NumRead == Blobs.size()) {
+        if (++NumRead == Blobs.size()) {
             CompactGranules(ctx);
             Clear();
         }
@@ -99,6 +103,7 @@ public:
     }
 
 private:
+    bool IsSplitCurrently = false;
     ui64 TabletId;
     TActorId Parent;
     TActorId BlobCacheActorId;
@@ -113,6 +118,14 @@ private:
         NumRead = 0;
     }
 
+    const TIndexationCounters& GetCurrentCounters() const {
+        if (IsSplitCurrently) {
+            return SplitCounters;
+        } else {
+            return InternalCounters;
+        }
+    }
+
     void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) {
         Y_VERIFY(!ranges.empty());
 
@@ -134,7 +147,7 @@ private:
 
             TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
 
-            NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
+            NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, GetCurrentCounters());
             TxEvent->Blobs = compactionLogic.Apply(TxEvent->IndexChanges);
             if (TxEvent->Blobs.empty()) {
                 TxEvent->PutStatus = NKikimrProto::OK; // nothing to write, commit
@@ -150,16 +163,13 @@ private:
 };
 
 class TCompactionGroupActor: public TActorBootstrapped<TCompactionGroupActor> {
-private:
-    const TIndexationCounters Counters;
 public:
     static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
         return NKikimrServices::TActivity::TX_COLUMNSHARD_COMPACTION_ACTOR;
     }
 
-    TCompactionGroupActor(ui64 tabletId, const TActorId& parent, const ui64 size, const TIndexationCounters& counters)
-        : Counters(counters)
-        , TabletId(tabletId)
+    TCompactionGroupActor(ui64 tabletId, const TActorId& parent, const ui64 size)
+        : TabletId(tabletId)
         , Parent(parent)
         , Idle(size == 0 ? 1 : size)
     {
@@ -169,7 +179,7 @@ public:
         Become(&TThis::StateWait);
 
         for (auto& worker : Idle) {
-            worker = ctx.Register(new TCompactionActor(TabletId, ctx.SelfID, Counters));
+            worker = ctx.Register(new TCompactionActor(TabletId, ctx.SelfID));
         }
     }
 
@@ -225,8 +235,8 @@ private:
 
 } // namespace
 
-IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers, const TIndexationCounters& counters) {
-    return new TCompactionGroupActor(tabletId, parent, workers, counters);
+IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers) {
+    return new TCompactionGroupActor(tabletId, parent, workers);
 }
 
 } // namespace NKikimr::NColumnShard
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt
index 9632aa025c..10c00092fd 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt
@@ -17,4 +17,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC
 target_sources(tx-columnshard-counters PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp
 )
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt
index 6efe68a424..0502f0daa8 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt
@@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC
 target_sources(tx-columnshard-counters PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp
 )
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt
index 6efe68a424..0502f0daa8 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt
@@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC
 target_sources(tx-columnshard-counters PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp
 )
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt
index 9632aa025c..10c00092fd 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt
@@ -17,4 +17,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC
 target_sources(tx-columnshard-counters PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp
 )
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.cpp b/ydb/core/tx/columnshard/counters/engine_logs.cpp
new file mode 100644
index 0000000000..b855fcff8c
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/engine_logs.cpp
@@ -0,0 +1,21 @@
+#include "engine_logs.h"
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/counters.h>
+
+namespace NKikimr::NColumnShard {
+
+TEngineLogsCounters::TEngineLogsCounters() {
+    const TString module = "EngineLogs";
+    if (NActors::TlsActivationContext) {
+        SubGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard");
+    } else {
+        SubGroup = new NMonitoring::TDynamicCounters();
+    }
+    OverloadGranules = SubGroup->GetCounter(module + "/Granules/Overload", false);
+    CompactOverloadGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/Overload/Count", true);
+    NoCompactGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/No/Count", true);
+    SplitCompactGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/Split/Count", true);
+    InternalCompactGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/Internal/Count", true);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h
new file mode 100644
index 0000000000..7774f3c3aa
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/engine_logs.h
@@ -0,0 +1,19 @@
+#pragma once
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NKikimr::NColumnShard {
+
+class TEngineLogsCounters {
+private:
+    ::NMonitoring::TDynamicCounterPtr SubGroup;
+public:
+    NMonitoring::TDynamicCounters::TCounterPtr OverloadGranules;
+    NMonitoring::TDynamicCounters::TCounterPtr CompactOverloadGranulesSelection;
+    NMonitoring::TDynamicCounters::TCounterPtr NoCompactGranulesSelection;
+    NMonitoring::TDynamicCounters::TCounterPtr SplitCompactGranulesSelection;
+    NMonitoring::TDynamicCounters::TCounterPtr InternalCompactGranulesSelection;
+
+    TEngineLogsCounters();
+};
+
+}
diff --git a/ydb/core/tx/columnshard/counters/indexation.cpp b/ydb/core/tx/columnshard/counters/indexation.cpp
index f2de68ebee..8eb7450ee9 100644
--- a/ydb/core/tx/columnshard/counters/indexation.cpp
+++ b/ydb/core/tx/columnshard/counters/indexation.cpp
@@ -10,21 +10,22 @@ TIndexationCounters::TIndexationCounters(const TString& module) {
     } else {
         SubGroup = new NMonitoring::TDynamicCounters();
     }
-    ReadBytes = SubGroup->GetCounter(module + "/ReadBytes", true);
-    AnalizeCompactedPortions = SubGroup->GetCounter(module + "/AnalizeCompactedPortions", true);
-    AnalizeInsertedPortions = SubGroup->GetCounter(module + "/AnalizeInsertedPortions", true);
-    RepackedInsertedPortions = SubGroup->GetCounter(module + "/RepackedInsertedPortions", true);
-    RepackedInsertedPortionBytes = SubGroup->GetCounter(module + "/RepackedInsertedPortionBytes", true);
-    SkipPortionsMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipPortionsMoveThroughIntersection", true);
-    SkipPortionBytesMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipPortionBytesMoveThroughIntersection", true);
-    RepackedCompactedPortions = SubGroup->GetCounter(module + "/RepackedCompactedPortions", true);
-    MovedPortions = SubGroup->GetCounter(module + "/MovedPortions", true);
-    MovedPortionBytes = SubGroup->GetCounter(module + "/MovedPortionBytes", true);
+    ReadBytes = SubGroup->GetCounter(module + "/Read/Bytes", true);
+    AnalizeInsertedPortions = SubGroup->GetCounter(module + "/AnalizeInsertion/Portions", true);
+    RepackedInsertedPortions = SubGroup->GetCounter(module + "/RepackedInsertion/Portions", true);
+    RepackedInsertedPortionBytes = SubGroup->GetCounter(module + "/RepackedInsertion/Bytes", true);
 
-    TrashDataSerializationBytes = SubGroup->GetCounter(module + "/TrashDataSerializationBytes", true);
-    TrashDataSerialization = SubGroup->GetCounter(module + "/TrashDataSerialization", true);
-    CorrectDataSerializationBytes = SubGroup->GetCounter(module + "/CorrectDataSerializationBytes", true);
-    CorrectDataSerialization = SubGroup->GetCounter(module + "/CorrectDataSerialization", true);
+    AnalizeCompactedPortions = SubGroup->GetCounter(module + "/AnalizeCompaction/Portions", true);
+    SkipPortionsMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Portions", true);
+    SkipPortionBytesMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Bytes", true);
+    RepackedCompactedPortions = SubGroup->GetCounter(module + "/RepackedCompaction/Portions", true);
+    MovedPortions = SubGroup->GetCounter(module + "/Moved/Portions", true);
+    MovedPortionBytes = SubGroup->GetCounter(module + "/Moved/Bytes", true);
+
+    TrashDataSerializationBytes = SubGroup->GetCounter(module + "/TrashDataSerialization/Bytes", true);
+    TrashDataSerialization = SubGroup->GetCounter(module + "/TrashDataSerialization/Count", true);
+    CorrectDataSerializationBytes = SubGroup->GetCounter(module + "/CorrectDataSerialization/Bytes", true);
+    CorrectDataSerialization = SubGroup->GetCounter(module + "/CorrectDataSerialization/Count", true);
 }
 
 }
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 02099110aa..1646b6e8d5 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -469,6 +469,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
     for (const auto& [_, portionInfo] : gi->second->Portions) {
         if (portionInfo.IsActive()) {
             changes->SwitchedPortions.push_back(portionInfo);
+            Y_VERIFY(portionInfo.Granule() == granule);
         }
     }
 
@@ -703,16 +704,21 @@ void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr
 
         // Size exceeds the configured limit. Mark granule as overloaded.
         if (size >= limits.GranuleOverloadSize) {
-            PathsGranulesOverloaded.emplace(pathId, granule);
+            if (PathsGranulesOverloaded[pathId].emplace(granule).second) {
+                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "overloaded")("path_id", pathId)("granule", granule);
+            }
         } else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) {
             // Size is under limit. Remove granule from the overloaded set.
-            pi->second.erase(granule);
+            if (pi->second.erase(granule)) {
+                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "unoverloaded")("path_id", pathId)("granule", granule)("remained", pi->second.size());
+            }
             // Remove entry for the pathId if there it has no overloaded granules any more.
             if (pi->second.empty()) {
                 PathsGranulesOverloaded.erase(pi);
             }
         }
     }
+    SignalCounters.OverloadGranules->Set(PathsGranulesOverloaded.size());
 }
 
 bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
@@ -758,7 +764,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
 
     if (!ApplyChanges(db, *changes, snapshot, false)) { // validate only
         if (changes->IsCompaction()) {
-            // Return granule to Compation list. This is equal to single compaction worker behaviour.
+            // Return granule to Compaction list. This is equal to single compaction worker behaviour.
             for (const auto& portionInfo : changes->SwitchedPortions) {
                 CompactionGranules.insert(portionInfo.Granule());
             }
@@ -866,7 +872,11 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
 
             ui64 granule = portionInfo.Granule();
             ui64 portion = portionInfo.Portion();
-            if (!Granules.contains(granule) || !Granules[granule]->Portions.contains(portion)) {
+            if (!Granules.contains(granule)) {
+                LOG_S_DEBUG("Cannot update unknown granule " << granule << " at tablet " << TabletId);
+                return false;
+            }
+            if (!Granules[granule]->Portions.contains(portion)) {
                 LOG_S_ERROR("Cannot update unknown portion " << portionInfo << " at tablet " << TabletId);
                 return false;
             }
@@ -1315,38 +1325,58 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompaction
     ui64 granule = 0;
     bool inGranule = true;
 
-    for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty();) {
-        // Start from the beginning if the end is reached.
-        if (it == CompactionGranules.end()) {
-            it = CompactionGranules.begin();
+    if (PathsGranulesOverloaded.size()) {
+        {
+            auto it = PathsGranulesOverloaded.begin();
+            Y_VERIFY(it->second.size());
+            granule = *it->second.begin();
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_overload")("path_id", it->first)("granules_count", it->second.size())("granule", granule)("paths_count", PathsGranulesOverloaded.size());
+            SignalCounters.CompactOverloadGranulesSelection->Add(1);
         }
 
-        const auto gi = Granules.find(*it);
-        // Check granule exists.
+        const auto gi = Granules.find(granule);
         Y_VERIFY(gi != Granules.end());
 
         bool inserted = false;
-        if (NeedSplit(gi->second->Portions, limits, inserted)) {
-            inGranule = false;
-            granule = *it;
-            CompactionGranules.erase(it);
-            break;
-        } else if (inserted) {
-            granule = *it;
-            CompactionGranules.erase(it);
-            break;
-        }
+        Y_VERIFY(NeedSplit(gi->second->Portions, limits, inserted));
+        inGranule = false;
+        Y_VERIFY(CompactionGranules.erase(granule));
+        SignalCounters.SplitCompactGranulesSelection->Add(1);
+    } else {
+        for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty() && !granule;) {
+            // Start from the beginning if the end is reached.
+            if (it == CompactionGranules.end()) {
+                it = CompactionGranules.begin();
+            }
 
-        // Nothing to compact in the current granule. Throw it.
-        it = CompactionGranules.erase(it);
-    }
+            const auto gi = Granules.find(*it);
+            // Check granule exists.
+            Y_VERIFY(gi != Granules.end());
+
+            bool inserted = false;
+            if (NeedSplit(gi->second->Portions, limits, inserted)) {
+                inGranule = false;
+                granule = *it;
+                SignalCounters.SplitCompactGranulesSelection->Add(1);
+            } else if (inserted) {
+                granule = *it;
+                SignalCounters.InternalCompactGranulesSelection->Add(1);
+            }
 
+            // Nothing to compact in the current granule. Throw it.
+            it = CompactionGranules.erase(it);
+        }
+        if (granule) {
+            lastCompactedGranule = granule;
+        }
+    }
     if (granule) {
         auto info = std::make_unique<TCompactionInfo>();
         info->Granules.insert(granule);
         info->InGranule = inGranule;
-        lastCompactedGranule = granule;
         return info;
+    } else {
+        SignalCounters.NoCompactGranulesSelection->Add(1);
     }
     return {};
 }
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 6736a395c5..409a6ae848 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -3,6 +3,7 @@
 #include "defs.h"
 #include "column_engine.h"
 #include "scalars.h"
+#include <ydb/core/tx/columnshard/counters/engine_logs.h>
 
 namespace NKikimr::NArrow {
 struct TSortDescription;
@@ -21,6 +22,8 @@ class TCountersTable;
 ///
 /// @note One instance per tablet.
 class TColumnEngineForLogs : public IColumnEngine {
+private:
+    const NColumnShard::TEngineLogsCounters SignalCounters;
 public:
     class TMarksGranules {
     public:
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index a3c42fd078..9eadd403d6 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -115,8 +115,13 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
             auto columnSaver = resultSchema->GetColumnSaver(name, saverContext);
             auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), columnSaver, Counters);
             if (!blob.size()) {
+                Counters.TrashDataSerializationBytes->Add(blob.size());
+                Counters.TrashDataSerialization->Add(1);
                 ok = false;
                 break;
+            } else {
+                Counters.CorrectDataSerializationBytes->Add(blob.size());
+                Counters.CorrectDataSerialization->Add(1);
             }
 
             // TODO: combine small columns in one blob
@@ -316,11 +321,11 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TCompactionLogic::Comp
 std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnEngineForLogs::TChanges> changes) const {
     const ui64 pathId = changes->SrcGranule->PathId;
     std::vector<TString> blobs;
-    auto& switchedProtions = changes->SwitchedPortions;
-    Y_VERIFY(switchedProtions.size());
+    auto& switchedPortions = changes->SwitchedPortions;
+    Y_VERIFY(switchedPortions.size());
 
-    ui64 granule = switchedProtions[0].Granule();
-    auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedProtions, changes->Blobs);
+    ui64 granule = switchedPortions[0].Granule();
+    auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedPortions, changes->Blobs);
 
     auto resultSchema = SchemaVersions.GetLastSchema();
     std::vector<TPortionInfo> portions;
@@ -714,6 +719,11 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
     return blobs;
 }
 
+bool TCompactionLogic::IsSplit(std::shared_ptr<TColumnEngineChanges> changes) {
+    auto castedChanges = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(changes);
+    return !castedChanges->CompactionInfo->InGranule;
+}
+
 std::vector<TString> TCompactionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const {
     Y_VERIFY(changes);
     Y_VERIFY(changes->CompactionInfo);
@@ -723,10 +733,11 @@ std::vector<TString> TCompactionLogic::Apply(std::shared_ptr<TColumnEngineChange
     Y_VERIFY(changes->AppendedPortions.empty());  // dst meta
 
     auto castedChanges = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(changes);
-    if (castedChanges->CompactionInfo->InGranule) {
+    if (!IsSplit(castedChanges)) {
         return CompactInGranule(castedChanges);
+    } else {
+        return CompactSplitGranule(castedChanges);
     }
-    return CompactSplitGranule(castedChanges);
 }
 
 std::vector<TString> TEvictionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const {
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h
index 46ef19696b..dd5d896169 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.h
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h
@@ -71,6 +71,7 @@ public:
     using TIndexLogicBase::TIndexLogicBase;
 
     std::vector<TString> Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const override;
+    static bool IsSplit(std::shared_ptr<TColumnEngineChanges> changes);
 
 private:
     std::vector<TString> CompactSplitGranule(const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes) const;
-- 
cgit v1.2.3