diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-25 21:31:02 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-25 21:31:02 +0300 |
commit | 46cf74c79c878d215c13f5eb86ccff6940a7d817 (patch) | |
tree | 8d7a6ae361dc7081db9897363715009955608516 | |
parent | f3eadcc2cfd4bd08ff995216c88ab8d67e24f696 (diff) | |
download | ydb-46cf74c79c878d215c13f5eb86ccff6940a7d817.tar.gz |
fix compaction case with equal first column in pk
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 1 |
3 files changed, 27 insertions, 2 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 30c636336b..8c0153d3a7 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -1531,8 +1531,21 @@ static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompact inserted = 0; ui64 sumSize = 0; ui64 sumMaxSize = 0; + std::shared_ptr<arrow::Scalar> minPk0; + std::shared_ptr<arrow::Scalar> maxPk0; + if (actual.size()) { + actual[0]->MinMaxValue(actual[0]->FirstPkColumn, minPk0, maxPk0); + } + bool pkEqual = !!minPk0 && !!maxPk0 && arrow::ScalarEquals(*minPk0, *maxPk0); for (auto* portionInfo : actual) { Y_VERIFY(portionInfo); + if (pkEqual) { + std::shared_ptr<arrow::Scalar> minPkCurrent; + std::shared_ptr<arrow::Scalar> maxPkCurrent; + portionInfo->MinMaxValue(portionInfo->FirstPkColumn, minPkCurrent, maxPkCurrent); + pkEqual = !!minPkCurrent && !!maxPkCurrent && arrow::ScalarEquals(*minPk0, *minPkCurrent) + && arrow::ScalarEquals(*maxPk0, *maxPkCurrent); + } auto sizes = portionInfo->BlobsSizes(); sumSize += sizes.first; sumMaxSize += sizes.second; @@ -1541,8 +1554,8 @@ static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompact } } - return sumMaxSize >= limits.GranuleBlobSplitSize - || sumSize >= limits.GranuleOverloadSize; + return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize + || sumSize >= limits.GranuleOverloadSize); } std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompactedGranule) { diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index 50bdb2ff6a..73035b6c24 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -218,6 +218,17 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord } } +void TPortionInfo::MinMaxValue(const ui32 columnId, std::shared_ptr<arrow::Scalar>& minValue, std::shared_ptr<arrow::Scalar>& maxValue) const { + auto it = Meta.ColumnMeta.find(columnId); + if (it == Meta.ColumnMeta.end()) { + minValue = nullptr; + maxValue = nullptr; + } else { + minValue = it->second.Min; + maxValue = it->second.Max; + } +} + std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const { if (!Meta.ColumnMeta.contains(columnId)) { return {}; diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 6230256ac8..db2123bc94 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -164,6 +164,7 @@ struct TPortionInfo { void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName); void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted); + void MinMaxValue(const ui32 columnId, std::shared_ptr<arrow::Scalar>& minValue, std::shared_ptr<arrow::Scalar>& maxValue) const; std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const; std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const; |