diff options
author | stanly <[email protected]> | 2023-05-12 13:31:43 +0300 |
---|---|---|
committer | stanly <[email protected]> | 2023-05-12 13:31:43 +0300 |
commit | 37c14dea4ca1af679dfe080d290915f5e9152eff (patch) | |
tree | 7ce1990d2d5a9fbca8818c9d0ff4fcf0b6597226 | |
parent | 7808aa32a4b483f37cb799cef88ec18a1d61174d (diff) |
do not create temporary vector for active portions
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 81 |
1 files changed, 45 insertions, 36 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index b99a5951f3d..445a2d3bc54 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -127,17 +127,6 @@ bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& por return true; } -std::vector<const TPortionInfo*> GetActualPortions(const THashMap<ui64, TPortionInfo>& portions) { - std::vector<const TPortionInfo*> out; - out.reserve(portions.size()); - for (auto& [portion, portionInfo] : portions) { - if (portionInfo.IsActive()) { - out.emplace_back(&portionInfo); - } - } - return out; -} - } // namespace @@ -1298,30 +1287,51 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot return out; } -static bool NeedSplit(const std::vector<const TPortionInfo*>& actual, const TCompactionLimits& limits, bool& inserted) { - if (actual.size() < 2) { - return false; - } - +static bool NeedSplit(const THashMap<ui64, TPortionInfo>& portions, const TCompactionLimits& limits, bool& inserted) { ui64 sumSize = 0; ui64 sumMaxSize = 0; - auto [minPk0, maxPk0] = actual[0]->MinMaxValue(actual[0]->FirstPkColumn); - bool pkEqual = !!minPk0 && !!maxPk0 && arrow::ScalarEquals(*minPk0, *maxPk0); - for (auto* portionInfo : actual) { - Y_VERIFY(portionInfo); + size_t activeCount = 0; + std::shared_ptr<arrow::Scalar> minPk0; + std::shared_ptr<arrow::Scalar> maxPk0; + bool pkEqual = true; + + for (const auto& [_, info] : portions) { + // We need only actual portions here (with empty XPlanStep:XTxId) + if (info.IsActive()) { + ++activeCount; + } else { + continue; + } + if (pkEqual) { - auto [minPkCurrent, maxPkCurrent] = portionInfo->MinMaxValue(portionInfo->FirstPkColumn); - pkEqual = !!minPkCurrent && !!maxPkCurrent && arrow::ScalarEquals(*minPk0, *minPkCurrent) - && arrow::ScalarEquals(*maxPk0, *maxPkCurrent); + const auto [minPkCurrent, maxPkCurrent] = info.MinMaxValue(info.FirstPkColumn); + // Check that all pks equal to each other. + if ((pkEqual = bool(minPkCurrent) && bool(maxPkCurrent))) { + if (minPk0 && maxPk0) { + pkEqual = arrow::ScalarEquals(*minPk0, *minPkCurrent) && arrow::ScalarEquals(*maxPk0, *maxPkCurrent); + } else { + pkEqual = arrow::ScalarEquals(*minPkCurrent, *maxPkCurrent); + + minPk0 = minPkCurrent; + maxPk0 = maxPkCurrent; + } + } } - auto sizes = portionInfo->BlobsSizes(); + + auto sizes = info.BlobsSizes(); sumSize += sizes.first; sumMaxSize += sizes.second; - if (portionInfo->IsInserted()) { + if (info.IsInserted()) { inserted = true; } } + // Do nothing if count of active portions is less than two. + if (activeCount < 2) { + inserted = false; + return false; + } + return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize); } @@ -1343,18 +1353,17 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact const auto gi = Granules.find(*it); // Check granule exists. Y_VERIFY(gi != Granules.end()); - // We need only actual portions here (with empty XPlanStep:XTxId) - if (const auto& actualPortions = GetActualPortions(gi->second->Portions); !actualPortions.empty()) { - bool inserted = false; - if (NeedSplit(actualPortions, Limits, inserted)) { - inGranule = false; - granule = *it; - break; - } else if (inserted) { - granule = *it; - break; - } + + bool inserted = false; + if (NeedSplit(gi->second->Portions, Limits, inserted)) { + inGranule = false; + granule = *it; + break; + } else if (inserted) { + granule = *it; + break; } + // Nothing to compact in the current granule. Throw it. it = CompactionGranules.erase(it); } |