aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-04-25 21:31:02 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-04-25 21:31:02 +0300
commit46cf74c79c878d215c13f5eb86ccff6940a7d817 (patch)
tree8d7a6ae361dc7081db9897363715009955608516
parentf3eadcc2cfd4bd08ff995216c88ab8d67e24f696 (diff)
downloadydb-46cf74c79c878d215c13f5eb86ccff6940a7d817.tar.gz
fix compaction case with equal first column in pk
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h1
3 files changed, 27 insertions, 2 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 30c636336b..8c0153d3a7 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1531,8 +1531,21 @@ static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompact
inserted = 0;
ui64 sumSize = 0;
ui64 sumMaxSize = 0;
+ std::shared_ptr<arrow::Scalar> minPk0;
+ std::shared_ptr<arrow::Scalar> maxPk0;
+ if (actual.size()) {
+ actual[0]->MinMaxValue(actual[0]->FirstPkColumn, minPk0, maxPk0);
+ }
+ bool pkEqual = !!minPk0 && !!maxPk0 && arrow::ScalarEquals(*minPk0, *maxPk0);
for (auto* portionInfo : actual) {
Y_VERIFY(portionInfo);
+ if (pkEqual) {
+ std::shared_ptr<arrow::Scalar> minPkCurrent;
+ std::shared_ptr<arrow::Scalar> maxPkCurrent;
+ portionInfo->MinMaxValue(portionInfo->FirstPkColumn, minPkCurrent, maxPkCurrent);
+ pkEqual = !!minPkCurrent && !!maxPkCurrent && arrow::ScalarEquals(*minPk0, *minPkCurrent)
+ && arrow::ScalarEquals(*maxPk0, *maxPkCurrent);
+ }
auto sizes = portionInfo->BlobsSizes();
sumSize += sizes.first;
sumMaxSize += sizes.second;
@@ -1541,8 +1554,8 @@ static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompact
}
}
- return sumMaxSize >= limits.GranuleBlobSplitSize
- || sumSize >= limits.GranuleOverloadSize;
+ return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize
+ || sumSize >= limits.GranuleOverloadSize);
}
std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompactedGranule) {
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 50bdb2ff6a..73035b6c24 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -218,6 +218,17 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord
}
}
+void TPortionInfo::MinMaxValue(const ui32 columnId, std::shared_ptr<arrow::Scalar>& minValue, std::shared_ptr<arrow::Scalar>& maxValue) const {
+ auto it = Meta.ColumnMeta.find(columnId);
+ if (it == Meta.ColumnMeta.end()) {
+ minValue = nullptr;
+ maxValue = nullptr;
+ } else {
+ minValue = it->second.Min;
+ maxValue = it->second.Max;
+ }
+}
+
std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const {
if (!Meta.ColumnMeta.contains(columnId)) {
return {};
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 6230256ac8..db2123bc94 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -164,6 +164,7 @@ struct TPortionInfo {
void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName);
void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
+ void MinMaxValue(const ui32 columnId, std::shared_ptr<arrow::Scalar>& minValue, std::shared_ptr<arrow::Scalar>& maxValue) const;
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;