aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-06-04 12:44:04 +0300
committerGitHub <noreply@github.com>2025-06-04 12:44:04 +0300
commitb5e6006e5537fb59101c12ae85113f5101ee9eef (patch)
tree87f962df6961c148b5d57acfa6e76798e513420c
parent529fc6d7b505033223fa01db2f561d837be7937b (diff)
downloadydb-b5e6006e5537fb59101c12ae85113f5101ee9eef.tar.gz
log expectations for split-processing control (#19212)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
-rw-r--r--ydb/core/formats/arrow/save_load/loader.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp46
-rw-r--r--ydb/core/tx/columnshard/splitter/batch_slice.h9
-rw-r--r--ydb/core/tx/columnshard/splitter/column_info.h9
-rw-r--r--ydb/core/tx/columnshard/splitter/settings.h12
-rw-r--r--ydb/library/formats/arrow/splitter/similar_packer.cpp10
-rw-r--r--ydb/library/formats/arrow/splitter/stats.h56
-rw-r--r--ydb/library/formats/arrow/splitter/ya.make2
9 files changed, 120 insertions, 26 deletions
diff --git a/ydb/core/formats/arrow/save_load/loader.cpp b/ydb/core/formats/arrow/save_load/loader.cpp
index 55896ba01d7..68f66d46066 100644
--- a/ydb/core/formats/arrow/save_load/loader.cpp
+++ b/ydb/core/formats/arrow/save_load/loader.cpp
@@ -70,6 +70,7 @@ std::optional<NSplitter::TSimpleSerializationStat> TColumnLoader::TryBuildColumn
if constexpr (switcher.IsCType) {
using CType = typename decltype(switcher)::ValueType;
result = NSplitter::TSimpleSerializationStat(std::max<ui32>(1, sizeof(CType) / 2), 1, sizeof(CType));
+ result->SetOrigination(NSplitter::TSimpleSerializationStat::EOrigination::Loader);
}
return true;
});
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
index 8fdc8c8e4c2..2e3e2297fb9 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
@@ -24,6 +24,7 @@ TString TColumnEngineChanges::DebugString() const {
}
TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& context) noexcept {
+ const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("task_id", GetTaskIdentifier())("task_class", TypeString());
Y_ABORT_UNLESS(Stage == NChanges::EStage::Started);
context.Counters.CompactionInputSize(Blobs.GetTotalBlobsSize());
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
index 3a1e3571bde..46e8fdccb4a 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
@@ -21,10 +21,15 @@ private:
YDB_READONLY_DEF(std::vector<i64>, ChunkRecordsCount);
YDB_READONLY_DEF(std::vector<std::shared_ptr<IPortionDataChunk>>, ResultChunks);
ui32 ChunksReady = 0;
+ const std::optional<NArrow::NSplitter::TSimpleSerializationStat> Stats;
+ const NSplitter::TSplitSettings Settings;
public:
- TColumnSplitInfo(const ui32 columnId)
- : ColumnId(columnId) {
+ TColumnSplitInfo(
+ const ui32 columnId, const std::optional<NArrow::NSplitter::TSimpleSerializationStat>& stats, const NSplitter::TSplitSettings& settings)
+ : ColumnId(columnId)
+ , Stats(stats) // copied by value; may be empty, in which case the chunk log prints "no_stats"
+ , Settings(settings) { // copied by value so the chunk debug log can report the split limits
}
void SetChunks(std::vector<i64>&& recordsCount) {
@@ -43,6 +48,10 @@ public:
ui32 checkRecordsCount = 0;
for (auto&& i : chunks) {
checkRecordsCount += i->GetRecordsCountVerified();
+ if (chunks.size() > 1) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_COMPACTION)("settings", Settings.DebugString())(
+ "stats", Stats ? Stats->DebugString() : TString("no_stats"))("column_id", ColumnId)("packed", i->GetPackedSize());
+ }
}
AFL_VERIFY(checkRecordsCount == ChunkRecordsCount[ChunksReady])("check", checkRecordsCount)("split", ChunkRecordsCount[ChunksReady]);
++ChunksReady;
@@ -92,8 +101,9 @@ public:
it->second.AddColumnChunks(chunks);
}
- void AddColumnSplitting(const ui32 columnId, std::vector<i64>&& chunks) {
- auto it = Columns.emplace(columnId, columnId);
+ void AddColumnSplitting(const ui32 columnId, std::vector<i64>&& chunks,
+ const std::optional<NArrow::NSplitter::TSimpleSerializationStat>& stats, const NSplitter::TSplitSettings& settings) {
+ auto it = Columns.emplace(columnId, TColumnSplitInfo(columnId, stats, settings));
AFL_VERIFY(it.second);
it.first->second.SetChunks(std::move(chunks));
AFL_VERIFY(RecordsCount == it.first->second.GetRecordsCount())("new", it.first->second.GetRecordsCount())("control", RecordsCount);
@@ -102,10 +112,17 @@ public:
class TSplittedBatch {
private:
+ enum class ESplittingType : ui32 { // which strategy the constructor used to cut the batch; logged numerically as "s_type"
+ Stats = 0, // column stats were available: rows split by SplitRecordsForBlobSize
+ RecordsCount // no stats for the columns: rows split by TSimilarPacker::SplitWithExpected
+ };
+
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Remapper);
YDB_READONLY_DEF(std::vector<TPortionSplitInfo>, Portions);
const std::shared_ptr<NArrow::NSplitter::TSerializationStats> Stats;
const std::shared_ptr<TFilteredSnapshotSchema> ResultFiltered;
+ const NSplitter::TSplitSettings Settings;
+ std::optional<ESplittingType> SplittingType;
public:
ui32 GetRecordsCount() const {
@@ -118,8 +135,22 @@ public:
std::vector<TGeneralSerializedSlice> BuildSlices(const std::shared_ptr<NColumnShard::TSplitterCounters>& counters) const {
std::vector<TGeneralSerializedSlice> result;
+ bool needWarnLog = false; // set when a slice of a multi-portion batch is packed below 0.5x or above 2x MaxPortionSize
for (auto&& i : Portions) {
result.emplace_back(i.BuildSlice(ResultFiltered, Stats, counters));
+ if (Portions.size() > 1) { // a single portion is never checked against the size expectations
+ const ui64 packedSize = result.back().GetPackedSize(); // evaluated once instead of once per comparison
+ needWarnLog |= packedSize < 0.5 * Settings.GetMaxPortionSize() || packedSize > 2 * static_cast<ui64>(Settings.GetMaxPortionSize());
+ }
+ }
+ if (needWarnLog) { // report actual vs predicted packed size for every slice of the batch
+ auto batchStats = Stats->GetStatsForColumns(ResultFiltered->GetColumnIds(), false);
+ for (auto&& i : result) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_COMPACTION)("p_size", i.GetPackedSize())(
+ "expected_size", batchStats ? batchStats->PredictPackedSize(i.GetRecordsCount()) : 0)(
+ "s_type", SplittingType ? (ui32)*SplittingType : 999999)("settings", Settings.DebugString())("count", Portions.size())(
+ "r_count", i.GetRecordsCount())("b_stats", batchStats ? batchStats->DebugString() : TString("NO"));
+ }
+ }
return result;
}
@@ -128,13 +159,16 @@ public:
const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const NSplitter::TSplitSettings& settings)
: Remapper(std::move(remapper))
, Stats(stats)
- , ResultFiltered(resultFiltered) {
+ , ResultFiltered(resultFiltered)
+ , Settings(settings) {
AFL_VERIFY(Remapper);
const std::vector<i64> recordsCount = [&]() {
auto batchStatsOpt = stats->GetStatsForColumns(resultFiltered->GetColumnIds(), false);
if (batchStatsOpt) {
+ SplittingType = ESplittingType::Stats;
return batchStatsOpt->SplitRecordsForBlobSize(Remapper->num_rows(), settings.GetExpectedPortionSize());
} else {
+ SplittingType = ESplittingType::RecordsCount;
return NArrow::NSplitter::TSimilarPacker::SplitWithExpected(Remapper->num_rows(), settings.GetExpectedPortionRecordsCount());
}
}();
@@ -158,7 +192,7 @@ public:
chunks = colStatsOpt->SplitRecords(
p, settings.GetExpectedRecordsCountOnPage(), settings.GetExpectedBlobPage(), settings.GetMaxBlobSize() * 0.9);
}
- Portions.back().AddColumnSplitting(c, std::move(chunks));
+ Portions.back().AddColumnSplitting(c, std::move(chunks), colStatsOpt, settings);
}
}
AFL_VERIFY(recordsCountCursor == Remapper->num_rows())("cursor", recordsCountCursor)("length", Remapper->num_rows());
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h
index 76965539adc..9edccdb0d6f 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.h
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.h
@@ -103,6 +103,15 @@ public:
return arrow::RecordBatch::Make(pkSchema, 2, pkColumns);
}
+ ui64 GetPackedSize() const { // sums GetPackedSize() over Data and verifies the total equals Size
+ ui64 packedTotal = 0;
+ for (const auto& entry : Data) {
+ packedTotal += entry.GetPackedSize();
+ }
+ AFL_VERIFY(Size == packedTotal)("size", Size)("result", packedTotal);
+ return packedTotal;
+ }
+
ui64 GetSize() const {
return Size;
}
diff --git a/ydb/core/tx/columnshard/splitter/column_info.h b/ydb/core/tx/columnshard/splitter/column_info.h
index 5ae0373baf2..2afe91348e4 100644
--- a/ydb/core/tx/columnshard/splitter/column_info.h
+++ b/ydb/core/tx/columnshard/splitter/column_info.h
@@ -33,6 +33,15 @@ public:
AFL_VERIFY(EntityId);
}
+ ui64 GetPackedSize() const { // sums GetPackedSize() over Chunks and verifies the total equals Size
+ ui64 packedTotal = 0;
+ for (const auto& chunk : Chunks) {
+ packedTotal += chunk->GetPackedSize();
+ }
+ AFL_VERIFY(Size == packedTotal)("size", Size)("result", packedTotal);
+ return packedTotal;
+ }
+
class TEntityChunk {
private:
YDB_READONLY_DEF(TPositiveControlInteger, Size);
diff --git a/ydb/core/tx/columnshard/splitter/settings.h b/ydb/core/tx/columnshard/splitter/settings.h
index debc004a0df..5a21dc17073 100644
--- a/ydb/core/tx/columnshard/splitter/settings.h
+++ b/ydb/core/tx/columnshard/splitter/settings.h
@@ -28,6 +28,18 @@ private:
YDB_ACCESSOR(i64, MaxPortionSize, DefaultMaxPortionSize);
public:
+ TString DebugString() const { // one-line "{key=value;...}" dump of the split limits for log records
+ TStringBuilder out;
+ out << "{"
+ << "max_bs=" << MaxBlobSize << ";"
+ << "min_bs=" << MinBlobSize << ";"
+ << "bs_tlrn=" << BlobSizeTolerance << ";"
+ << "min_rc=" << MinRecordsCount << ";"
+ << "max_ps=" << MaxPortionSize << ";"
+ << "}";
+ return out;
+ }
+
ui64 GetExpectedBlobPage() const {
return ((ui64)512) << 10;
}
diff --git a/ydb/library/formats/arrow/splitter/similar_packer.cpp b/ydb/library/formats/arrow/splitter/similar_packer.cpp
index 156da99656f..2e6dc59a5f1 100644
--- a/ydb/library/formats/arrow/splitter/similar_packer.cpp
+++ b/ydb/library/formats/arrow/splitter/similar_packer.cpp
@@ -56,12 +56,18 @@ std::vector<i64> TSimilarPacker::SplitWithExpected(
if (count <= expectation) {
return { count };
}
- const i64 partsCount = 1.0 * count / expectation;
+ // count > 2 * alpha * (partsCountBase + 1) - condition to use partsCountBase with no correction (+1)
+ const i64 alpha = count % expectation;
+ const i64 partsCountBase = 1.0 * count / expectation;
+ const bool cond = (2 * alpha * (partsCountBase + 1) < count);
+ const i64 partsCount = partsCountBase + (cond ? 0 : 1);
+
const i64 partResultCount = count / partsCount;
const i64 sumResultCount = partsCount * partResultCount;
AFL_VERIFY(sumResultCount <= count)("count", count)("expect", sumResultCount)("propose", partResultCount);
const ui32 delta = count - sumResultCount;
- AFL_VERIFY(delta < partResultCount);
+ AFL_VERIFY(delta < partsCount)("delta", delta)("part_size", partResultCount)("expectation", expectation)("count", count)(
+ "parts_count", partsCount);
std::vector<i64> result(partsCount - delta, partResultCount);
for (ui32 i = 0; i < delta; ++i) {
result.emplace_back(partResultCount + 1);
diff --git a/ydb/library/formats/arrow/splitter/stats.h b/ydb/library/formats/arrow/splitter/stats.h
index 2de398bf452..10e44b9c8f6 100644
--- a/ydb/library/formats/arrow/splitter/stats.h
+++ b/ydb/library/formats/arrow/splitter/stats.h
@@ -1,36 +1,46 @@
#pragma once
#include <ydb/library/accessor/accessor.h>
-
#include <ydb/library/actors/core/log.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-#include <optional>
-#include <map>
#include <deque>
-#include <string>
+#include <map>
#include <memory>
+#include <optional>
+#include <string>
namespace NKikimr::NArrow::NSplitter {
class TSimpleSerializationStat {
+public:
+ enum class EOrigination : ui32 { // tags where the numbers of a stat came from; printed by DebugString()
+ Statistic = 0, // default value of the Origination member
+ Loader, // set by TColumnLoader on its fixed-width type estimate
+ Mix // set by AddStat when the added stat has a different origination
+ };
+
protected:
ui64 SerializedBytes = 0;
ui64 RecordsCount = 0;
ui64 RawBytes = 0;
+ YDB_ACCESSOR(EOrigination, Origination, EOrigination::Statistic);
+
public:
TSimpleSerializationStat() = default;
TSimpleSerializationStat(const ui64 bytes, const ui64 recordsCount, const ui64 rawBytes);
- std::vector<i64> SplitRecords(const ui32 recordsCount, const ui32 expectedRecordsCount, const ui32 expectedColumnPageSize, const ui32 maxBlobSize);
+ std::vector<i64> SplitRecords(
+ const ui32 recordsCount, const ui32 expectedRecordsCount, const ui32 expectedColumnPageSize, const ui32 maxBlobSize);
TString DebugString() const {
return TStringBuilder() << "{"
- << "serialized_bytes=" << SerializedBytes << ";"
- << "records=" << RecordsCount << ";"
- << "raw_bytes=" << RawBytes << ";"
- << "}";
- }
+ << "serialized_bytes=" << SerializedBytes << ";"
+ << "records=" << RecordsCount << ";"
+ << "raw_bytes=" << RawBytes << ";"
+ << "origination=" << ::ToString(Origination) << ";" // presumably backed by GENERATE_ENUM_SERIALIZATION(stats.h) in ya.make
+ << "}";
+ }
double GetSerializedBytesPerRecord() const {
AFL_VERIFY(RecordsCount);
@@ -41,10 +51,10 @@ public:
return 1.0 * RawBytes / RecordsCount;
}
- ui64 GetSerializedBytes() const{
+ ui64 GetSerializedBytes() const {
return SerializedBytes;
}
- ui64 GetRecordsCount() const{
+ ui64 GetRecordsCount() const {
return RecordsCount;
}
ui64 GetRawBytes() const {
@@ -52,6 +62,9 @@ public:
}
void AddStat(const TSimpleSerializationStat& stat) {
+ if (stat.GetOrigination() != GetOrigination()) {
+ Origination = EOrigination::Mix;
+ }
SerializedBytes += stat.SerializedBytes;
RecordsCount += stat.RecordsCount;
RawBytes += stat.RawBytes;
@@ -79,6 +92,10 @@ public:
RawBytesPerRecord = 1.0 * rawBytes / recordsCount;
}
+ ui64 PredictPackedSize(const ui32 recordsCount) const { // SerializedBytesPerRecord scaled by the row count, truncated to ui64
+ return static_cast<ui64>(SerializedBytesPerRecord * recordsCount);
+ }
+
TString DebugString() const {
return TStringBuilder() << "{sbpr=" << SerializedBytesPerRecord << ";rbpr=" << RawBytesPerRecord << "}";
}
@@ -125,11 +142,11 @@ private:
using TBase = TSimpleSerializationStat;
YDB_READONLY(ui32, ColumnId, 0);
YDB_READONLY_DEF(std::string, ColumnName);
+
public:
TColumnSerializationStat(const ui32 columnId, const std::string& columnName)
: ColumnId(columnId)
, ColumnName(columnName) {
-
}
double GetPackedRecordSize() const {
@@ -138,7 +155,8 @@ public:
TColumnSerializationStat RecalcForRecordsCount(const ui64 recordsCount) const {
TColumnSerializationStat result(ColumnId, ColumnName);
- result.Merge(TSimpleSerializationStat(SerializedBytes / RecordsCount * recordsCount, recordsCount, RawBytes / RecordsCount * recordsCount));
+ result.Merge( // rescales this column's serialized/raw totals to the requested number of records
+ TSimpleSerializationStat(SerializedBytes / RecordsCount * recordsCount, recordsCount, RawBytes / RecordsCount * recordsCount)); // NOTE(review): ui64 division truncates the per-record size before scaling, and RecordsCount == 0 is not guarded here — confirm intended
return result;
}
@@ -158,6 +176,7 @@ private:
std::deque<TColumnSerializationStat> ColumnStat;
std::map<ui32, TColumnSerializationStat*> StatsByColumnId;
std::map<std::string, TColumnSerializationStat*> StatsByColumnName;
+
public:
TString DebugString() const {
TStringBuilder sb;
@@ -179,8 +198,10 @@ public:
auto it = StatsByColumnId.find(info.GetColumnId());
if (it == StatsByColumnId.end()) {
ColumnStat.emplace_back(info);
- AFL_VERIFY(StatsByColumnId.emplace(info.GetColumnId(), &ColumnStat.back()).second)("column_id", info.GetColumnId())("column_name", info.GetColumnName());
- AFL_VERIFY(StatsByColumnName.emplace(info.GetColumnName(), &ColumnStat.back()).second)("column_id", info.GetColumnId())("column_name", info.GetColumnName());
+ AFL_VERIFY(StatsByColumnId.emplace(info.GetColumnId(), &ColumnStat.back()).second)("column_id", info.GetColumnId())(
+ "column_name", info.GetColumnName());
+ AFL_VERIFY(StatsByColumnName.emplace(info.GetColumnName(), &ColumnStat.back()).second)("column_id", info.GetColumnId())(
+ "column_name", info.GetColumnName());
} else {
it->second->Merge(info);
}
@@ -226,5 +247,4 @@ public:
return result;
}
};
-
-}
+} // namespace NKikimr::NArrow::NSplitter
diff --git a/ydb/library/formats/arrow/splitter/ya.make b/ydb/library/formats/arrow/splitter/ya.make
index 0031a75c115..35353c96cd8 100644
--- a/ydb/library/formats/arrow/splitter/ya.make
+++ b/ydb/library/formats/arrow/splitter/ya.make
@@ -11,4 +11,6 @@ PEERDIR(
ydb/library/conclusion
)
+GENERATE_ENUM_SERIALIZATION(stats.h)
+
END()