diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-13 14:47:10 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-13 14:47:10 +0300 |
commit | b9ab00ec95d68d65d1b8974c7b45d6822c2d0c00 (patch) | |
tree | 14eebff3e5b8e59285ed29fb60a718280cd2304e | |
parent | 965c3d7760fc9b38267cff838aeab00223831be3 (diff) | |
download | ydb-b9ab00ec95d68d65d1b8974c7b45d6822c2d0c00.tar.gz |
separate rt/persistent logic
9 files changed, 232 insertions, 192 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index f8e03b52366..345e05cd900 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -500,13 +500,13 @@ struct Schema : NIceDb::Schema { switch (recType) { case EInsertTableIds::Inserted: - insertTable.AddInserted(TWriteId{ data.WriteTxId }, std::move(data)); + insertTable.AddInserted(std::move(data), true); break; case EInsertTableIds::Committed: - insertTable.AddCommitted(std::move(data)); + insertTable.AddCommitted(std::move(data), true); break; case EInsertTableIds::Aborted: - insertTable.AddAborted(TWriteId{ data.WriteTxId }, std::move(data)); + insertTable.AddAborted(std::move(data), true); break; } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index c7a799de8c4..82a5bd3dd26 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -5,62 +5,23 @@ namespace NKikimr::NOlap { -void TInsertTable::OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept { - if (!load) { - Counters.Inserted.Add(dataSize); - } - pathInfo.AddInsertedSize(dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); - ++StatsPrepared.Rows; - StatsPrepared.Bytes += dataSize; -} - -void TInsertTable::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept { - Counters.Inserted.Erase(dataSize); - pathInfo.AddInsertedSize(-1 * (i64)dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); - Y_VERIFY(--StatsPrepared.Rows >= 0); - StatsPrepared.Bytes += dataSize; -} - -void TInsertTable::OnNewCommitted(const ui64 dataSize, const bool load) noexcept { - if (!load) { - Counters.Committed.Add(dataSize); - } - ++StatsCommitted.Rows; - StatsCommitted.Bytes += dataSize; -} - -void TInsertTable::OnEraseCommitted(TPathInfo& /*pathInfo*/, const ui64 dataSize) noexcept { - Counters.Committed.Erase(dataSize); - Y_VERIFY(--StatsCommitted.Rows >= 0); - StatsCommitted.Bytes -= dataSize; -} - bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) { - TWriteId writeId{data.WriteTxId}; - if (Inserted.contains(writeId)) { - Counters.Inserted.SkipAdd(data.BlobSize()); - return false; - } - - dbTable.Insert(data); - const ui32 dataSize = data.BlobSize(); - const ui64 pathId = data.PathId; - if (Inserted.emplace(writeId, std::move(data)).second) { - OnNewInserted(Summary.GetPathInfo(pathId), dataSize); + if (auto* dataPtr = Summary.AddInserted(std::move(data))) { + dbTable.Insert(*dataPtr); + return true; } else { - Counters.Inserted.SkipAdd(dataSize); + return false; } - return true; } -TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, +TInsertionSummary::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists) { Y_VERIFY(!writeIds.empty()); Y_UNUSED(metaShard); - TCounters counters; + TInsertionSummary::TCounters counters; for (auto writeId : writeIds) { - auto* data = Inserted.FindPtr(writeId); + std::optional<TInsertedData> data = Summary.ExtractInserted(writeId); Y_VERIFY(data, "Commit %" PRIu64 ":%" PRIu64 " : writeId %" PRIu64 " not found", planStep, txId, (ui64)writeId); NKikimrTxColumnShard::TLogicalMetadata meta; @@ -72,7 +33,6 @@ TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, dbTable.EraseInserted(*data); - const ui64 dataSize = data->BlobSize(); const ui64 pathId = data->PathId; auto* pathInfo = Summary.GetPathInfoOptional(pathId); // There could be commit after drop: propose, drop, plan @@ -80,17 +40,10 @@ TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, data->Commit(planStep, txId); dbTable.Commit(*data); - if (pathInfo->AddCommitted(std::move(*data))) { - OnNewCommitted(dataSize); - } + pathInfo->AddCommitted(std::move(*data)); } else { dbTable.Abort(*data); - Counters.Aborted.Add(data->BlobSize()); - Aborted.emplace(writeId, std::move(*data)); - } - - if (pathInfo && Inserted.erase(writeId)) { - OnEraseInserted(*pathInfo, dataSize); + Summary.AddAborted(std::move(*data)); } } @@ -103,19 +56,10 @@ void TInsertTable::Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWr for (auto writeId : writeIds) { // There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId. - if (auto* data = Inserted.FindPtr(writeId)) { - Counters.Aborted.Add(data->BlobSize()); + if (std::optional<TInsertedData> data = Summary.ExtractInserted(writeId)) { dbTable.EraseInserted(*data); dbTable.Abort(*data); - - const ui64 pathId = data->PathId; - const ui32 dataSize = data->BlobSize(); - Aborted.emplace(writeId, std::move(*data)); - if (Inserted.erase(writeId)) { - OnEraseInserted(Summary.GetPathInfo(pathId), dataSize); - } else { - Counters.Inserted.SkipErase(dataSize); - } + Summary.AddAborted(std::move(*data)); } } } @@ -129,96 +73,40 @@ THashSet<TWriteId> TInsertTable::OldWritesToAbort(const TInstant& now) const { } LastCleanup = now; - TInstant timeBorder = now - WaitCommitDelay; - THashSet<TWriteId> toAbort; - for (auto& [writeId, data] : Inserted) { - if (data.DirtyTime && data.DirtyTime < timeBorder) { - toAbort.insert(writeId); - } - } - return toAbort; + const TInstant timeBorder = now - WaitCommitDelay; + return Summary.GetDeprecatedInsertions(timeBorder); } THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) { - // Committed -> Aborted (for future cleanup) - auto pathInfo = Summary.ExtractPathInfo(pathId); - if (!pathInfo) { - return {}; - } - for (auto& data : pathInfo->GetCommitted()) { - dbTable.EraseCommitted(data); - OnEraseCommitted(*pathInfo, data.BlobSize()); - TInsertedData copy = data; - copy.Undo(); - dbTable.Abort(copy); - - TWriteId writeId{copy.WriteTxId}; - Counters.Aborted.Add(copy.BlobSize()); - Aborted.emplace(writeId, std::move(copy)); - } - - // Return not committed writes for abort. Tablet filter this list with proposed ones befor Abort(). - - THashSet<TWriteId> toAbort; - for (auto& [writeId, data] : Inserted) { - if (data.PathId == pathId) { - toAbort.insert(writeId); + if (!!pathInfo) { + for (auto& data : pathInfo->GetCommitted()) { + dbTable.EraseCommitted(data); + TInsertedData copy = data; + copy.Undo(); + dbTable.Abort(copy); + Summary.AddAborted(std::move(copy)); } } - return toAbort; + return Summary.GetInsertedByPathId(pathId); } void TInsertTable::EraseCommitted(IDbWrapper& dbTable, const TInsertedData& data) { - TPathInfo* pathInfo = Summary.GetPathInfoOptional(data.PathId); - if (!pathInfo) { - Counters.Committed.SkipErase(data.BlobSize()); - return; - } - - dbTable.EraseCommitted(data); - if (pathInfo->EraseCommitted(data)) { - OnEraseCommitted(*pathInfo, data.BlobSize()); - } else { - Counters.Committed.SkipErase(data.BlobSize()); + if (Summary.EraseCommitted(data)) { + dbTable.EraseCommitted(data); } } void TInsertTable::EraseAborted(IDbWrapper& dbTable, const TInsertedData& data) { - TWriteId writeId{data.WriteTxId}; - if (!Aborted.contains(writeId)) { - return; + if (Summary.EraseAborted((TWriteId)data.WriteTxId)) { + dbTable.EraseAborted(data); } - - dbTable.EraseAborted(data); - Counters.Aborted.Erase(data.BlobSize()); - Aborted.erase(writeId); } -bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) { +bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant loadTime) { Clear(); - - if (!dbTable.Load(*this, loadTime)) { - return false; - } - - // update stats - - StatsPrepared = {}; - StatsCommitted = {}; - - for (auto& [_, data] : Inserted) { - OnNewInserted(Summary.GetPathInfo(data.PathId), data.BlobSize()); - } - - for (auto& [pathId, pathInfo] : Summary.GetPathInfo()) { - for (auto& data : pathInfo.GetCommitted()) { - OnNewCommitted(data.BlobSize()); - } - } - - return true; + return dbTable.Load(*this, loadTime); } std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot) const { diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h index d777b2e0a73..904991dbf30 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -14,52 +14,47 @@ class IDbWrapper; class TInsertTableAccessor { protected: - THashMap<TWriteId, TInsertedData> Inserted; - THashMap<TWriteId, TInsertedData> Aborted; TInsertionSummary Summary; protected: void Clear() { - Inserted.clear(); Summary.Clear(); - Aborted.clear(); } public: const std::map<ui64, std::set<const TPathInfo*>>& GetPathPriorities() const { return Summary.GetPathPriorities(); } - bool AddInserted(const TWriteId& writeId, TInsertedData&& data) { - return Inserted.emplace(writeId, std::move(data)).second; + bool AddInserted(TInsertedData&& data, const bool load) { + return Summary.AddInserted(std::move(data), load); } - bool AddAborted(const TWriteId& writeId, TInsertedData&& data) { - return Aborted.emplace(writeId, std::move(data)).second; + bool AddAborted(TInsertedData&& data, const bool load) { + return Summary.AddAborted(std::move(data), load); } - bool AddCommitted(TInsertedData&& data) { + bool AddCommitted(TInsertedData&& data, const bool load) { const ui64 pathId = data.PathId; - return Summary.GetPathInfo(pathId).AddCommitted(std::move(data)); + return Summary.GetPathInfo(pathId).AddCommitted(std::move(data), load); + } + const THashMap<TWriteId, TInsertedData>& GetAborted() const { return Summary.GetAborted(); } + const THashMap<TWriteId, TInsertedData>& GetInserted() const { return Summary.GetInserted(); } + const TInsertionSummary::TCounters& GetCountersPrepared() const { + return Summary.GetCountersPrepared(); + } + const TInsertionSummary::TCounters& GetCountersCommitted() const { + return Summary.GetCountersCommitted(); + } + bool IsOverloadedByCommitted(const ui64 pathId) const { + return Summary.IsOverloaded(pathId); } }; class TInsertTable: public TInsertTableAccessor { -private: - void OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load = false) noexcept; - void OnNewCommitted(const ui64 dataSize, const bool load = false) noexcept; - void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept; - void OnEraseCommitted(TPathInfo& pathInfo, const ui64 dataSize) noexcept; - public: - static constexpr const TDuration WaitCommitDelay = TDuration::Hours(24); + static constexpr const TDuration WaitCommitDelay = TDuration::Minutes(10); static constexpr const TDuration CleanDelay = TDuration::Minutes(10); - struct TCounters { - ui64 Rows{}; - ui64 Bytes{}; - ui64 RawBytes{}; - }; - bool Insert(IDbWrapper& dbTable, TInsertedData&& data); - TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, + TInsertionSummary::TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists); void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds); THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const; @@ -67,21 +62,10 @@ public: void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key); void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key); std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const; - bool Load(IDbWrapper& dbTable, const TInstant& loadTime); - const TCounters& GetCountersPrepared() const { return StatsPrepared; } - const TCounters& GetCountersCommitted() const { return StatsCommitted; } - - size_t InsertedSize() const { return Inserted.size(); } - const THashMap<TWriteId, TInsertedData>& GetAborted() const { return Aborted; } - bool IsOverloadedByCommitted(const ui64 pathId) const { - return Summary.IsOverloaded(pathId); - } + bool Load(IDbWrapper& dbTable, const TInstant loadTime); private: mutable TInstant LastCleanup; - TCounters StatsPrepared; - TCounters StatsCommitted; - const NColumnShard::TInsertTableCounters Counters; }; } diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp index a68a84a2b13..e70d48ac472 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp @@ -19,16 +19,12 @@ bool TPathInfo::SetInsertedOverload(const bool value) { void TPathInfo::AddCommittedSize(const i64 size, const ui64 overloadLimit) { CommittedSize += size; Y_VERIFY(CommittedSize >= 0); - Summary->CommittedSize += size; - Y_VERIFY(Summary->CommittedSize >= 0); SetCommittedOverload((ui64)CommittedSize > overloadLimit); } void TPathInfo::AddInsertedSize(const i64 size, const ui64 overloadLimit) { InsertedSize += size; Y_VERIFY(InsertedSize >= 0); - Summary->InsertedSize += size; - Y_VERIFY(Summary->InsertedSize >= 0); PathIdCounters.Inserted.OnPathIdDataInfo(InsertedSize, 0); SetInsertedOverload((ui64)InsertedSize > overloadLimit); } @@ -39,14 +35,17 @@ bool TPathInfo::EraseCommitted(const TInsertedData& data) { AddCommittedSize(-1 * (i64)data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); Summary->AddPriority(*this); PathIdCounters.Committed.OnPathIdDataInfo(CommittedSize, Committed.size()); + Summary->OnEraseCommitted(*this, data.BlobSize()); return result; } -bool TPathInfo::AddCommitted(TInsertedData&& data) { +bool TPathInfo::AddCommitted(TInsertedData&& data, const bool load) { + const ui64 dataSize = data.BlobSize(); Summary->RemovePriority(*this); AddCommittedSize(data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); bool result = Committed.emplace(std::move(data)).second; Summary->AddPriority(*this); + Summary->OnNewCommitted(dataSize, load); PathIdCounters.Committed.OnPathIdDataInfo(CommittedSize, Committed.size()); return result; } diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.h b/ydb/core/tx/columnshard/engines/insert_table/path_info.h index 57f19d1298a..22d033c124e 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/path_info.h +++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.h @@ -40,7 +40,7 @@ public: return Committed; } - bool AddCommitted(TInsertedData&& data); + bool AddCommitted(TInsertedData&& data, const bool load = false); bool IsOverloaded() const { return CommittedOverload || InsertedOverload; diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp index f76b5379379..1b50f81e899 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -1,7 +1,22 @@ #include "rt_insertion.h" +#include <ydb/core/tx/columnshard/engines/column_engine.h> namespace NKikimr::NOlap { +void TInsertionSummary::OnNewCommitted(const ui64 dataSize, const bool load) noexcept { + if (!load) { + Counters.Committed.Add(dataSize); + } + ++StatsCommitted.Rows; + StatsCommitted.Bytes += dataSize; +} + +void TInsertionSummary::OnEraseCommitted(TPathInfo& /*pathInfo*/, const ui64 dataSize) noexcept { + Counters.Committed.Erase(dataSize); + Y_VERIFY(--StatsCommitted.Rows >= 0); + StatsCommitted.Bytes -= dataSize; +} + void TInsertionSummary::RemovePriority(const TPathInfo& pathInfo) noexcept { const ui64 priority = pathInfo.GetIndexationPriority(); auto it = Priorities.find(priority); @@ -35,6 +50,9 @@ std::optional<NKikimr::NOlap::TPathInfo> TInsertionSummary::ExtractPathInfo(cons RemovePriority(it->second); std::optional<TPathInfo> result = std::move(it->second); PathInfo.erase(it); + for (auto&& i : result->GetCommitted()) { + OnEraseCommitted(*result, i.BlobSize()); + } return result; } @@ -63,8 +81,113 @@ bool TInsertionSummary::IsOverloaded(const ui64 pathId) const { } void TInsertionSummary::Clear() { + StatsPrepared = {}; + StatsCommitted = {}; PathInfo.clear(); Priorities.clear(); + Inserted.clear(); + Aborted.clear(); +} + +void TInsertionSummary::OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept { + if (!load) { + Counters.Inserted.Add(dataSize); + } + pathInfo.AddInsertedSize(dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); + ++StatsPrepared.Rows; + StatsPrepared.Bytes += dataSize; +} + +void TInsertionSummary::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept { + Counters.Inserted.Erase(dataSize); + pathInfo.AddInsertedSize(-1 * (i64)dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); + Y_VERIFY(--StatsPrepared.Rows >= 0); + StatsPrepared.Bytes += dataSize; +} + +THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetInsertedByPathId(const ui64 pathId) const { + THashSet<TWriteId> result; + for (auto& [writeId, data] : Inserted) { + if (data.PathId == pathId) { + result.insert(writeId); + } + } + + return result; +} + +THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetDeprecatedInsertions(const TInstant timeBorder) const { + THashSet<TWriteId> toAbort; + for (auto& [writeId, data] : Inserted) { + if (data.DirtyTime && data.DirtyTime < timeBorder) { + toAbort.insert(writeId); + } + } + return toAbort; +} + +bool TInsertionSummary::EraseAborted(const TWriteId writeId) { + auto it = Aborted.find(writeId); + if (it == Aborted.end()) { + return false; + } + Counters.Aborted.Erase(it->second.BlobSize()); + Aborted.erase(it); + return true; +} + +bool TInsertionSummary::EraseCommitted(const TInsertedData& data) { + TPathInfo* pathInfo = GetPathInfoOptional(data.PathId); + if (!pathInfo) { + Counters.Committed.SkipErase(data.BlobSize()); + return false; + } + + if (!pathInfo->EraseCommitted(data)) { + Counters.Committed.SkipErase(data.BlobSize()); + return false; + } else { + return true; + } +} + +const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) { + const TWriteId writeId((TWriteId)data.WriteTxId); + if (!load) { + Counters.Aborted.Add(data.BlobSize()); + } + auto insertInfo = Aborted.emplace(writeId, std::move(data)); + Y_VERIFY(insertInfo.second); + return &insertInfo.first->second; +} + +std::optional<NKikimr::NOlap::TInsertedData> TInsertionSummary::ExtractInserted(const TWriteId id) { + auto it = Inserted.find(id); + if (it == Inserted.end()) { + return {}; + } else { + auto pathInfo = GetPathInfoOptional(it->second.PathId); + if (pathInfo) { + OnEraseInserted(*pathInfo, it->second.BlobSize()); + } + std::optional<TInsertedData> result = std::move(it->second); + Inserted.erase(it); + return result; + } +} + +const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedData&& data, const bool load /*= false*/) { + TWriteId writeId{ data.WriteTxId }; + const ui32 dataSize = data.BlobSize(); + const ui64 pathId = data.PathId; + auto insertInfo = Inserted.emplace(writeId, std::move(data)); + if (insertInfo.second) { + OnNewInserted(GetPathInfo(pathId), dataSize, load); + return &insertInfo.first->second; + } else { + Counters.Inserted.SkipAdd(dataSize); + return nullptr; + } } } diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h index 56f8c4bc660..8718dd19f83 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h @@ -6,18 +6,54 @@ namespace NKikimr::NOlap { class TInsertionSummary { +public: + struct TCounters { + ui64 Rows{}; + ui64 Bytes{}; + ui64 RawBytes{}; + }; + private: friend class TPathInfo; - + TCounters StatsPrepared; + TCounters StatsCommitted; const NColumnShard::TInsertTableCounters Counters; - YDB_READONLY(i64, CommittedSize, 0); - YDB_READONLY(i64, InsertedSize, 0); + + THashMap<TWriteId, TInsertedData> Inserted; + THashMap<TWriteId, TInsertedData> Aborted; + std::map<ui64, std::set<const TPathInfo*>> Priorities; THashMap<ui64, TPathInfo> PathInfo; void RemovePriority(const TPathInfo& pathInfo) noexcept; void AddPriority(const TPathInfo& pathInfo) noexcept; + void OnNewCommitted(const ui64 dataSize, const bool load = false) noexcept; + void OnEraseCommitted(TPathInfo& pathInfo, const ui64 dataSize) noexcept; + void OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept; + void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept; + public: + THashSet<TWriteId> GetInsertedByPathId(const ui64 pathId) const; + + THashSet<TWriteId> GetDeprecatedInsertions(const TInstant timeBorder) const; + + const THashMap<TWriteId, TInsertedData>& GetInserted() const { + return Inserted; + } + const THashMap<TWriteId, TInsertedData>& GetAborted() const { + return Aborted; + } + + const TInsertedData* AddAborted(TInsertedData&& data, const bool load = false); + bool EraseAborted(const TWriteId writeId); + + bool EraseCommitted(const TInsertedData& data); + + const TInsertedData* AddInserted(TInsertedData&& data, const bool load = false); + std::optional<TInsertedData> ExtractInserted(const TWriteId id); + + const TCounters& GetCountersPrepared() const { return StatsPrepared; } + const TCounters& GetCountersCommitted() const { return StatsCommitted; } const NColumnShard::TInsertTableCounters& GetCounters() const { return Counters; } @@ -34,6 +70,8 @@ public: bool IsOverloaded(const ui64 pathId) const; + + const std::map<ui64, std::set<const TPathInfo*>>& GetPathPriorities() const { return Priorities; } diff --git a/ydb/core/tx/columnshard/engines/reader/common.h b/ydb/core/tx/columnshard/engines/reader/common.h index c1dcc4cec6b..85f684c482d 100644 --- a/ydb/core/tx/columnshard/engines/reader/common.h +++ b/ydb/core/tx/columnshard/engines/reader/common.h @@ -7,12 +7,20 @@ namespace NKikimr::NOlap::NIndexedReader { class TBatchAddress { private: - YDB_READONLY(ui32, GranuleIdx, 0); - YDB_READONLY(ui32, BatchGranuleIdx, 0); + ui32 GranuleIdx = 0; + ui32 BatchGranuleIdx = 0; public: TString ToString() const; TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx); + + ui32 GetGranuleIdx() const { + return GranuleIdx; + } + + ui32 GetBatchGranuleIdx() const { + return BatchGranuleIdx; + } }; } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index a7a58da4d02..c119d816112 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -50,15 +50,15 @@ public: bool Load(TInsertTableAccessor& accessor, const TInstant&) override { for (auto&& i : Inserted) { - accessor.AddInserted(i.first, std::move(i.second)); + accessor.AddInserted(std::move(i.second), true); } for (auto&& i : Aborted) { - accessor.AddAborted(i.first, std::move(i.second)); + accessor.AddAborted(std::move(i.second), true); } for (auto&& i : Committed) { for (auto&& c: i.second) { auto copy = c; - accessor.AddCommitted(std::move(copy)); + accessor.AddCommitted(std::move(copy), true); } } return true; |