summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author stanly <[email protected]> 2023-05-12 13:31:43 +0300
committer stanly <[email protected]> 2023-05-12 13:31:43 +0300
commit 37c14dea4ca1af679dfe080d290915f5e9152eff (patch)
tree 7ce1990d2d5a9fbca8818c9d0ff4fcf0b6597226
parent 7808aa32a4b483f37cb799cef88ec18a1d61174d (diff)
do not create temporary vector for active portions
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.cpp 81
1 files changed, 45 insertions, 36 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index b99a5951f3d..445a2d3bc54 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -127,17 +127,6 @@ bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& por
return true;
}
-std::vector<const TPortionInfo*> GetActualPortions(const THashMap<ui64, TPortionInfo>& portions) {
- std::vector<const TPortionInfo*> out;
- out.reserve(portions.size());
- for (auto& [portion, portionInfo] : portions) {
- if (portionInfo.IsActive()) {
- out.emplace_back(&portionInfo);
- }
- }
- return out;
-}
-
} // namespace
@@ -1298,30 +1287,51 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
return out;
}
-static bool NeedSplit(const std::vector<const TPortionInfo*>& actual, const TCompactionLimits& limits, bool& inserted) {
- if (actual.size() < 2) {
- return false;
- }
-
+static bool NeedSplit(const THashMap<ui64, TPortionInfo>& portions, const TCompactionLimits& limits, bool& inserted) {
ui64 sumSize = 0;
ui64 sumMaxSize = 0;
- auto [minPk0, maxPk0] = actual[0]->MinMaxValue(actual[0]->FirstPkColumn);
- bool pkEqual = !!minPk0 && !!maxPk0 && arrow::ScalarEquals(*minPk0, *maxPk0);
- for (auto* portionInfo : actual) {
- Y_VERIFY(portionInfo);
+ size_t activeCount = 0;
+ std::shared_ptr<arrow::Scalar> minPk0;
+ std::shared_ptr<arrow::Scalar> maxPk0;
+ bool pkEqual = true;
+
+ for (const auto& [_, info] : portions) {
+ // We need only actual portions here (with empty XPlanStep:XTxId)
+ if (info.IsActive()) {
+ ++activeCount;
+ } else {
+ continue;
+ }
+
if (pkEqual) {
- auto [minPkCurrent, maxPkCurrent] = portionInfo->MinMaxValue(portionInfo->FirstPkColumn);
- pkEqual = !!minPkCurrent && !!maxPkCurrent && arrow::ScalarEquals(*minPk0, *minPkCurrent)
- && arrow::ScalarEquals(*maxPk0, *maxPkCurrent);
+ const auto [minPkCurrent, maxPkCurrent] = info.MinMaxValue(info.FirstPkColumn);
+ // Check that all pks are equal to each other.
+ if ((pkEqual = bool(minPkCurrent) && bool(maxPkCurrent))) {
+ if (minPk0 && maxPk0) {
+ pkEqual = arrow::ScalarEquals(*minPk0, *minPkCurrent) && arrow::ScalarEquals(*maxPk0, *maxPkCurrent);
+ } else {
+ pkEqual = arrow::ScalarEquals(*minPkCurrent, *maxPkCurrent);
+
+ minPk0 = minPkCurrent;
+ maxPk0 = maxPkCurrent;
+ }
+ }
}
- auto sizes = portionInfo->BlobsSizes();
+
+ auto sizes = info.BlobsSizes();
sumSize += sizes.first;
sumMaxSize += sizes.second;
- if (portionInfo->IsInserted()) {
+ if (info.IsInserted()) {
inserted = true;
}
}
+ // Do nothing if count of active portions is less than two.
+ if (activeCount < 2) {
+ inserted = false;
+ return false;
+ }
+
return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize);
}
@@ -1343,18 +1353,17 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact
const auto gi = Granules.find(*it);
// Check granule exists.
Y_VERIFY(gi != Granules.end());
- // We need only actual portions here (with empty XPlanStep:XTxId)
- if (const auto& actualPortions = GetActualPortions(gi->second->Portions); !actualPortions.empty()) {
- bool inserted = false;
- if (NeedSplit(actualPortions, Limits, inserted)) {
- inGranule = false;
- granule = *it;
- break;
- } else if (inserted) {
- granule = *it;
- break;
- }
+
+ bool inserted = false;
+ if (NeedSplit(gi->second->Portions, Limits, inserted)) {
+ inGranule = false;
+ granule = *it;
+ break;
+ } else if (inserted) {
+ granule = *it;
+ break;
}
+
// Nothing to compact in the current granule. Throw it.
it = CompactionGranules.erase(it);
}