diff options
author | ivanmorozov <[email protected]> | 2023-08-03 19:50:57 +0300 |
---|---|---|
committer | ivanmorozov <[email protected]> | 2023-08-03 19:50:57 +0300 |
commit | 5cc18010911fd094c55a3572b17c0efefcc5ce21 (patch) | |
tree | aa4eb40ea4e482f7a9abc18c367773dd558fa9f1 | |
parent | 358091205166a3237041ce0c7c601bc67adc1239 (diff) |
KIKIMR-18932:private snapshot and other features for portion
13 files changed, 91 insertions, 84 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index ff609d2363b..e4b85385034 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -592,9 +592,10 @@ struct Schema : NIceDb::Schema { // IndexColumns activities static void IndexColumns_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { - db.Table<IndexColumns>().Key(index, portion.GetGranule(), row.ColumnId, portion.PlanStep, portion.TxId, portion.Portion, row.Chunk).Update( - NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.XPlanStep), - NIceDb::TUpdate<IndexColumns::XTxId>(portion.XTxId), + db.Table<IndexColumns>().Key(index, portion.GetGranule(), row.ColumnId, + portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update( + NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()), + NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()), NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()), NIceDb::TUpdate<IndexColumns::Metadata>(row.Metadata), NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset), @@ -603,7 +604,8 @@ struct Schema : NIceDb::Schema { } static void IndexColumns_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { - db.Table<IndexColumns>().Key(index, portion.Granule, row.ColumnId, portion.PlanStep, portion.TxId, portion.Portion, row.Chunk).Delete(); + db.Table<IndexColumns>().Key(index, portion.GetGranule(), row.ColumnId, + portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete(); } static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index, @@ -615,14 +617,13 @@ struct Schema : NIceDb::Schema { while (!rowset.EndOfSet()) { TColumnRecord row; NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty(); - portion.Granule = rowset.GetValue<IndexColumns::Granule>(); + portion.SetGranule(rowset.GetValue<IndexColumns::Granule>()); row.ColumnId = rowset.GetValue<IndexColumns::ColumnIdx>(); - portion.PlanStep = rowset.GetValue<IndexColumns::PlanStep>(); - portion.TxId = rowset.GetValue<IndexColumns::TxId>(); - portion.Portion = rowset.GetValue<IndexColumns::Portion>(); + portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>()); + portion.SetPortion(rowset.GetValue<IndexColumns::Portion>()); row.Chunk = rowset.GetValue<IndexColumns::Chunk>(); - portion.XPlanStep = rowset.GetValue<IndexColumns::XPlanStep>(); - portion.XTxId = rowset.GetValue<IndexColumns::XTxId>(); + portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>()); + TString strBlobId = rowset.GetValue<IndexColumns::Blob>(); row.Metadata = rowset.GetValue<IndexColumns::Metadata>(); row.BlobRange.Offset = rowset.GetValue<IndexColumns::Offset>(); diff --git a/ydb/core/tx/columnshard/common/snapshot.cpp b/ydb/core/tx/columnshard/common/snapshot.cpp index 9f905808066..fb4ee00017f 100644 --- a/ydb/core/tx/columnshard/common/snapshot.cpp +++ b/ydb/core/tx/columnshard/common/snapshot.cpp @@ -1,5 +1,10 @@ #include "snapshot.h" +#include <util/string/builder.h> namespace NKikimr::NOlap { +TString TSnapshot::DebugString() const { + return TStringBuilder() << "plan_step=" << PlanStep << ";tx_id=" << TxId << ";"; +} + }; diff --git a/ydb/core/tx/columnshard/common/snapshot.h b/ydb/core/tx/columnshard/common/snapshot.h index 3da82944910..93b896be051 100644 --- a/ydb/core/tx/columnshard/common/snapshot.h +++ b/ydb/core/tx/columnshard/common/snapshot.h @@ -46,6 +46,8 @@ public: friend IOutputStream& operator<<(IOutputStream& out, const TSnapshot& s) { return out << "{" << s.PlanStep << ':' << (s.TxId == std::numeric_limits<ui64>::max() ? "max" : ::ToString(s.TxId)) << "}"; } + + TString DebugString() const; }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 07bad486b90..996a003eb9d 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -41,7 +41,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) { if (granuleRemap.size()) { auto it = granuleRemap.find(portionInfo.GetGranule()); Y_VERIFY(it != granuleRemap.end()); - portionInfo.UpdateGranuleId(it->second); + portionInfo.SetGranule(it->second); } TPortionMeta::EProduced produced = TPortionMeta::INSERTED; @@ -53,7 +53,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) { } for (auto& portionInfo : SwitchedPortions) { Y_VERIFY(portionInfo.IsActive()); - portionInfo.SetStale(context.GetSnapshot()); + portionInfo.SetRemoveSnapshot(context.GetSnapshot()); } } diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp index d63aadcc0dd..893ee6efbdb 100644 --- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp @@ -125,11 +125,11 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColum for (auto& portionInfo : portions) { Y_VERIFY(!portionInfo.Empty()); Y_VERIFY(portionInfo.GetGranule() == granule); - auto blobSchema = context.SchemaVersions.GetSchema(portionInfo.GetSnapshot()); + auto blobSchema = context.SchemaVersions.GetSchema(portionInfo.GetMinSnapshot()); auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs); batches.push_back(batch); - if (portionInfo.GetSnapshot() > maxSnapshot) { - maxSnapshot = portionInfo.GetSnapshot(); + if (maxSnapshot < portionInfo.GetMinSnapshot()) { + maxSnapshot = portionInfo.GetMinSnapshot(); } } diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp index 2f0e93b0945..43acd6974f9 100644 --- a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp @@ -137,12 +137,12 @@ std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, NKikimr::NOlap::TSna TSnapshot maxSnapshot = resultSchema->GetSnapshot(); for (const auto& portionInfo : portions) { if (!insertedOnly || portionInfo.IsInserted()) { - const auto blobSchema = context.SchemaVersions.GetSchema(portionInfo.GetSnapshot()); + const auto blobSchema = context.SchemaVersions.GetSchema(portionInfo.GetMinSnapshot()); batches.push_back(portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs)); - if (maxSnapshot < portionInfo.GetSnapshot()) { - maxSnapshot = portionInfo.GetSnapshot(); + if (maxSnapshot < portionInfo.GetMinSnapshot()) { + maxSnapshot = portionInfo.GetMinSnapshot(); } } } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index 7065e663d35..de0c9ea536f 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -189,7 +189,7 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfo& portionInfo, TP TPortionInfo undo = portionInfo; - auto blobSchema = context.SchemaVersions.GetSchema(undo.GetSnapshot()); + auto blobSchema = context.SchemaVersions.GetSchema(undo.GetMinSnapshot()); auto resultSchema = context.SchemaVersions.GetLastSchema(); auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 558a11199e2..006e07b8180 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -97,12 +97,12 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange void TChangesWithAppend::DoCompile(TFinalizationContext& context) { for (auto&& i : AppendedPortions) { - i.UpdatePortionId(context.NextPortionId()); + i.SetPortion(context.NextPortionId()); i.UpdateRecordsMeta(TPortionMeta::INSERTED); } } -std::vector<NKikimr::NOlap::TPortionInfo> TChangesWithAppend::MakeAppendedPortions( +std::vector<TPortionInfo> TChangesWithAppend::MakeAppendedPortions( const ui64 pathId, const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, std::vector<TString>& blobs, const TGranuleMeta* granuleMeta, TConstructionContext& context) const { Y_VERIFY(batch->num_rows()); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index c6df12ed774..af814e84fd3 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -243,7 +243,7 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& // Do not count the blob as lost since it exists in the index. lostBlobs.erase(rec.BlobRange.BlobId); // Locate granule and append the record. - if (const auto gi = Granules.find(portion.Granule); gi != Granules.end()) { + if (const auto gi = Granules.find(portion.GetGranule()); gi != Granules.end()) { gi->second->AddColumnRecord(indexInfo, portion, rec); } else { Y_VERIFY(false); @@ -705,8 +705,8 @@ static TMap<TSnapshot, std::vector<const TPortionInfo*>> GroupPortionsBySnapshot continue; } - TSnapshot recSnapshot = portionInfo.GetSnapshot(); - TSnapshot recXSnapshot = portionInfo.GetXSnapshot(); + TSnapshot recSnapshot = portionInfo.GetMinSnapshot(); + TSnapshot recXSnapshot = portionInfo.GetRemoveSnapshot(); bool visible = (recSnapshot <= snapshot); if (recXSnapshot.GetPlanStep()) { @@ -782,7 +782,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot TMap<TSnapshot, std::vector<const TPortionInfo*>> orderedPortions = GroupPortionsBySnapshot(portions, snapshot); for (auto& [snap, vec] : orderedPortions) { for (const auto* portionInfo : vec) { - TPortionInfo outPortion = portionInfo->FilterColumns(columnIds); + TPortionInfo outPortion = portionInfo->CopyWithFilteredColumns(columnIds); Y_VERIFY(outPortion.Produced()); if (!pkRangesFilter.IsPortionInUsage(outPortion, VersionedIndex.GetLastSchema()->GetIndexInfo())) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_skipped") diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index a7a36b0813a..9c9011bec18 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -228,6 +228,20 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const { return it->second.Max; } +TPortionInfo TPortionInfo::CopyWithFilteredColumns(const THashSet<ui32>& columnIds) const { + TPortionInfo result(Granule, Portion, GetMinSnapshot()); + result.Meta = Meta; + result.Records.reserve(columnIds.size()); + + for (auto& rec : Records) { + Y_VERIFY(rec.Valid()); + if (columnIds.contains(rec.ColumnId)) { + result.Records.push_back(rec); + } + } + return result; +} + std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { Y_VERIFY(!Blobs.empty()); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 625348c6357..3058404e214 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -101,14 +101,12 @@ public: struct TPortionInfo { private: TPortionInfo() = default; -public: - static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024; ui64 Granule = 0; ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in granule - ui64 PlanStep = 0; // {PlanStep, TxId} is min snapshot for {Granule, Portion} - ui64 TxId = 0; - ui64 XPlanStep = 0; // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one) - ui64 XTxId = 0; + TSnapshot MinSnapshot = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion} + TSnapshot RemoveSnapshot = TSnapshot::Zero(); // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one) +public: + static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024; std::vector<TColumnRecord> Records; TPortionMeta Meta; @@ -116,42 +114,29 @@ public: bool Empty() const { return Records.empty(); } bool Produced() const { return Meta.Produced != TPortionMeta::UNSPECIFIED; } - bool Valid() const { return PlanStep && TxId && Granule && Portion && !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } - bool ValidSnapshotInfo() const { return PlanStep && TxId && Granule && Portion; } + bool Valid() const { return MinSnapshot.Valid() && Granule && Portion && !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } + bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && Granule && Portion; } bool IsInserted() const { return Meta.Produced == TPortionMeta::INSERTED; } bool IsEvicted() const { return Meta.Produced == TPortionMeta::EVICTED; } bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ } bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); } size_t NumRecords() const { return Records.size(); } - TPortionInfo FilterColumns(const THashSet<ui32>& columnIds) const { - TPortionInfo result(Granule, Portion, GetSnapshot()); - result.Meta = Meta; - result.Records.reserve(columnIds.size()); - - for (auto& rec : Records) { - Y_VERIFY(rec.Valid()); - if (columnIds.contains(rec.ColumnId)) { - result.Records.push_back(rec); - } - } - return result; - } + TPortionInfo CopyWithFilteredColumns(const THashSet<ui32>& columnIds) const; bool IsEqualWithSnapshots(const TPortionInfo& item) const { - return Granule == item.Granule && PlanStep == item.PlanStep && TxId == item.TxId - && Portion == item.Portion && XPlanStep == item.XPlanStep && XTxId == item.XTxId; + return Granule == item.Granule && MinSnapshot == item.MinSnapshot + && Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot; } static TPortionInfo BuildEmpty() { return TPortionInfo(); } - TPortionInfo(const ui64 granuleId, const ui64 portionId, const TSnapshot& snapshot) + TPortionInfo(const ui64 granuleId, const ui64 portionId, const TSnapshot& minSnapshot) : Granule(granuleId) , Portion(portionId) - , PlanStep(snapshot.GetPlanStep()) - , TxId(snapshot.GetTxId()) + , MinSnapshot(minSnapshot) { } @@ -160,6 +145,7 @@ public: return TStringBuilder() << "portion_id:" << Portion << ";" << "granule_id:" << Granule << ";" << + "min_snapshot:" << MinSnapshot.DebugString() << ";" << "meta:(" << Meta.DebugString() << ");" ; } @@ -169,7 +155,7 @@ public: return false; } - return GetXSnapshot() < snapshot; + return GetRemoveSnapshot() < snapshot; } bool CheckForCleanup() const { @@ -204,29 +190,42 @@ public: Granule = granule; } - TSnapshot GetSnapshot() const { - return TSnapshot(PlanStep, TxId); + void SetPortion(const ui64 portion) { + Portion = portion; + } + + const TSnapshot& GetMinSnapshot() const { + return MinSnapshot; } - TSnapshot GetXSnapshot() const { - Y_VERIFY(!Empty()); - return TSnapshot(XPlanStep, XTxId); + const TSnapshot& GetRemoveSnapshot() const { + return RemoveSnapshot; } bool IsActive() const { - return GetXSnapshot().IsZero(); + return GetRemoveSnapshot().IsZero(); } - void SetSnapshot(const TSnapshot& snap) { + void SetMinSnapshot(const TSnapshot& snap) { Y_VERIFY(snap.Valid()); - PlanStep = snap.GetPlanStep(); - TxId = snap.GetTxId(); + MinSnapshot = snap; } - void SetXSnapshot(const TSnapshot& snap) { - Y_VERIFY(snap.Valid()); - XPlanStep = snap.GetPlanStep(); - XTxId = snap.GetTxId(); + void SetMinSnapshot(const ui64 planStep, const ui64 txId) { + MinSnapshot = TSnapshot(planStep, txId); + Y_VERIFY(MinSnapshot.Valid()); + } + + void SetRemoveSnapshot(const TSnapshot& snap) { + const bool wasValid = RemoveSnapshot.Valid(); + Y_VERIFY(!wasValid || snap.Valid()); + RemoveSnapshot = snap; + } + + void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) { + const bool wasValid = RemoveSnapshot.Valid(); + RemoveSnapshot = TSnapshot(planStep, txId); + Y_VERIFY(!wasValid || RemoveSnapshot.Valid()); } std::pair<ui32, ui32> BlobsSizes() const { @@ -247,14 +246,6 @@ public: return sum; } - void UpdatePortionId(const ui64 portionId) { - Portion = portionId; - } - - void UpdateGranuleId(const ui64 granuleId) { - Granule = granuleId; - } - void UpdateRecordsMeta(TPortionMeta::EProduced produced) { Meta.Produced = produced; for (auto& record : Records) { @@ -262,10 +253,6 @@ public: } } - void SetStale(const TSnapshot& snapshot) { - SetXSnapshot(snapshot); - } - void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) { Records.push_back(rec); LoadMetadata(indexInfo, rec); diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 2708ea26282..9436f7a217f 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -33,7 +33,7 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard Y_VERIFY(PortionInfo->Produced()); Y_VERIFY(!FetchedInfo.GetFilteredBatch()); - auto blobSchema = readMetadata->GetLoadSchema(PortionInfo->GetSnapshot()); + auto blobSchema = readMetadata->GetLoadSchema(PortionInfo->GetMinSnapshot()); auto readSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot()); ISnapshotSchema::TPtr resultSchema; if (CurrentColumnIds) { diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 6a61ee5cd9a..dafd6d1aa18 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -106,14 +106,12 @@ public: auto& data = Indices[index].Columns[portion.GetGranule()]; auto it = data.find(portion.GetPortion()); if (it == data.end()) { - it = data.emplace(portion.GetPortion(), portion.FilterColumns({})).first; + it = data.emplace(portion.GetPortion(), portion.CopyWithFilteredColumns({})).first; } else { - Y_VERIFY(portion.Granule == it->second.Granule && portion.Portion == it->second.Portion); + Y_VERIFY(portion.GetGranule() == it->second.GetGranule() && portion.GetPortion() == it->second.GetPortion()); } - it->second.TxId = portion.TxId; - it->second.PlanStep = portion.PlanStep; - it->second.XTxId = portion.XTxId; - it->second.XPlanStep = portion.XPlanStep; + it->second.SetMinSnapshot(portion.GetMinSnapshot()); + it->second.SetRemoveSnapshot(portion.GetRemoveSnapshot()); bool replaced = false; for (auto& rec : it->second.Records) { |