diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-05 15:18:32 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-05 15:47:56 +0300 |
commit | b31702172dc2daefa2653d49c5eed1107126a5a8 (patch) | |
tree | 1464b077c3631a9b07de3b9ea11e745dd781570c | |
parent | 8ae4e32a94ece3ed932340bf2efe0677596ea2fe (diff) | |
download | ydb-b31702172dc2daefa2653d49c5eed1107126a5a8.tar.gz |
KIKIMR-18932:signal for blobs distribution by size
19 files changed, 304 insertions, 57 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index b1f9d682e9..b66e91be9a 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -47,7 +47,7 @@ struct TBlobBatch::TBatchInfo : TNonCopyable { InFlight.push_back(true); ++InFlightCount; TotalSizeBytes += blobSize; - return MakeBlobId(BlobSizes.size()-1); + return MakeBlobId(BlobSizes.size() - 1); } TUnifiedBlobId MakeBlobId(ui32 i) const { diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt index f9f50dfaa1..9f20f37bd4 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,12 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(tx-columnshard-common) target_link_libraries(tx-columnshard-common PUBLIC @@ -13,9 +19,16 @@ target_link_libraries(tx-columnshard-common PUBLIC yutil ydb-core-protos libs-apache-arrow + tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/portion.cpp +) +generate_enum_serilization(tx-columnshard-common + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/portion.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/common/portion.h ) diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt index 6128aa8f0e..56fe56b09e 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt @@ -6,6 +6,12 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(tx-columnshard-common) target_link_libraries(tx-columnshard-common PUBLIC @@ -14,9 +20,16 @@ target_link_libraries(tx-columnshard-common PUBLIC yutil ydb-core-protos libs-apache-arrow + tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/portion.cpp +) +generate_enum_serilization(tx-columnshard-common + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/portion.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/common/portion.h ) diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt index 6128aa8f0e..56fe56b09e 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt @@ -6,6 +6,12 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(tx-columnshard-common) target_link_libraries(tx-columnshard-common PUBLIC @@ -14,9 +20,16 @@ target_link_libraries(tx-columnshard-common PUBLIC yutil ydb-core-protos libs-apache-arrow + tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/portion.cpp +) +generate_enum_serilization(tx-columnshard-common + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/portion.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/common/portion.h ) diff --git a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt index f9f50dfaa1..9f20f37bd4 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt @@ -6,6 +6,12 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(tx-columnshard-common) target_link_libraries(tx-columnshard-common PUBLIC @@ -13,9 +19,16 @@ target_link_libraries(tx-columnshard-common PUBLIC yutil ydb-core-protos libs-apache-arrow + tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/portion.cpp +) +generate_enum_serilization(tx-columnshard-common + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/portion.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/common/portion.h ) diff --git a/ydb/core/tx/columnshard/common/portion.cpp b/ydb/core/tx/columnshard/common/portion.cpp new file mode 100644 index 0000000000..0359f5905d --- /dev/null +++ b/ydb/core/tx/columnshard/common/portion.cpp @@ -0,0 +1,5 @@ +#include "portion.h" + +namespace NKikimr::NOlap::NPortion { + +} diff --git a/ydb/core/tx/columnshard/common/portion.h b/ydb/core/tx/columnshard/common/portion.h new file mode 100644 index 0000000000..3b828374dc --- /dev/null +++ b/ydb/core/tx/columnshard/common/portion.h @@ -0,0 +1,15 @@ +#pragma once +#include <util/system/types.h> + +namespace NKikimr::NOlap::NPortion { +// NOTE: These values are persisted in LocalDB so they must be stable +enum EProduced: ui32 { + UNSPECIFIED = 0, + INSERTED, + COMPACTED, + SPLIT_COMPACTED, + INACTIVE, + EVICTED +}; + +} diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make index 36c5c82b5d..50e899dab0 100644 --- a/ydb/core/tx/columnshard/common/ya.make +++ b/ydb/core/tx/columnshard/common/ya.make @@ -4,6 +4,7 @@ SRCS( reverse_accessor.cpp scalars.cpp snapshot.cpp + portion.cpp ) PEERDIR( @@ -11,4 +12,6 @@ PEERDIR( contrib/libs/apache/arrow ) +GENERATE_ENUM_SERIALIZATION(portion.h) + END() diff --git a/ydb/core/tx/columnshard/counters/engine_logs.cpp b/ydb/core/tx/columnshard/counters/engine_logs.cpp index 25b8451757..584efe3c58 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.cpp +++ b/ydb/core/tx/columnshard/counters/engine_logs.cpp @@ -1,6 +1,7 @@ #include "engine_logs.h" #include <ydb/core/base/appdata.h> #include <ydb/core/base/counters.h> +#include <util/generic/serialized_enum.h> namespace NKikimr::NColumnShard { @@ -8,6 +9,19 @@ TEngineLogsCounters::TEngineLogsCounters() : TBase("EngineLogs") , GranuleDataAgent("EngineLogs") { + const std::map<i64, TString> borders = {{0, "0"}, {512 * 1024, "512kb"}, {1024 * 1024, "1Mb"}, + {2 * 1024 * 1024, "2Mb"}, {4 * 1024 * 1024, "4Mb"}, + {5 * 1024 * 1024, "5Mb"}, {6 * 1024 * 1024, "6Mb"}, + {7 * 1024 * 1024, "7Mb"}, {8 * 1024 * 1024, "8Mb"}}; + for (auto&& i : GetEnumNames<NOlap::NPortion::EProduced>()) { + if (BlobSizeDistribution.size() <= (ui32)i.first) { + BlobSizeDistribution.resize((ui32)i.first + 1); + } + BlobSizeDistribution[(ui32)i.first] = std::make_shared<TIncrementalHistogram>("EngineLogs", "BlobSizeDistribution", i.second, borders); + } + for (auto&& i : BlobSizeDistribution) { + Y_VERIFY(i); + } OverloadGranules = TBase::GetValue("Granules/Overload"); CompactOverloadGranulesSelection = TBase::GetDeriviative("Granules/Selection/Overload/Count"); NoCompactGranulesSelection = TBase::GetDeriviative("Granules/Selection/No/Count"); diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h index 94febbd3a5..34cce4c583 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.h +++ b/ydb/core/tx/columnshard/counters/engine_logs.h @@ -1,7 +1,9 @@ #pragma once #include "common/owner.h" +#include <ydb/core/tx/columnshard/common/portion.h> #include <library/cpp/monlib/dynamic_counters/counters.h> #include <util/string/builder.h> +#include <set> namespace NKikimr::NColumnShard { @@ -109,6 +111,105 @@ public: } }; +class TIncrementalHistogram: public TCommonCountersOwner { +private: + using TBase = TCommonCountersOwner; + std::map<i64, NMonitoring::TDynamicCounters::TCounterPtr> Counters; + NMonitoring::TDynamicCounters::TCounterPtr PlusInf; + + NMonitoring::TDynamicCounters::TCounterPtr GetQuantile(const i64 value) const { + auto it = Counters.lower_bound(value); + if (it == Counters.end()) { + return PlusInf; + } else { + return it->second; + } + } +public: + + class TGuard { + private: + class TLineGuard { + private: + NMonitoring::TDynamicCounters::TCounterPtr Counter; + i64 Value = 0; + public: + TLineGuard(NMonitoring::TDynamicCounters::TCounterPtr counter) + : Counter(counter) + { + + } + + ~TLineGuard() { + Sub(Value); + } + + void Add(const i64 value) { + Counter->Add(value); + Value += value; + } + + void Sub(const i64 value) { + Counter->Sub(value); + Value -= value; + Y_VERIFY(Value >= 0); + } + }; + + std::map<i64, TLineGuard> Counters; + TLineGuard PlusInf; + + TLineGuard& GetLineGuard(const i64 value) { + auto it = Counters.lower_bound(value); + if (it == Counters.end()) { + return PlusInf; + } else { + return it->second; + } + } + public: + TGuard(const TIncrementalHistogram& owner) + : PlusInf(owner.PlusInf) + { + for (auto&& i : owner.Counters) { + Counters.emplace(i.first, TLineGuard(i.second)); + } + } + void Add(const i64 value) { + GetLineGuard(value).Add(value); + } + + void Sub(const i64 value) { + GetLineGuard(value).Sub(value); + } + }; + + std::shared_ptr<TGuard> BuildGuard() const { + return std::make_shared<TGuard>(*this); + } + + TIncrementalHistogram(const TString& moduleId, const TString& metricId, const TString& category, const std::map<i64, TString>& values) + : TBase(moduleId) + { + DeepSubGroup("metric", metricId); + if (category) { + DeepSubGroup("category", category); + } + std::optional<TString> predName; + for (auto&& i : values) { + if (!predName) { + Counters.emplace(i.first, TBase::GetValue("(-Inf," + i.second + "]")); + } else { + Counters.emplace(i.first, TBase::GetValue("(" + *predName + "," + i.second + "]")); + } + predName = i.second; + } + Y_VERIFY(predName); + PlusInf = TBase::GetValue("(" + *predName + ",+Inf)"); + } + +}; + class TEngineLogsCounters: public TCommonCountersOwner { private: using TBase = TCommonCountersOwner; @@ -125,6 +226,7 @@ private: NMonitoring::TDynamicCounters::TCounterPtr PortionNoBorderBytes; TAgentGranuleDataCounters GranuleDataAgent; + std::vector<std::shared_ptr<TIncrementalHistogram>> BlobSizeDistribution; public: NMonitoring::TDynamicCounters::TCounterPtr OverloadGranules; NMonitoring::TDynamicCounters::TCounterPtr CompactOverloadGranulesSelection; @@ -132,6 +234,33 @@ public: NMonitoring::TDynamicCounters::TCounterPtr SplitCompactGranulesSelection; NMonitoring::TDynamicCounters::TCounterPtr InternalCompactGranulesSelection; + class TPortionsInfoGuard { + private: + std::vector<std::shared_ptr<TIncrementalHistogram::TGuard>> Guards; + public: + TPortionsInfoGuard(const std::vector<std::shared_ptr<TIncrementalHistogram>>& distr) + { + for (auto&& i : distr) { + Guards.emplace_back(i->BuildGuard()); + } + } + + void OnNewBlob(const NOlap::NPortion::EProduced produced, const ui64 size) const { + Y_VERIFY((ui32)produced < Guards.size()); + Guards[(ui32)produced]->Add(size); + } + + void OnDropBlob(const NOlap::NPortion::EProduced produced, const ui64 size) const { + Y_VERIFY((ui32)produced < Guards.size()); + Guards[(ui32)produced]->Sub(size); + } + + }; + + TPortionsInfoGuard BuildPortionBlobsGuard() const { + return TPortionsInfoGuard(BlobSizeDistribution); + } + TGranuleDataCounters RegisterGranuleDataCounters() const { return GranuleDataAgent.RegisterClient(); } diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h index 23bbbfc18c..46facb8a77 100644 --- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h @@ -18,7 +18,7 @@ private: protected: virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override; virtual TPortionMeta::EProduced GetResultProducedClass() const override { - return TPortionMeta::COMPACTED; + return TPortionMeta::EProduced::COMPACTED; } virtual void DoStart(NColumnShard::TColumnShard& self) override; virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override; diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.h b/ydb/core/tx/columnshard/engines/changes/split_compaction.h index 41d2460b91..e47b50ddd7 100644 --- a/ydb/core/tx/columnshard/engines/changes/split_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.h @@ -22,7 +22,7 @@ private: protected: virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override; virtual TPortionMeta::EProduced GetResultProducedClass() const override { - return TPortionMeta::SPLIT_COMPACTED; + return TPortionMeta::EProduced::SPLIT_COMPACTED; } virtual void DoStart(NColumnShard::TColumnShard& self) override; virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override; diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 006e07b818..f337a0c166 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -19,21 +19,21 @@ void TChangesWithAppend::DoDebugString(TStringOutput& out) const { void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& /*context*/) { for (auto& portionInfo : AppendedPortions) { switch (portionInfo.Meta.Produced) { - case NOlap::TPortionMeta::UNSPECIFIED: + case NOlap::TPortionMeta::EProduced::UNSPECIFIED: Y_VERIFY(false); // unexpected - case NOlap::TPortionMeta::INSERTED: + case NOlap::TPortionMeta::EProduced::INSERTED: self.IncCounter(NColumnShard::COUNTER_INDEXING_PORTIONS_WRITTEN); break; - case NOlap::TPortionMeta::COMPACTED: + case NOlap::TPortionMeta::EProduced::COMPACTED: self.IncCounter(NColumnShard::COUNTER_COMPACTION_PORTIONS_WRITTEN); break; - case NOlap::TPortionMeta::SPLIT_COMPACTED: + case NOlap::TPortionMeta::EProduced::SPLIT_COMPACTED: self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_PORTIONS_WRITTEN); break; - case NOlap::TPortionMeta::EVICTED: + case NOlap::TPortionMeta::EProduced::EVICTED: Y_FAIL("Unexpected evicted case"); break; - case NOlap::TPortionMeta::INACTIVE: + case NOlap::TPortionMeta::EProduced::INACTIVE: Y_FAIL("Unexpected inactive case"); break; } @@ -98,7 +98,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange void TChangesWithAppend::DoCompile(TFinalizationContext& context) { for (auto&& i : AppendedPortions) { i.SetPortion(context.NextPortionId()); - i.UpdateRecordsMeta(TPortionMeta::INSERTED); + i.UpdateRecordsMeta(TPortionMeta::EProduced::INSERTED); } } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 1f48fce0fa..761bf724a8 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -97,7 +97,7 @@ struct TSelectInfo { class TColumnEngineStats { private: static constexpr const ui64 NUM_KINDS = 5; - static_assert(NUM_KINDS == NOlap::TPortionMeta::EVICTED, "NUM_KINDS must match NOlap::TPortionMeta::EProduced enum"); + static_assert(NUM_KINDS == NOlap::TPortionMeta::EProduced::EVICTED, "NUM_KINDS must match NOlap::TPortionMeta::EProduced enum"); public: class TPortionsStats { private: @@ -178,7 +178,7 @@ public: std::vector<ui32> GetKinds() const { std::vector<ui32> result; for (auto&& i : GetEnumAllValues<NOlap::TPortionMeta::EProduced>()) { - if (i == NOlap::TPortionMeta::UNSPECIFIED) { + if (i == NOlap::TPortionMeta::EProduced::UNSPECIFIED) { continue; } result.emplace_back(i); @@ -190,7 +190,7 @@ public: std::vector<T> GetValues(const TAccessor accessor) const { std::vector<T> result; for (auto&& i : GetEnumAllValues<NOlap::TPortionMeta::EProduced>()) { - if (i == NOlap::TPortionMeta::UNSPECIFIED) { + if (i == NOlap::TPortionMeta::EProduced::UNSPECIFIED) { continue; } result.emplace_back(accessor(GetStats(i))); @@ -202,7 +202,7 @@ public: T GetValuesSum(const TAccessor accessor) const { T result = 0; for (auto&& i : GetEnumAllValues<NOlap::TPortionMeta::EProduced>()) { - if (i == NOlap::TPortionMeta::UNSPECIFIED) { + if (i == NOlap::TPortionMeta::EProduced::UNSPECIFIED) { continue; } result += accessor(GetStats(i)); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 9c9011bec1..9670b34dba 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -74,7 +74,7 @@ void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std: auto column = batch->GetColumnByName(columnName); Y_VERIFY(column); - bool isSorted = (columnId == Meta.FirstPkColumn); + const bool isSorted = (columnId == Meta.FirstPkColumn); AddMinMax(columnId, column, isSorted); Y_VERIFY(Meta.HasMinMax(columnId)); } @@ -101,21 +101,21 @@ TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const { auto* portionMeta = meta.MutablePortionMeta(); switch (Meta.Produced) { - case TPortionMeta::UNSPECIFIED: + case TPortionMeta::EProduced::UNSPECIFIED: Y_VERIFY(false); - case TPortionMeta::INSERTED: + case TPortionMeta::EProduced::INSERTED: portionMeta->SetIsInserted(true); break; - case TPortionMeta::COMPACTED: + case TPortionMeta::EProduced::COMPACTED: portionMeta->SetIsCompacted(true); break; - case TPortionMeta::SPLIT_COMPACTED: + case TPortionMeta::EProduced::SPLIT_COMPACTED: portionMeta->SetIsSplitCompacted(true); break; - case TPortionMeta::EVICTED: + case TPortionMeta::EProduced::EVICTED: portionMeta->SetIsEvicted(true); break; - case TPortionMeta::INACTIVE: + case TPortionMeta::EProduced::INACTIVE: Y_FAIL("Unexpected inactive case"); //portionMeta->SetInactive(true); break; @@ -158,13 +158,13 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord TierName = portionMeta.GetTierName(); if (portionMeta.GetIsInserted()) { - Meta.Produced = TPortionMeta::INSERTED; + Meta.Produced = TPortionMeta::EProduced::INSERTED; } else if (portionMeta.GetIsCompacted()) { - Meta.Produced = TPortionMeta::COMPACTED; + Meta.Produced = TPortionMeta::EProduced::COMPACTED; } else if (portionMeta.GetIsSplitCompacted()) { - Meta.Produced = TPortionMeta::SPLIT_COMPACTED; + Meta.Produced = TPortionMeta::EProduced::SPLIT_COMPACTED; } else if (portionMeta.GetIsEvicted()) { - Meta.Produced = TPortionMeta::EVICTED; + Meta.Produced = TPortionMeta::EProduced::EVICTED; } if (portionMeta.HasPrimaryKeyBorders()) { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 3058404e21..ab2ab68182 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -4,6 +4,7 @@ #include <ydb/core/formats/arrow/replace_key.h> #include <ydb/core/formats/arrow/serializer/abstract.h> #include <ydb/core/formats/arrow/dictionary/conversion.h> +#include <ydb/core/tx/columnshard/common/portion.h> #include <ydb/core/tx/columnshard/counters/indexation.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> @@ -12,15 +13,7 @@ namespace NKikimr::NOlap { struct TPortionMeta { - // NOTE: These values are persisted in LocalDB so they must be stable - enum EProduced : ui32 { - UNSPECIFIED = 0, - INSERTED = 1, - COMPACTED = 2, - SPLIT_COMPACTED = 3, - INACTIVE = 4, - EVICTED = 5, - }; + using EProduced = NPortion::EProduced; struct TColumnMeta { ui32 NumRows{0}; @@ -33,7 +26,11 @@ struct TPortionMeta { } }; - EProduced Produced{UNSPECIFIED}; + EProduced GetProduced() const { + return Produced; + } + + EProduced Produced{EProduced::UNSPECIFIED}; THashMap<ui32, TColumnMeta> ColumnMeta; ui32 FirstPkColumn = 0; std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows @@ -113,11 +110,11 @@ public: TString TierName; bool Empty() const { return Records.empty(); } - bool Produced() const { return Meta.Produced != TPortionMeta::UNSPECIFIED; } + bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; } bool Valid() const { return MinSnapshot.Valid() && Granule && Portion && !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && Granule && Portion; } - bool IsInserted() const { return Meta.Produced == TPortionMeta::INSERTED; } - bool IsEvicted() const { return Meta.Produced == TPortionMeta::EVICTED; } + bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; } + bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; } bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ } bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); } size_t NumRecords() const { return Records.size(); } @@ -163,15 +160,15 @@ public: } bool AllowEarlyFilter() const { - return Meta.Produced == TPortionMeta::COMPACTED - || Meta.Produced == TPortionMeta::SPLIT_COMPACTED; + return Meta.GetProduced() == TPortionMeta::EProduced::COMPACTED + || Meta.GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED; } bool EvictReady(size_t hotSize) const { - return Meta.Produced == TPortionMeta::COMPACTED - || Meta.Produced == TPortionMeta::SPLIT_COMPACTED - || Meta.Produced == TPortionMeta::EVICTED - || (Meta.Produced == TPortionMeta::INSERTED && BlobsSizes().first >= hotSize); + return Meta.GetProduced() == TPortionMeta::EProduced::COMPACTED + || Meta.GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED + || Meta.GetProduced() == TPortionMeta::EProduced::EVICTED + || (Meta.GetProduced() == TPortionMeta::EProduced::INSERTED && BlobsSizes().first >= hotSize); } ui64 GetPortion() const { diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index e8166a9634..d2f970d4e6 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -55,14 +55,19 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) { void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec) { auto it = Portions.find(portion.GetPortion()); if (it == Portions.end()) { - it = Portions.emplace(portion.GetPortion(), portion).first; + auto portionNew = portion; + portionNew.AddRecord(indexInfo, rec); + OnBeforeChangePortion(nullptr, &portionNew); + Portions.emplace(portion.GetPortion(), std::move(portionNew)); + OnAfterChangePortion(); + } else { + Y_VERIFY(it->second.IsEqualWithSnapshots(portion)); + auto portionNew = it->second; + portionNew.AddRecord(indexInfo, rec); + OnBeforeChangePortion(&it->second, &portionNew); + it->second = std::move(portionNew); + OnAfterChangePortion(); } - Y_VERIFY(it->second.IsEqualWithSnapshots(portion)); - auto portionNew = it->second; - portionNew.AddRecord(indexInfo, rec); - OnBeforeChangePortion(&it->second, &portionNew); - it->second = std::move(portionNew); - OnAfterChangePortion(); } void TGranuleMeta::OnAfterChangePortion() { @@ -72,6 +77,24 @@ void TGranuleMeta::OnAfterChangePortion() { void TGranuleMeta::OnBeforeChangePortion(const TPortionInfo* portionBefore, const TPortionInfo* portionAfter) { HardSummaryCache = {}; + if (portionBefore) { + THashMap<TUnifiedBlobId, ui64> blobIdSize; + for (auto&& i : portionBefore->Records) { + blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size; + } + for (auto&& i : blobIdSize) { + PortionInfoGuard.OnDropBlob(portionBefore->IsActive() ? portionBefore->Meta.Produced : NPortion::EProduced::INACTIVE, i.second); + } + } + if (portionAfter) { + THashMap<TUnifiedBlobId, ui64> blobIdSize; + for (auto&& i : portionAfter->Records) { + blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size; + } + for (auto&& i : blobIdSize) { + PortionInfoGuard.OnNewBlob(portionAfter->IsActive() ? portionAfter->Meta.Produced : NPortion::EProduced::INACTIVE, i.second); + } + } if (!!AdditiveSummaryCache) { auto g = AdditiveSummaryCache->StartEdit(Counters); if (portionBefore && portionBefore->IsActive()) { @@ -169,4 +192,13 @@ const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary( return *AdditiveSummaryCache; } +TGranuleMeta::TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters) + : Owner(owner) + , Counters(counters) + , PortionInfoGuard(Owner->GetCounters().BuildPortionBlobsGuard()) + , Record(rec) +{ + +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 37f14eef22..e2f123acbf 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -288,6 +288,7 @@ private: mutable bool AllowInsertionFlag = false; std::shared_ptr<TGranulesStorage> Owner; const NColumnShard::TGranuleDataCounters Counters; + NColumnShard::TEngineLogsCounters::TPortionsInfoGuard PortionInfoGuard; void OnBeforeChangePortion(const TPortionInfo* portionBefore, const TPortionInfo* portionAfter); void OnAfterChangePortion(); @@ -374,12 +375,7 @@ public: bool ErasePortion(const ui64 portion); - explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters) - : Owner(owner) - , Counters(counters) - , Record(rec) - { - } + explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters); ui64 GetGranuleId() const { return Record.Granule; diff --git a/ydb/core/tx/columnshard/engines/storage/storage.h b/ydb/core/tx/columnshard/engines/storage/storage.h index 83e10713df..6a4f8d16b8 100644 --- a/ydb/core/tx/columnshard/engines/storage/storage.h +++ b/ydb/core/tx/columnshard/engines/storage/storage.h @@ -34,6 +34,10 @@ public: } + const NColumnShard::TEngineLogsCounters& GetCounters() const { + return Counters; + } + class TModificationGuard: TNonCopyable { private: TGranulesStorage& Owner; |