aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorstanly <stanly@yandex-team.com>2023-05-10 15:18:17 +0300
committerstanly <stanly@yandex-team.com>2023-05-10 15:18:17 +0300
commit5d0638d8bfa8a45e6cb9a5e2d224aa44edcd0d31 (patch)
tree063fa54e2d2fd4a16184e66555c5729a2dc9bffb
parent3ac9d23ff8e6758cb41d0d0c2d74699e34d74104 (diff)
downloadydb-5d0638d8bfa8a45e6cb9a5e2d224aa44edcd0d31.tar.gz
cleanup compaction code
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h2
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp121
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h42
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h3
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/range.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp4
7 files changed, 78 insertions, 106 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 7a8f1f6e91c..cd030c61870 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -121,7 +121,7 @@ public:
virtual ~TColumnEngineChanges() = default;
- TColumnEngineChanges(EType type)
+ explicit TColumnEngineChanges(EType type)
: Type(type)
{}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 654504c61cb..352c441bde4 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1,8 +1,7 @@
#include "column_engine_logs.h"
-#include "indexed_read_data.h"
-#include "index_logic_logs.h"
-
#include "filter.h"
+#include "index_logic_logs.h"
+#include "indexed_read_data.h"
#include <ydb/core/formats/arrow/one_batch_input_stream.h>
#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
@@ -467,7 +466,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector<TInsertedData>&& dataToIndex) {
Y_VERIFY(dataToIndex.size());
- auto changes = std::make_shared<TChanges>(*this, std::move(dataToIndex), Limits);
+ auto changes = std::make_shared<TChanges>(TMark(MarkSchema), std::move(dataToIndex), Limits);
ui32 reserveGranules = 0;
changes->InitSnapshot = LastSnapshot;
@@ -487,13 +486,9 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector<
// TODO: cache PathToGranule for hot pathIds
const auto& src = PathGranules[pathId];
- auto& dst = changes->PathToGranule[pathId];
- dst.reserve(src.size());
- for (const auto& [ts, granule] : src) {
- dst.emplace_back(ts, granule);
- }
+ changes->PathToGranule[pathId].assign(src.begin(), src.end());
} else {
- // It could reserve more then needed in case of the same pathId in DataToIndex
+ // It could reserve more than needed in case of the same pathId in DataToIndex
++reserveGranules;
}
}
@@ -510,35 +505,26 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector<
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std::unique_ptr<TCompactionInfo>&& info,
const TSnapshot& outdatedSnapshot) {
Y_VERIFY(info);
+ Y_VERIFY(info->Granules.size() == 1);
- auto changes = std::make_shared<TChanges>(*this, std::move(info), Limits);
- changes->InitSnapshot = LastSnapshot;
-
- Y_VERIFY(changes->CompactionInfo->Granules.size() == 1);
+ auto changes = std::make_shared<TChanges>(TMark(MarkSchema), std::move(info), Limits, LastSnapshot);
- ui64 granule = *changes->CompactionInfo->Granules.begin();
- auto& portions = changes->SwitchedPortions;
- {
- auto spg = Granules.find(granule)->second;
- Y_VERIFY(spg);
-
- auto actualPortions = GetActualPortions(spg->Portions);
- Y_VERIFY(!actualPortions.empty());
- portions.reserve(actualPortions.size());
+ const ui64 granule = *changes->CompactionInfo->Granules.begin();
+ const auto gi = Granules.find(granule);
+ // Check granule exists.
+ Y_VERIFY(gi != Granules.end());
- for (auto* portionInfo : actualPortions) {
- Y_VERIFY(!portionInfo->Empty());
- Y_VERIFY(portionInfo->Granule() == granule);
- portions.push_back(*portionInfo);
+ changes->SwitchedPortions.reserve(gi->second->Portions.size());
+ // Collect active portions for the granule.
+ for (const auto& [_, portionInfo] : gi->second->Portions) {
+ if (portionInfo.IsActive()) {
+ changes->SwitchedPortions.push_back(portionInfo);
}
}
- Y_VERIFY(Granules.contains(granule));
- auto& spg = Granules[granule];
- Y_VERIFY(spg);
- ui64 pathId = spg->Record.PathId;
+ const ui64 pathId = gi->second->Record.PathId;
Y_VERIFY(PathGranules.contains(pathId));
-
+ // Locate mark for the granule.
for (const auto& [mark, pathGranule] : PathGranules[pathId]) {
if (pathGranule == granule) {
changes->SrcGranule = TChanges::TSrcGranule(pathId, granule, mark);
@@ -548,22 +534,22 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
Y_VERIFY(changes->SrcGranule);
if (changes->CompactionInfo->InGranule) {
- TSnapshot completedSnap = (outdatedSnapshot < LastSnapshot) ? LastSnapshot : outdatedSnapshot;
- if (!InitInGranuleMerge(changes->SrcGranule->Mark, portions, Limits, completedSnap, changes->MergeBorders)) {
+ const TSnapshot completedSnap = std::max(LastSnapshot, outdatedSnapshot);
+ if (!InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, Limits, completedSnap, changes->MergeBorders)) {
return {};
}
} else {
GranulesInSplit.insert(granule);
}
- Y_VERIFY(!portions.empty());
+ Y_VERIFY(!changes->SwitchedPortions.empty());
return changes;
}
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop,
ui32 maxRecords) {
- auto changes = std::make_shared<TChanges>(*this, snapshot, Limits);
+ auto changes = std::make_shared<TChanges>(TMark(MarkSchema), snapshot, Limits);
ui32 affectedRecords = 0;
// Add all portions from dropped paths
@@ -650,7 +636,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
}
TSnapshot fakeSnapshot(1, 1); // TODO: better snapshot
- auto changes = std::make_shared<TChanges>(*this, TColumnEngineChanges::TTL, fakeSnapshot);
+ auto changes = std::make_shared<TChanges>(TMark(MarkSchema), TColumnEngineChanges::TTL, fakeSnapshot);
ui64 evicttionSize = 0;
bool allowEviction = true;
ui64 dropBlobs = 0;
@@ -1312,26 +1298,19 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
return out;
}
-static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompactionLimits& limits, ui32& inserted) {
+static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompactionLimits& limits, bool& inserted) {
if (actual.size() < 2) {
return false;
}
- inserted = 0;
ui64 sumSize = 0;
ui64 sumMaxSize = 0;
- std::shared_ptr<arrow::Scalar> minPk0;
- std::shared_ptr<arrow::Scalar> maxPk0;
- if (actual.size()) {
- actual[0]->MinMaxValue(actual[0]->FirstPkColumn, minPk0, maxPk0);
- }
+ auto [minPk0, maxPk0] = actual[0]->MinMaxValue(actual[0]->FirstPkColumn);
bool pkEqual = !!minPk0 && !!maxPk0 && arrow::ScalarEquals(*minPk0, *maxPk0);
for (auto* portionInfo : actual) {
Y_VERIFY(portionInfo);
if (pkEqual) {
- std::shared_ptr<arrow::Scalar> minPkCurrent;
- std::shared_ptr<arrow::Scalar> maxPkCurrent;
- portionInfo->MinMaxValue(portionInfo->FirstPkColumn, minPkCurrent, maxPkCurrent);
+ auto [minPkCurrent, maxPkCurrent] = portionInfo->MinMaxValue(portionInfo->FirstPkColumn);
pkEqual = !!minPkCurrent && !!maxPkCurrent && arrow::ScalarEquals(*minPk0, *minPkCurrent)
&& arrow::ScalarEquals(*maxPk0, *maxPkCurrent);
}
@@ -1339,12 +1318,11 @@ static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompact
sumSize += sizes.first;
sumMaxSize += sizes.second;
if (portionInfo->IsInserted()) {
- ++inserted;
+ inserted = true;
}
}
- return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize
- || sumSize >= limits.GranuleOverloadSize);
+ return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize);
}
std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompactedGranule) {
@@ -1352,50 +1330,43 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact
return {};
}
- std::optional<ui64> outGranule;
+ ui64 granule = 0;
bool inGranule = true;
-
auto it = CompactionGranules.upper_bound(lastCompactedGranule);
- if (it == CompactionGranules.end()) {
- it = CompactionGranules.begin();
- }
while (!CompactionGranules.empty()) {
- Y_VERIFY(it != CompactionGranules.end());
- ui64 granule = *it;
- Y_VERIFY(Granules.contains(granule));
- auto spg = Granules.find(granule)->second;
- Y_VERIFY(spg);
+ // Start from the beginning if the end is reached.
+ if (it == CompactionGranules.end()) {
+ it = CompactionGranules.begin();
+ }
+ const auto gi = Granules.find(*it);
+ // Check granule exists.
+ Y_VERIFY(gi != Granules.end());
// We need only actual portions here (with empty XPlanStep:XTxId)
- auto actualPortions = GetActualPortions(spg->Portions);
- if (!actualPortions.empty()) {
- ui32 inserted = 0;
- bool needSplit = NeedSplit(actualPortions, Limits, inserted);
- if (needSplit) {
+ if (const auto& actualPortions = GetActualPortions(gi->second->Portions); !actualPortions.empty()) {
+ bool inserted = false;
+ if (NeedSplit(actualPortions, Limits, inserted)) {
inGranule = false;
- outGranule = granule;
+ granule = *it;
break;
} else if (inserted) {
- outGranule = granule;
+ granule = *it;
break;
}
}
-
+ // Nothing to compact in the current granule. Remove it from CompactionGranules.
it = CompactionGranules.erase(it);
- if (it == CompactionGranules.end()) {
- it = CompactionGranules.begin();
- }
}
- if (outGranule) {
+ if (granule) {
auto info = std::make_unique<TCompactionInfo>();
- info->Granules.insert(*outGranule);
+ info->Granules.insert(granule);
info->InGranule = inGranule;
- lastCompactedGranule = *outGranule;
+ lastCompactedGranule = granule;
return info;
}
return {};
}
-}
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 352206ed7b8..374b044e011 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -60,37 +60,38 @@ public:
{}
};
- TChanges(const TColumnEngineForLogs& engine,
+ TChanges(const TMark& defaultMark,
TVector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits)
: TColumnEngineChanges(TColumnEngineChanges::INSERT)
- , DefaultMark(engine.GetDefaultMark())
+ , DefaultMark(defaultMark)
{
Limits = limits;
DataToIndex = std::move(blobsToIndex);
}
- TChanges(const TColumnEngineForLogs& engine,
- std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits)
+ TChanges(const TMark& defaultMark,
+ std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits, const TSnapshot& snapshot)
: TColumnEngineChanges(TColumnEngineChanges::COMPACTION)
- , DefaultMark(engine.GetDefaultMark())
+ , DefaultMark(defaultMark)
{
Limits = limits;
CompactionInfo = std::move(info);
+ InitSnapshot = snapshot;
}
- TChanges(const TColumnEngineForLogs& engine,
+ TChanges(const TMark& defaultMark,
const TSnapshot& snapshot, const TCompactionLimits& limits)
: TColumnEngineChanges(TColumnEngineChanges::CLEANUP)
- , DefaultMark(engine.GetDefaultMark())
+ , DefaultMark(defaultMark)
{
Limits = limits;
InitSnapshot = snapshot;
}
- TChanges(const TColumnEngineForLogs& engine,
+ TChanges(const TMark& defaultMark,
TColumnEngineChanges::EType type, const TSnapshot& applySnapshot)
: TColumnEngineChanges(type)
- , DefaultMark(engine.GetDefaultMark())
+ , DefaultMark(defaultMark)
{
ApplySnapshot = applySnapshot;
}
@@ -142,7 +143,7 @@ public:
return std::max<ui32>(2, numSplitInto);
}
- TMark DefaultMark;
+ const TMark DefaultMark;
THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule}
std::optional<TSrcGranule> SrcGranule;
THashMap<ui64, std::pair<ui64, TMark>> NewGranules; // granule -> {pathId, key}
@@ -197,11 +198,14 @@ public:
return TMark::Deserialize(key, MarkSchema);
}
- TMark GetDefaultMark() const {
- return TMark(MarkSchema);
- }
+ const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const override;
+ const TColumnEngineStats& GetTotalStats() override;
+ ui64 MemoryUsage() const override;
+ TSnapshot LastUpdate() const override { return LastSnapshot; }
+public:
bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override;
+
std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) override;
std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo,
const TSnapshot& outdatedSnapshot) override;
@@ -209,15 +213,16 @@ public:
ui32 maxRecords) override;
std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema,
ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override;
+
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) override;
+
void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) override;
+
void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override;
void UpdateCompactionLimits(const TCompactionLimits& limits) override { Limits = limits; }
- const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const override;
- const TColumnEngineStats& GetTotalStats() override;
- ui64 MemoryUsage() const override;
- TSnapshot LastUpdate() const override { return LastSnapshot; }
+
+
std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot,
const THashSet<ui32>& columnIds,
@@ -259,6 +264,7 @@ private:
ui64 LastGranule;
TSnapshot LastSnapshot = TSnapshot::Zero();
+private:
void ClearIndex() {
Granules.clear();
PathGranules.clear();
@@ -298,4 +304,4 @@ private:
TVector<TVector<std::pair<TMark, ui64>>> EmptyGranuleTracks(const ui64 pathId) const;
};
-}
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 664d9bc3a54..5ac90c67303 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -162,14 +162,12 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord
}
}
-void TPortionInfo::MinMaxValue(const ui32 columnId, std::shared_ptr<arrow::Scalar>& minValue, std::shared_ptr<arrow::Scalar>& maxValue) const {
+std::tuple<std::shared_ptr<arrow::Scalar>, std::shared_ptr<arrow::Scalar>> TPortionInfo::MinMaxValue(const ui32 columnId) const {
auto it = Meta.ColumnMeta.find(columnId);
if (it == Meta.ColumnMeta.end()) {
- minValue = nullptr;
- maxValue = nullptr;
+ return std::make_tuple(std::shared_ptr<arrow::Scalar>(), std::shared_ptr<arrow::Scalar>());
} else {
- minValue = it->second.Min;
- maxValue = it->second.Max;
+ return std::make_tuple(it->second.Min, it->second.Max);
}
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index ec970378229..6f00a50dc1e 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -211,7 +211,8 @@ struct TPortionInfo {
void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName);
void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
- void MinMaxValue(const ui32 columnId, std::shared_ptr<arrow::Scalar>& minValue, std::shared_ptr<arrow::Scalar>& maxValue) const;
+
+ std::tuple<std::shared_ptr<arrow::Scalar>, std::shared_ptr<arrow::Scalar>> MinMaxValue(const ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp
index acd16bea9a8..69a6389bdd9 100644
--- a/ydb/core/tx/columnshard/engines/predicate/range.cpp
+++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp
@@ -44,9 +44,7 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo
bool matchFrom = false;
bool matchTo = false;
for (auto&& c : indexInfo.GetReplaceKey()->field_names()) {
- std::shared_ptr<arrow::Scalar> minValue;
- std::shared_ptr<arrow::Scalar> maxValue;
- info.MinMaxValue(indexInfo.GetColumnId(c), minValue, maxValue);
+ const auto& [minValue, maxValue ] = info.MinMaxValue(indexInfo.GetColumnId(c));
if (!matchFrom) {
const int result = PredicateFrom.MatchScalar(idx, maxValue);
if (result < 0) {
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 912ce96f24f..1f320b18fae 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -184,9 +184,7 @@ void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::
std::vector<std::shared_ptr<arrow::Scalar>> maxRecord;
for (auto&& i : indexInfo.GetReplaceKey()->fields()) {
const ui32 columnId = indexInfo.GetColumnId(i->name());
- std::shared_ptr<arrow::Scalar> minScalar;
- std::shared_ptr<arrow::Scalar> maxScalar;
- PortionInfo->MinMaxValue(columnId, minScalar, maxScalar);
+ const auto& [minScalar, maxScalar] = PortionInfo->MinMaxValue(columnId);
if (!FirstPK && !minScalar) {
FirstPK = nullptr;
ReverseLastPK = nullptr;