diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-06-04 12:44:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-04 12:44:04 +0300 |
commit | b5e6006e5537fb59101c12ae85113f5101ee9eef (patch) | |
tree | 87f962df6961c148b5d57acfa6e76798e513420c | |
parent | 529fc6d7b505033223fa01db2f561d837be7937b (diff) | |
download | ydb-b5e6006e5537fb59101c12ae85113f5101ee9eef.tar.gz |
log expectations for split-processing control (#19212)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
-rw-r--r-- | ydb/core/formats/arrow/save_load/loader.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp | 46 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/splitter/batch_slice.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/splitter/column_info.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/splitter/settings.h | 12 | ||||
-rw-r--r-- | ydb/library/formats/arrow/splitter/similar_packer.cpp | 10 | ||||
-rw-r--r-- | ydb/library/formats/arrow/splitter/stats.h | 56 | ||||
-rw-r--r-- | ydb/library/formats/arrow/splitter/ya.make | 2 |
9 files changed, 120 insertions, 26 deletions
diff --git a/ydb/core/formats/arrow/save_load/loader.cpp b/ydb/core/formats/arrow/save_load/loader.cpp index 55896ba01d7..68f66d46066 100644 --- a/ydb/core/formats/arrow/save_load/loader.cpp +++ b/ydb/core/formats/arrow/save_load/loader.cpp @@ -70,6 +70,7 @@ std::optional<NSplitter::TSimpleSerializationStat> TColumnLoader::TryBuildColumn if constexpr (switcher.IsCType) { using CType = typename decltype(switcher)::ValueType; result = NSplitter::TSimpleSerializationStat(std::max<ui32>(1, sizeof(CType) / 2), 1, sizeof(CType)); + result->SetOrigination(NSplitter::TSimpleSerializationStat::EOrigination::Loader); } return true; }); diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp index 8fdc8c8e4c2..2e3e2297fb9 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp @@ -24,6 +24,7 @@ TString TColumnEngineChanges::DebugString() const { } TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& context) noexcept { + const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("task_id", GetTaskIdentifier())("task_class", TypeString()); Y_ABORT_UNLESS(Stage == NChanges::EStage::Started); context.Counters.CompactionInputSize(Blobs.GetTotalBlobsSize()); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp index 3a1e3571bde..46e8fdccb4a 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp @@ -21,10 +21,15 @@ private: YDB_READONLY_DEF(std::vector<i64>, ChunkRecordsCount); YDB_READONLY_DEF(std::vector<std::shared_ptr<IPortionDataChunk>>, ResultChunks); ui32 ChunksReady = 0; + const std::optional<NArrow::NSplitter::TSimpleSerializationStat> Stats; + const NSplitter::TSplitSettings Settings; public: - TColumnSplitInfo(const ui32 columnId) - : ColumnId(columnId) { + TColumnSplitInfo( + const ui32 columnId, const std::optional<NArrow::NSplitter::TSimpleSerializationStat>& stats, const NSplitter::TSplitSettings& settings) + : ColumnId(columnId) + , Stats(stats) + , Settings(settings) { } void SetChunks(std::vector<i64>&& recordsCount) { @@ -43,6 +48,10 @@ public: ui32 checkRecordsCount = 0; for (auto&& i : chunks) { checkRecordsCount += i->GetRecordsCountVerified(); + if (chunks.size() > 1) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_COMPACTION)("settings", Settings.DebugString())( + "stats", Stats ? Stats->DebugString() : TString("no_stats"))("column_id", ColumnId)("packed", i->GetPackedSize()); + } } AFL_VERIFY(checkRecordsCount == ChunkRecordsCount[ChunksReady])("check", checkRecordsCount)("split", ChunkRecordsCount[ChunksReady]); ++ChunksReady; @@ -92,8 +101,9 @@ public: it->second.AddColumnChunks(chunks); } - void AddColumnSplitting(const ui32 columnId, std::vector<i64>&& chunks) { - auto it = Columns.emplace(columnId, columnId); + void AddColumnSplitting(const ui32 columnId, std::vector<i64>&& chunks, + const std::optional<NArrow::NSplitter::TSimpleSerializationStat>& stats, const NSplitter::TSplitSettings& settings) { + auto it = Columns.emplace(columnId, TColumnSplitInfo(columnId, stats, settings)); AFL_VERIFY(it.second); it.first->second.SetChunks(std::move(chunks)); AFL_VERIFY(RecordsCount == it.first->second.GetRecordsCount())("new", it.first->second.GetRecordsCount())("control", RecordsCount); @@ -102,10 +112,17 @@ public: class TSplittedBatch { private: + enum class ESplittingType : ui32 { + Stats = 0, + RecordsCount + }; + YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Remapper); YDB_READONLY_DEF(std::vector<TPortionSplitInfo>, Portions); const std::shared_ptr<NArrow::NSplitter::TSerializationStats> Stats; const std::shared_ptr<TFilteredSnapshotSchema> ResultFiltered; + const NSplitter::TSplitSettings Settings; + std::optional<ESplittingType> SplittingType; public: ui32 GetRecordsCount() const { @@ -118,8 +135,22 @@ public: std::vector<TGeneralSerializedSlice> BuildSlices(const std::shared_ptr<NColumnShard::TSplitterCounters>& counters) const { std::vector<TGeneralSerializedSlice> result; + bool needWarnLog = false; for (auto&& i : Portions) { result.emplace_back(i.BuildSlice(ResultFiltered, Stats, counters)); + if (Portions.size() > 1 && (result.back().GetPackedSize() < 0.5 * Settings.GetMaxPortionSize() || + result.back().GetPackedSize() > 2 * (ui64)Settings.GetMaxPortionSize())) { + needWarnLog = true; + } + } + if (needWarnLog) { + auto batchStats = Stats->GetStatsForColumns(ResultFiltered->GetColumnIds(), false); + for (auto&& i : result) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_COMPACTION)("p_size", i.GetPackedSize())( + "expected_size", batchStats ? batchStats->PredictPackedSize(i.GetRecordsCount()) : 0)( + "s_type", SplittingType ? (ui32)*SplittingType : 999999)("settings", Settings.DebugString())("count", Portions.size())( + "r_count", i.GetRecordsCount())("b_stats", batchStats ? batchStats->DebugString() : TString("NO")); + } } return result; } @@ -128,13 +159,16 @@ public: const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const NSplitter::TSplitSettings& settings) : Remapper(std::move(remapper)) , Stats(stats) - , ResultFiltered(resultFiltered) { + , ResultFiltered(resultFiltered) + , Settings(settings) { AFL_VERIFY(Remapper); const std::vector<i64> recordsCount = [&]() { auto batchStatsOpt = stats->GetStatsForColumns(resultFiltered->GetColumnIds(), false); if (batchStatsOpt) { + SplittingType = ESplittingType::Stats; return batchStatsOpt->SplitRecordsForBlobSize(Remapper->num_rows(), settings.GetExpectedPortionSize()); } else { + SplittingType = ESplittingType::RecordsCount; return NArrow::NSplitter::TSimilarPacker::SplitWithExpected(Remapper->num_rows(), settings.GetExpectedPortionRecordsCount()); } }(); @@ -158,7 +192,7 @@ public: chunks = colStatsOpt->SplitRecords( p, settings.GetExpectedRecordsCountOnPage(), settings.GetExpectedBlobPage(), settings.GetMaxBlobSize() * 0.9); } - Portions.back().AddColumnSplitting(c, std::move(chunks)); + Portions.back().AddColumnSplitting(c, std::move(chunks), colStatsOpt, settings); } } AFL_VERIFY(recordsCountCursor == Remapper->num_rows())("cursor", recordsCountCursor)("length", Remapper->num_rows()); diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h index 76965539adc..9edccdb0d6f 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.h +++ b/ydb/core/tx/columnshard/splitter/batch_slice.h @@ -103,6 +103,15 @@ public: return arrow::RecordBatch::Make(pkSchema, 2, pkColumns); } + ui64 GetPackedSize() const { + ui64 result = 0; + for (auto&& i : Data) { + result += i.GetPackedSize(); + } + AFL_VERIFY(Size == result)("size", Size)("result", result); + return result; + } + ui64 GetSize() const { return Size; } diff --git a/ydb/core/tx/columnshard/splitter/column_info.h b/ydb/core/tx/columnshard/splitter/column_info.h index 5ae0373baf2..2afe91348e4 100644 --- a/ydb/core/tx/columnshard/splitter/column_info.h +++ b/ydb/core/tx/columnshard/splitter/column_info.h @@ -33,6 +33,15 @@ public: AFL_VERIFY(EntityId); } + ui64 GetPackedSize() const { + ui64 result = 0; + for (auto&& i : Chunks) { + result += i->GetPackedSize(); + } + AFL_VERIFY(Size == result)("size", Size)("result", result); + return result; + } + class TEntityChunk { private: YDB_READONLY_DEF(TPositiveControlInteger, Size); diff --git a/ydb/core/tx/columnshard/splitter/settings.h b/ydb/core/tx/columnshard/splitter/settings.h index debc004a0df..5a21dc17073 100644 --- a/ydb/core/tx/columnshard/splitter/settings.h +++ b/ydb/core/tx/columnshard/splitter/settings.h @@ -28,6 +28,18 @@ private: YDB_ACCESSOR(i64, MaxPortionSize, DefaultMaxPortionSize); public: + TString DebugString() const { + TStringBuilder sb; + sb << "{"; + sb << "max_bs=" << MaxBlobSize << ";"; + sb << "min_bs=" << MinBlobSize << ";"; + sb << "bs_tlrn=" << BlobSizeTolerance << ";"; + sb << "min_rc=" << MinRecordsCount << ";"; + sb << "max_ps=" << MaxPortionSize << ";"; + sb << "}"; + return sb; + } + ui64 GetExpectedBlobPage() const { return ((ui64)512) << 10; } diff --git a/ydb/library/formats/arrow/splitter/similar_packer.cpp b/ydb/library/formats/arrow/splitter/similar_packer.cpp index 156da99656f..2e6dc59a5f1 100644 --- a/ydb/library/formats/arrow/splitter/similar_packer.cpp +++ b/ydb/library/formats/arrow/splitter/similar_packer.cpp @@ -56,12 +56,18 @@ std::vector<i64> TSimilarPacker::SplitWithExpected( if (count <= expectation) { return { count }; } - const i64 partsCount = 1.0 * count / expectation; + // count > 2 * alpha * (partsCountBase + 1) - condition to use partsCountBase with no correction (+1) + const i64 alpha = count % expectation; + const i64 partsCountBase = 1.0 * count / expectation; + const bool cond = (2 * alpha * (partsCountBase + 1) < count); + const i64 partsCount = partsCountBase + (cond ? 0 : 1); + const i64 partResultCount = count / partsCount; const i64 sumResultCount = partsCount * partResultCount; AFL_VERIFY(sumResultCount <= count)("count", count)("expect", sumResultCount)("propose", partResultCount); const ui32 delta = count - sumResultCount; - AFL_VERIFY(delta < partResultCount); + AFL_VERIFY(delta < partsCount)("delta", delta)("part_size", partResultCount)("expectation", expectation)("count", count)( + "parts_count", partsCount); std::vector<i64> result(partsCount - delta, partResultCount); for (ui32 i = 0; i < delta; ++i) { result.emplace_back(partResultCount + 1); diff --git a/ydb/library/formats/arrow/splitter/stats.h b/ydb/library/formats/arrow/splitter/stats.h index 2de398bf452..10e44b9c8f6 100644 --- a/ydb/library/formats/arrow/splitter/stats.h +++ b/ydb/library/formats/arrow/splitter/stats.h @@ -1,36 +1,46 @@ #pragma once #include <ydb/library/accessor/accessor.h> - #include <ydb/library/actors/core/log.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> -#include <optional> -#include <map> #include <deque> -#include <string> +#include <map> #include <memory> +#include <optional> +#include <string> namespace NKikimr::NArrow::NSplitter { class TSimpleSerializationStat { +public: + enum class EOrigination : ui32 { + Statistic = 0, + Loader, + Mix + }; + protected: ui64 SerializedBytes = 0; ui64 RecordsCount = 0; ui64 RawBytes = 0; + YDB_ACCESSOR(EOrigination, Origination, EOrigination::Statistic); + public: TSimpleSerializationStat() = default; TSimpleSerializationStat(const ui64 bytes, const ui64 recordsCount, const ui64 rawBytes); - std::vector<i64> SplitRecords(const ui32 recordsCount, const ui32 expectedRecordsCount, const ui32 expectedColumnPageSize, const ui32 maxBlobSize); + std::vector<i64> SplitRecords( + const ui32 recordsCount, const ui32 expectedRecordsCount, const ui32 expectedColumnPageSize, const ui32 maxBlobSize); TString DebugString() const { return TStringBuilder() << "{" - << "serialized_bytes=" << SerializedBytes << ";" - << "records=" << RecordsCount << ";" - << "raw_bytes=" << RawBytes << ";" - << "}"; - } + << "serialized_bytes=" << SerializedBytes << ";" + << "records=" << RecordsCount << ";" + << "raw_bytes=" << RawBytes << ";" + << "origination=" << ::ToString(Origination) << ";" + << "}"; + }; double GetSerializedBytesPerRecord() const { AFL_VERIFY(RecordsCount); @@ -41,10 +51,10 @@ public: return 1.0 * RawBytes / RecordsCount; } - ui64 GetSerializedBytes() const{ + ui64 GetSerializedBytes() const { return SerializedBytes; } - ui64 GetRecordsCount() const{ + ui64 GetRecordsCount() const { return RecordsCount; } ui64 GetRawBytes() const { @@ -52,6 +62,9 @@ public: } void AddStat(const TSimpleSerializationStat& stat) { + if (stat.GetOrigination() != GetOrigination()) { + Origination = EOrigination::Mix; + } SerializedBytes += stat.SerializedBytes; RecordsCount += stat.RecordsCount; RawBytes += stat.RawBytes; @@ -79,6 +92,10 @@ public: RawBytesPerRecord = 1.0 * rawBytes / recordsCount; } + ui64 PredictPackedSize(const ui32 recordsCount) const { + return SerializedBytesPerRecord * recordsCount; + } + TString DebugString() const { return TStringBuilder() << "{sbpr=" << SerializedBytesPerRecord << ";rbpr=" << RawBytesPerRecord << "}"; } @@ -125,11 +142,11 @@ private: using TBase = TSimpleSerializationStat; YDB_READONLY(ui32, ColumnId, 0); YDB_READONLY_DEF(std::string, ColumnName); + public: TColumnSerializationStat(const ui32 columnId, const std::string& columnName) : ColumnId(columnId) , ColumnName(columnName) { - } double GetPackedRecordSize() const { @@ -138,7 +155,8 @@ public: TColumnSerializationStat RecalcForRecordsCount(const ui64 recordsCount) const { TColumnSerializationStat result(ColumnId, ColumnName); - result.Merge(TSimpleSerializationStat(SerializedBytes / RecordsCount * recordsCount, recordsCount, RawBytes / RecordsCount * recordsCount)); + result.Merge( + TSimpleSerializationStat(SerializedBytes / RecordsCount * recordsCount, recordsCount, RawBytes / RecordsCount * recordsCount)); return result; } @@ -158,6 +176,7 @@ private: std::deque<TColumnSerializationStat> ColumnStat; std::map<ui32, TColumnSerializationStat*> StatsByColumnId; std::map<std::string, TColumnSerializationStat*> StatsByColumnName; + public: TString DebugString() const { TStringBuilder sb; @@ -179,8 +198,10 @@ public: auto it = StatsByColumnId.find(info.GetColumnId()); if (it == StatsByColumnId.end()) { ColumnStat.emplace_back(info); - AFL_VERIFY(StatsByColumnId.emplace(info.GetColumnId(), &ColumnStat.back()).second)("column_id", info.GetColumnId())("column_name", info.GetColumnName()); - AFL_VERIFY(StatsByColumnName.emplace(info.GetColumnName(), &ColumnStat.back()).second)("column_id", info.GetColumnId())("column_name", info.GetColumnName()); + AFL_VERIFY(StatsByColumnId.emplace(info.GetColumnId(), &ColumnStat.back()).second)("column_id", info.GetColumnId())( + "column_name", info.GetColumnName()); + AFL_VERIFY(StatsByColumnName.emplace(info.GetColumnName(), &ColumnStat.back()).second)("column_id", info.GetColumnId())( + "column_name", info.GetColumnName()); } else { it->second->Merge(info); } @@ -226,5 +247,4 @@ public: return result; } }; - -} +} // namespace NKikimr::NArrow::NSplitter diff --git a/ydb/library/formats/arrow/splitter/ya.make b/ydb/library/formats/arrow/splitter/ya.make index 0031a75c115..35353c96cd8 100644 --- a/ydb/library/formats/arrow/splitter/ya.make +++ b/ydb/library/formats/arrow/splitter/ya.make @@ -11,4 +11,6 @@ PEERDIR( ydb/library/conclusion ) +GENERATE_ENUM_SERIALIZATION(stats.h) + END() |