diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-27 09:00:32 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-27 09:00:32 +0300 |
commit | 7f0c50f5391d6bb23e61f598c1332c05bd2f9439 (patch) | |
tree | 5ceef2d5a1701288bf15d36d6833efe0f43db4a0 | |
parent | 92529fe7ede3280622c6a21f3f50b303bd0ca765 (diff) | |
download | ydb-7f0c50f5391d6bb23e61f598c1332c05bd2f9439.tar.gz |
additional signals
fix overload list filling
overload priority temporary for experiments
14 files changed, 163 insertions, 64 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index dfb3f88835..563b3957d3 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -28,7 +28,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID, IndexationCounters)); CompactionActor = ctx.Register( - CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS, CompactionCounters), + CreateCompactionActor(TabletID(), ctx.SelfID, TSettings::MAX_ACTIVE_COMPACTIONS), // Default mail-box and batch pool. TMailboxType::HTSwap, AppData(ctx)->BatchPoolId); EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID, EvictionCounters)); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 7c3788993c..7bfc833709 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -25,7 +25,7 @@ namespace NKikimr::NColumnShard { extern bool gAllowLogBatchingDefaultValue; IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters); -IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers, const TIndexationCounters& counters); +IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers); IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters); IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, @@ -384,7 +384,6 @@ private: const TScanCounters ReadCounters; const TScanCounters ScanCounters; const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation"); - const TIndexationCounters CompactionCounters = TIndexationCounters("Compaction"); const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction"); diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index ea6cc4d366..b08984a1b1 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -11,15 +11,15 @@ namespace { class TCompactionActor: public TActorBootstrapped<TCompactionActor> { private: - const TIndexationCounters Counters; + const TIndexationCounters InternalCounters = TIndexationCounters("InternalCompaction"); + const TIndexationCounters SplitCounters = TIndexationCounters("SplitCompaction"); public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::TX_COLUMNSHARD_COMPACTION_ACTOR; } - TCompactionActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters) - : Counters(counters) - , TabletId(tabletId) + TCompactionActor(ui64 tabletId, const TActorId& parent) + : TabletId(tabletId) , Parent(parent) , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) { } @@ -27,10 +27,10 @@ public: void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& /*ctx*/) { Y_VERIFY(!TxEvent); Y_VERIFY(Blobs.empty() && !NumRead); - LastActivationTime = TAppData::TimeProvider->Now(); auto& event = *ev->Get(); TxEvent = std::move(event.TxEvent); + IsSplitCurrently = NOlap::TCompactionLogic::IsSplit(TxEvent->IndexChanges); auto& indexChanges = TxEvent->IndexChanges; Y_VERIFY(indexChanges); @@ -44,7 +44,7 @@ public: for (const auto& blobRange : ranges) { Y_VERIFY(blobId == blobRange.BlobId); Blobs[blobRange] = {}; - Counters.ReadBytes->Add(blobRange.Size); + GetCurrentCounters().ReadBytes->Add(blobRange.Size); } SendReadRequest(std::move(ranges), event.Externals.contains(blobId)); } @@ -67,6 +67,11 @@ public: TString blobData = event.Data; Y_VERIFY(blobData.size() == blobId.Size, "%u vs %u", (ui32)blobData.size(), blobId.Size); Blobs[blobId] = blobData; + } else if (event.Status == NKikimrProto::EReplyStatus::NODATA) { + Y_ASSERT(false); + LOG_S_WARN("TEvReadBlobRangeResult cannot get blob " + << blobId.ToString() << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet " + << TabletId << " (compaction)"); } else { LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet " @@ -77,8 +82,7 @@ public: } } - ++NumRead; - if (NumRead == Blobs.size()) { + if (++NumRead == Blobs.size()) { CompactGranules(ctx); Clear(); } @@ -99,6 +103,7 @@ public: } private: + bool IsSplitCurrently = false; ui64 TabletId; TActorId Parent; TActorId BlobCacheActorId; @@ -113,6 +118,14 @@ private: NumRead = 0; } + const TIndexationCounters& GetCurrentCounters() const { + if (IsSplitCurrently) { + return SplitCounters; + } else { + return InternalCounters; + } + } + void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) { Y_VERIFY(!ranges.empty()); @@ -134,7 +147,7 @@ private: TxEvent->IndexChanges->SetBlobs(std::move(Blobs)); - NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters); + NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, GetCurrentCounters()); TxEvent->Blobs = compactionLogic.Apply(TxEvent->IndexChanges); if (TxEvent->Blobs.empty()) { TxEvent->PutStatus = NKikimrProto::OK; // nothing to write, commit @@ -150,16 +163,13 @@ private: }; class TCompactionGroupActor: public TActorBootstrapped<TCompactionGroupActor> { -private: - const TIndexationCounters Counters; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::TX_COLUMNSHARD_COMPACTION_ACTOR; } - TCompactionGroupActor(ui64 tabletId, const TActorId& parent, const ui64 size, const TIndexationCounters& counters) - : Counters(counters) - , TabletId(tabletId) + TCompactionGroupActor(ui64 tabletId, const TActorId& parent, const ui64 size) + : TabletId(tabletId) , Parent(parent) , Idle(size == 0 ? 1 : size) { @@ -169,7 +179,7 @@ public: Become(&TThis::StateWait); for (auto& worker : Idle) { - worker = ctx.Register(new TCompactionActor(TabletId, ctx.SelfID, Counters)); + worker = ctx.Register(new TCompactionActor(TabletId, ctx.SelfID)); } } @@ -225,8 +235,8 @@ private: } // namespace -IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers, const TIndexationCounters& counters) { - return new TCompactionGroupActor(tabletId, parent, workers, counters); +IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers) { + return new TCompactionGroupActor(tabletId, parent, workers); } } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt index 9632aa025c..10c00092fd 100644 --- a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt @@ -17,4 +17,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC target_sources(tx-columnshard-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp ) diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt index 6efe68a424..0502f0daa8 100644 --- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt @@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC target_sources(tx-columnshard-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp ) diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt index 6efe68a424..0502f0daa8 100644 --- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt @@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC target_sources(tx-columnshard-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp ) diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt index 9632aa025c..10c00092fd 100644 --- a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt @@ -17,4 +17,5 @@ target_link_libraries(tx-columnshard-counters PUBLIC target_sources(tx-columnshard-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/scan.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/engine_logs.cpp ) diff --git a/ydb/core/tx/columnshard/counters/engine_logs.cpp b/ydb/core/tx/columnshard/counters/engine_logs.cpp new file mode 100644 index 0000000000..b855fcff8c --- /dev/null +++ b/ydb/core/tx/columnshard/counters/engine_logs.cpp @@ -0,0 +1,21 @@ +#include "engine_logs.h" +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/counters.h> + +namespace NKikimr::NColumnShard { + +TEngineLogsCounters::TEngineLogsCounters() { + const TString module = "EngineLogs"; + if (NActors::TlsActivationContext) { + SubGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard"); + } else { + SubGroup = new NMonitoring::TDynamicCounters(); + } + OverloadGranules = SubGroup->GetCounter(module + "/Granules/Overload", false); + CompactOverloadGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/Overload/Count", true); + NoCompactGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/No/Count", true); + SplitCompactGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/Split/Count", true); + InternalCompactGranulesSelection = SubGroup->GetCounter(module + "/Granules/Selection/Internal/Count", true); +} + +} diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h new file mode 100644 index 0000000000..7774f3c3aa --- /dev/null +++ b/ydb/core/tx/columnshard/counters/engine_logs.h @@ -0,0 +1,19 @@ +#pragma once +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NKikimr::NColumnShard { + +class TEngineLogsCounters { +private: + ::NMonitoring::TDynamicCounterPtr SubGroup; +public: + NMonitoring::TDynamicCounters::TCounterPtr OverloadGranules; + NMonitoring::TDynamicCounters::TCounterPtr CompactOverloadGranulesSelection; + NMonitoring::TDynamicCounters::TCounterPtr NoCompactGranulesSelection; + NMonitoring::TDynamicCounters::TCounterPtr SplitCompactGranulesSelection; + NMonitoring::TDynamicCounters::TCounterPtr InternalCompactGranulesSelection; + + TEngineLogsCounters(); +}; + +} diff --git a/ydb/core/tx/columnshard/counters/indexation.cpp b/ydb/core/tx/columnshard/counters/indexation.cpp index f2de68ebee..8eb7450ee9 100644 --- a/ydb/core/tx/columnshard/counters/indexation.cpp +++ b/ydb/core/tx/columnshard/counters/indexation.cpp @@ -10,21 +10,22 @@ TIndexationCounters::TIndexationCounters(const TString& module) { } else { SubGroup = new NMonitoring::TDynamicCounters(); } - ReadBytes = SubGroup->GetCounter(module + "/ReadBytes", true); - AnalizeCompactedPortions = SubGroup->GetCounter(module + "/AnalizeCompactedPortions", true); - AnalizeInsertedPortions = SubGroup->GetCounter(module + "/AnalizeInsertedPortions", true); - RepackedInsertedPortions = SubGroup->GetCounter(module + "/RepackedInsertedPortions", true); - RepackedInsertedPortionBytes = SubGroup->GetCounter(module + "/RepackedInsertedPortionBytes", true); - SkipPortionsMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipPortionsMoveThroughIntersection", true); - SkipPortionBytesMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipPortionBytesMoveThroughIntersection", true); - RepackedCompactedPortions = SubGroup->GetCounter(module + "/RepackedCompactedPortions", true); - MovedPortions = SubGroup->GetCounter(module + "/MovedPortions", true); - MovedPortionBytes = SubGroup->GetCounter(module + "/MovedPortionBytes", true); + ReadBytes = SubGroup->GetCounter(module + "/Read/Bytes", true); + AnalizeInsertedPortions = SubGroup->GetCounter(module + "/AnalizeInsertion/Portions", true); + RepackedInsertedPortions = SubGroup->GetCounter(module + "/RepackedInsertion/Portions", true); + RepackedInsertedPortionBytes = SubGroup->GetCounter(module + "/RepackedInsertion/Bytes", true); - TrashDataSerializationBytes = SubGroup->GetCounter(module + "/TrashDataSerializationBytes", true); - TrashDataSerialization = SubGroup->GetCounter(module + "/TrashDataSerialization", true); - CorrectDataSerializationBytes = SubGroup->GetCounter(module + "/CorrectDataSerializationBytes", true); - CorrectDataSerialization = SubGroup->GetCounter(module + "/CorrectDataSerialization", true); + AnalizeCompactedPortions = SubGroup->GetCounter(module + "/AnalizeCompaction/Portions", true); + SkipPortionsMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Portions", true); + SkipPortionBytesMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Bytes", true); + RepackedCompactedPortions = SubGroup->GetCounter(module + "/RepackedCompaction/Portions", true); + MovedPortions = SubGroup->GetCounter(module + "/Moved/Portions", true); + MovedPortionBytes = SubGroup->GetCounter(module + "/Moved/Bytes", true); + + TrashDataSerializationBytes = SubGroup->GetCounter(module + "/TrashDataSerialization/Bytes", true); + TrashDataSerialization = SubGroup->GetCounter(module + "/TrashDataSerialization/Count", true); + CorrectDataSerializationBytes = SubGroup->GetCounter(module + "/CorrectDataSerialization/Bytes", true); + CorrectDataSerialization = SubGroup->GetCounter(module + "/CorrectDataSerialization/Count", true); } } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 02099110aa..1646b6e8d5 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -469,6 +469,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: for (const auto& [_, portionInfo] : gi->second->Portions) { if (portionInfo.IsActive()) { changes->SwitchedPortions.push_back(portionInfo); + Y_VERIFY(portionInfo.Granule() == granule); } } @@ -703,16 +704,21 @@ void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr // Size exceeds the configured limit. Mark granule as overloaded. if (size >= limits.GranuleOverloadSize) { - PathsGranulesOverloaded.emplace(pathId, granule); + if (PathsGranulesOverloaded[pathId].emplace(granule).second) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "overloaded")("path_id", pathId)("granule", granule); + } } else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) { // Size is under limit. Remove granule from the overloaded set. - pi->second.erase(granule); + if (pi->second.erase(granule)) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "unoverloaded")("path_id", pathId)("granule", granule)("remained", pi->second.size()); + } // Remove entry for the pathId if there it has no overloaded granules any more. if (pi->second.empty()) { PathsGranulesOverloaded.erase(pi); } } } + SignalCounters.OverloadGranules->Set(PathsGranulesOverloaded.size()); } bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, @@ -758,7 +764,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE if (!ApplyChanges(db, *changes, snapshot, false)) { // validate only if (changes->IsCompaction()) { - // Return granule to Compation list. This is equal to single compaction worker behaviour. + // Return granule to Compaction list. This is equal to single compaction worker behaviour. for (const auto& portionInfo : changes->SwitchedPortions) { CompactionGranules.insert(portionInfo.Granule()); } @@ -866,7 +872,11 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, ui64 granule = portionInfo.Granule(); ui64 portion = portionInfo.Portion(); - if (!Granules.contains(granule) || !Granules[granule]->Portions.contains(portion)) { + if (!Granules.contains(granule)) { + LOG_S_DEBUG("Cannot update unknown granule " << granule << " at tablet " << TabletId); + return false; + } + if (!Granules[granule]->Portions.contains(portion)) { LOG_S_ERROR("Cannot update unknown portion " << portionInfo << " at tablet " << TabletId); return false; } @@ -1315,38 +1325,58 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompaction ui64 granule = 0; bool inGranule = true; - for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty();) { - // Start from the beginning if the end is reached. - if (it == CompactionGranules.end()) { - it = CompactionGranules.begin(); + if (PathsGranulesOverloaded.size()) { + { + auto it = PathsGranulesOverloaded.begin(); + Y_VERIFY(it->second.size()); + granule = *it->second.begin(); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_overload")("path_id", it->first)("granules_count", it->second.size())("granule", granule)("paths_count", PathsGranulesOverloaded.size()); + SignalCounters.CompactOverloadGranulesSelection->Add(1); } - const auto gi = Granules.find(*it); - // Check granule exists. + const auto gi = Granules.find(granule); Y_VERIFY(gi != Granules.end()); bool inserted = false; - if (NeedSplit(gi->second->Portions, limits, inserted)) { - inGranule = false; - granule = *it; - CompactionGranules.erase(it); - break; - } else if (inserted) { - granule = *it; - CompactionGranules.erase(it); - break; - } + Y_VERIFY(NeedSplit(gi->second->Portions, limits, inserted)); + inGranule = false; + Y_VERIFY(CompactionGranules.erase(granule)); + SignalCounters.SplitCompactGranulesSelection->Add(1); + } else { + for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty() && !granule;) { + // Start from the beginning if the end is reached. + if (it == CompactionGranules.end()) { + it = CompactionGranules.begin(); + } - // Nothing to compact in the current granule. Throw it. - it = CompactionGranules.erase(it); - } + const auto gi = Granules.find(*it); + // Check granule exists. + Y_VERIFY(gi != Granules.end()); + + bool inserted = false; + if (NeedSplit(gi->second->Portions, limits, inserted)) { + inGranule = false; + granule = *it; + SignalCounters.SplitCompactGranulesSelection->Add(1); + } else if (inserted) { + granule = *it; + SignalCounters.InternalCompactGranulesSelection->Add(1); + } + // Nothing to compact in the current granule. Throw it. + it = CompactionGranules.erase(it); + } + if (granule) { + lastCompactedGranule = granule; + } + } if (granule) { auto info = std::make_unique<TCompactionInfo>(); info->Granules.insert(granule); info->InGranule = inGranule; - lastCompactedGranule = granule; return info; + } else { + SignalCounters.NoCompactGranulesSelection->Add(1); } return {}; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 6736a395c5..409a6ae848 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -3,6 +3,7 @@ #include "defs.h" #include "column_engine.h" #include "scalars.h" +#include <ydb/core/tx/columnshard/counters/engine_logs.h> namespace NKikimr::NArrow { struct TSortDescription; @@ -21,6 +22,8 @@ class TCountersTable; /// /// @note One instance per tablet. class TColumnEngineForLogs : public IColumnEngine { +private: + const NColumnShard::TEngineLogsCounters SignalCounters; public: class TMarksGranules { public: diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index a3c42fd078..9eadd403d6 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -115,8 +115,13 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI auto columnSaver = resultSchema->GetColumnSaver(name, saverContext); auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), columnSaver, Counters); if (!blob.size()) { + Counters.TrashDataSerializationBytes->Add(blob.size()); + Counters.TrashDataSerialization->Add(1); ok = false; break; + } else { + Counters.CorrectDataSerializationBytes->Add(blob.size()); + Counters.CorrectDataSerialization->Add(1); } // TODO: combine small columns in one blob @@ -316,11 +321,11 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TCompactionLogic::Comp std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnEngineForLogs::TChanges> changes) const { const ui64 pathId = changes->SrcGranule->PathId; std::vector<TString> blobs; - auto& switchedProtions = changes->SwitchedPortions; - Y_VERIFY(switchedProtions.size()); + auto& switchedPortions = changes->SwitchedPortions; + Y_VERIFY(switchedPortions.size()); - ui64 granule = switchedProtions[0].Granule(); - auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedProtions, changes->Blobs); + ui64 granule = switchedPortions[0].Granule(); + auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedPortions, changes->Blobs); auto resultSchema = SchemaVersions.GetLastSchema(); std::vector<TPortionInfo> portions; @@ -714,6 +719,11 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr return blobs; } +bool TCompactionLogic::IsSplit(std::shared_ptr<TColumnEngineChanges> changes) { + auto castedChanges = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(changes); + return !castedChanges->CompactionInfo->InGranule; +} + std::vector<TString> TCompactionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const { Y_VERIFY(changes); Y_VERIFY(changes->CompactionInfo); @@ -723,10 +733,11 @@ std::vector<TString> TCompactionLogic::Apply(std::shared_ptr<TColumnEngineChange Y_VERIFY(changes->AppendedPortions.empty()); // dst meta auto castedChanges = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(changes); - if (castedChanges->CompactionInfo->InGranule) { + if (!IsSplit(castedChanges)) { return CompactInGranule(castedChanges); + } else { + return CompactSplitGranule(castedChanges); } - return CompactSplitGranule(castedChanges); } std::vector<TString> TEvictionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const { diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h index 46ef19696b..dd5d896169 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.h +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h @@ -71,6 +71,7 @@ public: using TIndexLogicBase::TIndexLogicBase; std::vector<TString> Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const override; + static bool IsSplit(std::shared_ptr<TColumnEngineChanges> changes); private: std::vector<TString> CompactSplitGranule(const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes) const; |