diff options
author | ivanmorozov <[email protected]> | 2023-05-30 07:40:19 +0300 |
---|---|---|
committer | ivanmorozov <[email protected]> | 2023-05-30 07:40:19 +0300 |
commit | a4992c4fc472bb3eb09a8174645f791c84d6dcfb (patch) | |
tree | 7d7f1b49422bdd52d9fc689d44a91be419fe7da4 | |
parent | b71fb3a796618a5a1f3ad735fc385c0653336007 (diff) |
trash-data-serialization optimize experiment (1.2 -> 0.44)
additional signals
-rw-r--r-- | ydb/core/tx/columnshard/counters/indexation.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/indexation.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_logic_logs.cpp | 43 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/storage/granule.h | 3 |
9 files changed, 57 insertions, 12 deletions
diff --git a/ydb/core/tx/columnshard/counters/indexation.cpp b/ydb/core/tx/columnshard/counters/indexation.cpp index 8eb7450ee9a..0b6be350ac0 100644 --- a/ydb/core/tx/columnshard/counters/indexation.cpp +++ b/ydb/core/tx/columnshard/counters/indexation.cpp @@ -12,10 +12,12 @@ TIndexationCounters::TIndexationCounters(const TString& module) { } ReadBytes = SubGroup->GetCounter(module + "/Read/Bytes", true); AnalizeInsertedPortions = SubGroup->GetCounter(module + "/AnalizeInsertion/Portions", true); + AnalizeInsertedBytes = SubGroup->GetCounter(module + "/AnalizeInsertion/Bytes", true); RepackedInsertedPortions = SubGroup->GetCounter(module + "/RepackedInsertion/Portions", true); RepackedInsertedPortionBytes = SubGroup->GetCounter(module + "/RepackedInsertion/Bytes", true); AnalizeCompactedPortions = SubGroup->GetCounter(module + "/AnalizeCompaction/Portions", true); + AnalizeCompactedBytes = SubGroup->GetCounter(module + "/AnalizeCompaction/Bytes", true); SkipPortionsMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Portions", true); SkipPortionBytesMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Bytes", true); RepackedCompactedPortions = SubGroup->GetCounter(module + "/RepackedCompaction/Portions", true); @@ -26,6 +28,8 @@ TIndexationCounters::TIndexationCounters(const TString& module) { TrashDataSerialization = SubGroup->GetCounter(module + "/TrashDataSerialization/Count", true); CorrectDataSerializationBytes = SubGroup->GetCounter(module + "/CorrectDataSerialization/Bytes", true); CorrectDataSerialization = SubGroup->GetCounter(module + "/CorrectDataSerialization/Count", true); + + SplittedPortionsSize = SubGroup->GetHistogram(module + "/Histogram/SplittedPortionsSize", NMonitoring::ExponentialHistogram(15, 2, 1024)); } } diff --git a/ydb/core/tx/columnshard/counters/indexation.h b/ydb/core/tx/columnshard/counters/indexation.h index a5db7ff395d..bf8c8d37b90 100644 --- a/ydb/core/tx/columnshard/counters/indexation.h +++ b/ydb/core/tx/columnshard/counters/indexation.h @@ -10,6 +10,8 @@ public: NMonitoring::TDynamicCounters::TCounterPtr ReadBytes; NMonitoring::TDynamicCounters::TCounterPtr AnalizeCompactedPortions; NMonitoring::TDynamicCounters::TCounterPtr AnalizeInsertedPortions; + NMonitoring::TDynamicCounters::TCounterPtr AnalizeCompactedBytes; + NMonitoring::TDynamicCounters::TCounterPtr AnalizeInsertedBytes; NMonitoring::TDynamicCounters::TCounterPtr RepackedInsertedPortions; NMonitoring::TDynamicCounters::TCounterPtr RepackedInsertedPortionBytes; NMonitoring::TDynamicCounters::TCounterPtr SkipPortionsMoveThroughIntersection; @@ -23,6 +25,8 @@ public: NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerializationBytes; NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerialization; + NMonitoring::THistogramPtr SplittedPortionsSize; + TIndexationCounters(const TString& module); }; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 5503cef08a4..20b01766685 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -753,7 +753,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, GranulesTable->Write(db, rec); } } - + auto g = GranulesStorage->StartPackModification(); // Update old portions (set stale snapshot) if (switchedPortions) { diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 69ce114e3a2..233f69c66cb 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -272,7 +272,11 @@ std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<TString } std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const { - return ArrowSchema()->GetFieldByName(GetColumnName(columnId, true)); + auto it = ArrowColumnByColumnIdCache.find(columnId); + if (it == ArrowColumnByColumnIdCache.end()) { + it = ArrowColumnByColumnIdCache.emplace(columnId, ArrowSchema()->GetFieldByName(GetColumnName(columnId, true))).first; + } + return it->second; } void TIndexInfo::SetAllKeys() { diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 449ff1b64ed..235df0a974f 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -148,6 +148,7 @@ public: struct TIndexInfo : public NTable::TScheme::TTableSchema { private: THashMap<ui32, TColumnFeatures> ColumnFeatures; + mutable THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache; TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false); bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); public: diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index d2e132cc36f..5c1b18fc2fb 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -71,6 +71,15 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo, return true; } +class TSplitLimiterSettings { +public: + static const inline double ReduceCorrectionKff = 0.9; + static const inline double MaxKff = 0.5; + static const inline double MinKff = 0.2; + static const inline double DangerouseRealKff = 0.7; + static const inline double IncreaseCorrectionKff = 1.1; +}; + std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathId, const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, @@ -95,6 +104,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI saverContext.SetTierName(tierName).SetExternalCompression(compression); std::shared_ptr<arrow::RecordBatch> portionBatch = batch; + double kffSplit = 0.5; for (i32 pos = 0; pos < batch->num_rows();) { Y_VERIFY(portionBatch->num_rows()); @@ -117,6 +127,8 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI } ui32 fillCounter = 0; + double maxKffSplit = 0; + ui64 maxBlobSize = 0; for (const auto& columnSummary : sortedColumnIds) { const TString& columnName = resultSchema->GetIndexInfo().GetColumnName(columnSummary.GetColumnId()); const int idx = resultSchema->GetFieldIndex(columnSummary.GetColumnId()); @@ -129,10 +141,16 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI /// @warnign records are not valid cause of empty BlobId and zero Portion TColumnRecord record = TColumnRecord::Make(granule, columnSummary.GetColumnId(), snapshot, 0); auto columnSaver = resultSchema->GetColumnSaver(columnSummary.GetColumnId(), saverContext); - TString blob = TPortionInfo::SerializeColumnWithLimit(array, field, columnSaver, Counters); + ui64 droppedSize; + TString blob = TPortionInfo::SerializeColumnWithLimit(array, field, columnSaver, Counters, droppedSize); + const double kffNew = 1.0 * TCompactionLimits::MAX_BLOB_SIZE / droppedSize * TSplitLimiterSettings::ReduceCorrectionKff; if (!blob) { + kffSplit = std::max<double>(TSplitLimiterSettings::MinKff, std::min<double>(kffNew, TSplitLimiterSettings::MaxKff)); ok = false; break; + } else { + maxKffSplit = std::max<double>(maxKffSplit, kffNew); + maxBlobSize = std::max<ui64>(maxBlobSize, blob.size()); } portionInfo.InsertOneChunkColumn(idx, std::move(record)); @@ -150,11 +168,16 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI } pos += portionBatch->num_rows(); if (pos < batch->num_rows()) { - portionBatch = batch->Slice(pos); + const double kff = (maxKffSplit < TSplitLimiterSettings::DangerouseRealKff) ? TSplitLimiterSettings::IncreaseCorrectionKff : 1.0; + portionBatch = batch->Slice(pos, std::min<ui32>(portionBatch->num_rows() * kff, batch->num_rows() - pos)); } + Counters.SplittedPortionsSize->Collect(maxBlobSize); } else { - const i64 halfLen = portionBatch->num_rows() / 2; - Y_VERIFY(halfLen); + i64 halfLen = portionBatch->num_rows() * kffSplit; + if (!halfLen) { + halfLen = portionBatch->num_rows() * 0.5; + Y_VERIFY(halfLen); + } portionBatch = batch->Slice(pos, halfLen); } } @@ -551,6 +574,16 @@ ui64 TCompactionLogic::TryMovePortions(const TMark& ts0, return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end())); }(); + Counters.AnalizeCompactedPortions->Add(compacted.size()); + Counters.AnalizeInsertedPortions->Add(inserted.size()); + for (auto&& i : portions) { + if (i.IsInserted()) { + Counters.AnalizeInsertedBytes->Add(i.BlobsBytes()); + } else { + Counters.AnalizeCompactedBytes->Add(i.BlobsBytes()); + } + } + // Do nothing if there are less than two compacted portions. if (compacted.size() < 2) { return 0; @@ -559,8 +592,6 @@ ui64 TCompactionLogic::TryMovePortions(const TMark& ts0, std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) { return a->IndexKeyStart() < b->IndexKeyStart(); }); - Counters.AnalizeCompactedPortions->Add(compacted.size()); - Counters.AnalizeInsertedPortions->Add(inserted.size()); for (auto&& i : inserted) { Counters.RepackedInsertedPortionBytes->Add(i->BlobsBytes()); } diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index 1d7f6f0fea2..dae5dcc906e 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -43,11 +43,12 @@ TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array TString TPortionInfo::SerializeColumnWithLimit(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, - const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, const ui32 limitBytes) { + const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, ui64& droppedSize, const ui32 limitBytes) { auto blob = SerializeColumn(array, field, saver); if (blob.size() >= limitBytes) { counters.TrashDataSerializationBytes->Add(blob.size()); counters.TrashDataSerialization->Add(1); + droppedSize = blob.size(); return {}; } else { counters.CorrectDataSerializationBytes->Add(blob.size()); diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 96cac1c77c7..db8a7855fd7 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -584,9 +584,10 @@ public: static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, const TColumnSaver saver); + static TString SerializeColumnWithLimit(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, - const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, const ui32 sizeLimit = BLOB_BYTES_LIMIT); + const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, ui64& droppedSize, const ui32 sizeLimit = BLOB_BYTES_LIMIT); void InsertOneChunkColumn(const ui32 idx, TColumnRecord&& record); diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 104b8baf8ea..9c47f5a7ece 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -119,8 +119,7 @@ private: if (GranuleSummary.GetActivePortionsCount() <= 1) { return 0; } - const ui64 weightedSize = (1.0 * GranuleSummary.GetGranuleSize() / 1024) * GranuleSummary.GetActivePortionsCount(); - return weightedSize; + return GranuleSummary.GetGranuleSize() * GranuleSummary.GetActivePortionsCount() * GranuleSummary.GetActivePortionsCount(); } public: TCompactionPriority(const TCompactionPriorityInfo& data, const TGranuleSummary& granuleSummary) |