summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-05-30 07:40:19 +0300
committerivanmorozov <[email protected]>2023-05-30 07:40:19 +0300
commita4992c4fc472bb3eb09a8174645f791c84d6dcfb (patch)
tree7d7f1b49422bdd52d9fc689d44a91be419fe7da4
parentb71fb3a796618a5a1f3ad735fc385c0653336007 (diff)
trash-data-serialization optimize experiment (1.2 -> 0.44)
additional signals
-rw-r--r--ydb/core/tx/columnshard/counters/indexation.cpp4
-rw-r--r--ydb/core/tx/columnshard/counters/indexation.h4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h1
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.cpp43
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h3
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h3
9 files changed, 57 insertions, 12 deletions
diff --git a/ydb/core/tx/columnshard/counters/indexation.cpp b/ydb/core/tx/columnshard/counters/indexation.cpp
index 8eb7450ee9a..0b6be350ac0 100644
--- a/ydb/core/tx/columnshard/counters/indexation.cpp
+++ b/ydb/core/tx/columnshard/counters/indexation.cpp
@@ -12,10 +12,12 @@ TIndexationCounters::TIndexationCounters(const TString& module) {
}
ReadBytes = SubGroup->GetCounter(module + "/Read/Bytes", true);
AnalizeInsertedPortions = SubGroup->GetCounter(module + "/AnalizeInsertion/Portions", true);
+ AnalizeInsertedBytes = SubGroup->GetCounter(module + "/AnalizeInsertion/Bytes", true);
RepackedInsertedPortions = SubGroup->GetCounter(module + "/RepackedInsertion/Portions", true);
RepackedInsertedPortionBytes = SubGroup->GetCounter(module + "/RepackedInsertion/Bytes", true);
AnalizeCompactedPortions = SubGroup->GetCounter(module + "/AnalizeCompaction/Portions", true);
+ AnalizeCompactedBytes = SubGroup->GetCounter(module + "/AnalizeCompaction/Bytes", true);
SkipPortionsMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Portions", true);
SkipPortionBytesMoveThroughIntersection = SubGroup->GetCounter(module + "/SkipMoveThroughIntersection/Bytes", true);
RepackedCompactedPortions = SubGroup->GetCounter(module + "/RepackedCompaction/Portions", true);
@@ -26,6 +28,8 @@ TIndexationCounters::TIndexationCounters(const TString& module) {
TrashDataSerialization = SubGroup->GetCounter(module + "/TrashDataSerialization/Count", true);
CorrectDataSerializationBytes = SubGroup->GetCounter(module + "/CorrectDataSerialization/Bytes", true);
CorrectDataSerialization = SubGroup->GetCounter(module + "/CorrectDataSerialization/Count", true);
+
+ SplittedPortionsSize = SubGroup->GetHistogram(module + "/Histogram/SplittedPortionsSize", NMonitoring::ExponentialHistogram(15, 2, 1024));
}
}
diff --git a/ydb/core/tx/columnshard/counters/indexation.h b/ydb/core/tx/columnshard/counters/indexation.h
index a5db7ff395d..bf8c8d37b90 100644
--- a/ydb/core/tx/columnshard/counters/indexation.h
+++ b/ydb/core/tx/columnshard/counters/indexation.h
@@ -10,6 +10,8 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr ReadBytes;
NMonitoring::TDynamicCounters::TCounterPtr AnalizeCompactedPortions;
NMonitoring::TDynamicCounters::TCounterPtr AnalizeInsertedPortions;
+ NMonitoring::TDynamicCounters::TCounterPtr AnalizeCompactedBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr AnalizeInsertedBytes;
NMonitoring::TDynamicCounters::TCounterPtr RepackedInsertedPortions;
NMonitoring::TDynamicCounters::TCounterPtr RepackedInsertedPortionBytes;
NMonitoring::TDynamicCounters::TCounterPtr SkipPortionsMoveThroughIntersection;
@@ -23,6 +25,8 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerializationBytes;
NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerialization;
+ NMonitoring::THistogramPtr SplittedPortionsSize;
+
TIndexationCounters(const TString& module);
};
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 5503cef08a4..20b01766685 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -753,7 +753,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
GranulesTable->Write(db, rec);
}
}
-
+ auto g = GranulesStorage->StartPackModification();
// Update old portions (set stale snapshot)
if (switchedPortions) {
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 69ce114e3a2..233f69c66cb 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -272,7 +272,11 @@ std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<TString
}
std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const {
- return ArrowSchema()->GetFieldByName(GetColumnName(columnId, true));
+ auto it = ArrowColumnByColumnIdCache.find(columnId);
+ if (it == ArrowColumnByColumnIdCache.end()) {
+ it = ArrowColumnByColumnIdCache.emplace(columnId, ArrowSchema()->GetFieldByName(GetColumnName(columnId, true))).first;
+ }
+ return it->second;
}
void TIndexInfo::SetAllKeys() {
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 449ff1b64ed..235df0a974f 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -148,6 +148,7 @@ public:
struct TIndexInfo : public NTable::TScheme::TTableSchema {
private:
THashMap<ui32, TColumnFeatures> ColumnFeatures;
+ mutable THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache;
TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false);
bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
public:
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index d2e132cc36f..5c1b18fc2fb 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -71,6 +71,15 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo,
return true;
}
+class TSplitLimiterSettings {
+public:
+ static const inline double ReduceCorrectionKff = 0.9;
+ static const inline double MaxKff = 0.5;
+ static const inline double MinKff = 0.2;
+ static const inline double DangerouseRealKff = 0.7;
+ static const inline double IncreaseCorrectionKff = 1.1;
+};
+
std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathId,
const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 granule,
@@ -95,6 +104,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
saverContext.SetTierName(tierName).SetExternalCompression(compression);
std::shared_ptr<arrow::RecordBatch> portionBatch = batch;
+ double kffSplit = 0.5;
for (i32 pos = 0; pos < batch->num_rows();) {
Y_VERIFY(portionBatch->num_rows());
@@ -117,6 +127,8 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
}
ui32 fillCounter = 0;
+ double maxKffSplit = 0;
+ ui64 maxBlobSize = 0;
for (const auto& columnSummary : sortedColumnIds) {
const TString& columnName = resultSchema->GetIndexInfo().GetColumnName(columnSummary.GetColumnId());
const int idx = resultSchema->GetFieldIndex(columnSummary.GetColumnId());
@@ -129,10 +141,16 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
/// @warnign records are not valid cause of empty BlobId and zero Portion
TColumnRecord record = TColumnRecord::Make(granule, columnSummary.GetColumnId(), snapshot, 0);
auto columnSaver = resultSchema->GetColumnSaver(columnSummary.GetColumnId(), saverContext);
- TString blob = TPortionInfo::SerializeColumnWithLimit(array, field, columnSaver, Counters);
+ ui64 droppedSize;
+ TString blob = TPortionInfo::SerializeColumnWithLimit(array, field, columnSaver, Counters, droppedSize);
+ const double kffNew = 1.0 * TCompactionLimits::MAX_BLOB_SIZE / droppedSize * TSplitLimiterSettings::ReduceCorrectionKff;
if (!blob) {
+ kffSplit = std::max<double>(TSplitLimiterSettings::MinKff, std::min<double>(kffNew, TSplitLimiterSettings::MaxKff));
ok = false;
break;
+ } else {
+ maxKffSplit = std::max<double>(maxKffSplit, kffNew);
+ maxBlobSize = std::max<ui64>(maxBlobSize, blob.size());
}
portionInfo.InsertOneChunkColumn(idx, std::move(record));
@@ -150,11 +168,16 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
}
pos += portionBatch->num_rows();
if (pos < batch->num_rows()) {
- portionBatch = batch->Slice(pos);
+ const double kff = (maxKffSplit < TSplitLimiterSettings::DangerouseRealKff) ? TSplitLimiterSettings::IncreaseCorrectionKff : 1.0;
+ portionBatch = batch->Slice(pos, std::min<ui32>(portionBatch->num_rows() * kff, batch->num_rows() - pos));
}
+ Counters.SplittedPortionsSize->Collect(maxBlobSize);
} else {
- const i64 halfLen = portionBatch->num_rows() / 2;
- Y_VERIFY(halfLen);
+ i64 halfLen = portionBatch->num_rows() * kffSplit;
+ if (!halfLen) {
+ halfLen = portionBatch->num_rows() * 0.5;
+ Y_VERIFY(halfLen);
+ }
portionBatch = batch->Slice(pos, halfLen);
}
}
@@ -551,6 +574,16 @@ ui64 TCompactionLogic::TryMovePortions(const TMark& ts0,
return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end()));
}();
+ Counters.AnalizeCompactedPortions->Add(compacted.size());
+ Counters.AnalizeInsertedPortions->Add(inserted.size());
+ for (auto&& i : portions) {
+ if (i.IsInserted()) {
+ Counters.AnalizeInsertedBytes->Add(i.BlobsBytes());
+ } else {
+ Counters.AnalizeCompactedBytes->Add(i.BlobsBytes());
+ }
+ }
+
// Do nothing if there are less than two compacted portions.
if (compacted.size() < 2) {
return 0;
@@ -559,8 +592,6 @@ ui64 TCompactionLogic::TryMovePortions(const TMark& ts0,
std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) {
return a->IndexKeyStart() < b->IndexKeyStart();
});
- Counters.AnalizeCompactedPortions->Add(compacted.size());
- Counters.AnalizeInsertedPortions->Add(inserted.size());
for (auto&& i : inserted) {
Counters.RepackedInsertedPortionBytes->Add(i->BlobsBytes());
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 1d7f6f0fea2..dae5dcc906e 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -43,11 +43,12 @@ TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array
TString TPortionInfo::SerializeColumnWithLimit(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
- const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, const ui32 limitBytes) {
+ const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, ui64& droppedSize, const ui32 limitBytes) {
auto blob = SerializeColumn(array, field, saver);
if (blob.size() >= limitBytes) {
counters.TrashDataSerializationBytes->Add(blob.size());
counters.TrashDataSerialization->Add(1);
+ droppedSize = blob.size();
return {};
} else {
counters.CorrectDataSerializationBytes->Add(blob.size());
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 96cac1c77c7..db8a7855fd7 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -584,9 +584,10 @@ public:
static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
const TColumnSaver saver);
+
static TString SerializeColumnWithLimit(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
- const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, const ui32 sizeLimit = BLOB_BYTES_LIMIT);
+ const TColumnSaver saver, const NColumnShard::TIndexationCounters& counters, ui64& droppedSize, const ui32 sizeLimit = BLOB_BYTES_LIMIT);
void InsertOneChunkColumn(const ui32 idx, TColumnRecord&& record);
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 104b8baf8ea..9c47f5a7ece 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -119,8 +119,7 @@ private:
if (GranuleSummary.GetActivePortionsCount() <= 1) {
return 0;
}
- const ui64 weightedSize = (1.0 * GranuleSummary.GetGranuleSize() / 1024) * GranuleSummary.GetActivePortionsCount();
- return weightedSize;
+ return GranuleSummary.GetGranuleSize() * GranuleSummary.GetActivePortionsCount() * GranuleSummary.GetActivePortionsCount();
}
public:
TCompactionPriority(const TCompactionPriorityInfo& data, const TGranuleSummary& granuleSummary)