aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-13 14:47:10 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-13 14:47:10 +0300
commitb9ab00ec95d68d65d1b8974c7b45d6822c2d0c00 (patch)
tree14eebff3e5b8e59285ed29fb60a718280cd2304e
parent965c3d7760fc9b38267cff838aeab00223831be3 (diff)
downloadydb-b9ab00ec95d68d65d1b8974c7b45d6822c2d0c00.tar.gz
separate rt/persistent logic
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h6
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp166
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.h56
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/path_info.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/path_info.h2
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp123
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h44
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common.h12
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp6
9 files changed, 232 insertions, 192 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index f8e03b52366..345e05cd900 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -500,13 +500,13 @@ struct Schema : NIceDb::Schema {
switch (recType) {
case EInsertTableIds::Inserted:
- insertTable.AddInserted(TWriteId{ data.WriteTxId }, std::move(data));
+ insertTable.AddInserted(std::move(data), true);
break;
case EInsertTableIds::Committed:
- insertTable.AddCommitted(std::move(data));
+ insertTable.AddCommitted(std::move(data), true);
break;
case EInsertTableIds::Aborted:
- insertTable.AddAborted(TWriteId{ data.WriteTxId }, std::move(data));
+ insertTable.AddAborted(std::move(data), true);
break;
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
index c7a799de8c4..82a5bd3dd26 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
@@ -5,62 +5,23 @@
namespace NKikimr::NOlap {
-void TInsertTable::OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept {
- if (!load) {
- Counters.Inserted.Add(dataSize);
- }
- pathInfo.AddInsertedSize(dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
- ++StatsPrepared.Rows;
- StatsPrepared.Bytes += dataSize;
-}
-
-void TInsertTable::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept {
- Counters.Inserted.Erase(dataSize);
- pathInfo.AddInsertedSize(-1 * (i64)dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
- Y_VERIFY(--StatsPrepared.Rows >= 0);
- StatsPrepared.Bytes += dataSize;
-}
-
-void TInsertTable::OnNewCommitted(const ui64 dataSize, const bool load) noexcept {
- if (!load) {
- Counters.Committed.Add(dataSize);
- }
- ++StatsCommitted.Rows;
- StatsCommitted.Bytes += dataSize;
-}
-
-void TInsertTable::OnEraseCommitted(TPathInfo& /*pathInfo*/, const ui64 dataSize) noexcept {
- Counters.Committed.Erase(dataSize);
- Y_VERIFY(--StatsCommitted.Rows >= 0);
- StatsCommitted.Bytes -= dataSize;
-}
-
bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) {
- TWriteId writeId{data.WriteTxId};
- if (Inserted.contains(writeId)) {
- Counters.Inserted.SkipAdd(data.BlobSize());
- return false;
- }
-
- dbTable.Insert(data);
- const ui32 dataSize = data.BlobSize();
- const ui64 pathId = data.PathId;
- if (Inserted.emplace(writeId, std::move(data)).second) {
- OnNewInserted(Summary.GetPathInfo(pathId), dataSize);
+ if (auto* dataPtr = Summary.AddInserted(std::move(data))) {
+ dbTable.Insert(*dataPtr);
+ return true;
} else {
- Counters.Inserted.SkipAdd(dataSize);
+ return false;
}
- return true;
}
-TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard,
+TInsertionSummary::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard,
const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists) {
Y_VERIFY(!writeIds.empty());
Y_UNUSED(metaShard);
- TCounters counters;
+ TInsertionSummary::TCounters counters;
for (auto writeId : writeIds) {
- auto* data = Inserted.FindPtr(writeId);
+ std::optional<TInsertedData> data = Summary.ExtractInserted(writeId);
Y_VERIFY(data, "Commit %" PRIu64 ":%" PRIu64 " : writeId %" PRIu64 " not found", planStep, txId, (ui64)writeId);
NKikimrTxColumnShard::TLogicalMetadata meta;
@@ -72,7 +33,6 @@ TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep,
dbTable.EraseInserted(*data);
- const ui64 dataSize = data->BlobSize();
const ui64 pathId = data->PathId;
auto* pathInfo = Summary.GetPathInfoOptional(pathId);
// There could be commit after drop: propose, drop, plan
@@ -80,17 +40,10 @@ TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep,
data->Commit(planStep, txId);
dbTable.Commit(*data);
- if (pathInfo->AddCommitted(std::move(*data))) {
- OnNewCommitted(dataSize);
- }
+ pathInfo->AddCommitted(std::move(*data));
} else {
dbTable.Abort(*data);
- Counters.Aborted.Add(data->BlobSize());
- Aborted.emplace(writeId, std::move(*data));
- }
-
- if (pathInfo && Inserted.erase(writeId)) {
- OnEraseInserted(*pathInfo, dataSize);
+ Summary.AddAborted(std::move(*data));
}
}
@@ -103,19 +56,10 @@ void TInsertTable::Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWr
for (auto writeId : writeIds) {
// There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId.
- if (auto* data = Inserted.FindPtr(writeId)) {
- Counters.Aborted.Add(data->BlobSize());
+ if (std::optional<TInsertedData> data = Summary.ExtractInserted(writeId)) {
dbTable.EraseInserted(*data);
dbTable.Abort(*data);
-
- const ui64 pathId = data->PathId;
- const ui32 dataSize = data->BlobSize();
- Aborted.emplace(writeId, std::move(*data));
- if (Inserted.erase(writeId)) {
- OnEraseInserted(Summary.GetPathInfo(pathId), dataSize);
- } else {
- Counters.Inserted.SkipErase(dataSize);
- }
+ Summary.AddAborted(std::move(*data));
}
}
}
@@ -129,96 +73,40 @@ THashSet<TWriteId> TInsertTable::OldWritesToAbort(const TInstant& now) const {
}
LastCleanup = now;
- TInstant timeBorder = now - WaitCommitDelay;
- THashSet<TWriteId> toAbort;
- for (auto& [writeId, data] : Inserted) {
- if (data.DirtyTime && data.DirtyTime < timeBorder) {
- toAbort.insert(writeId);
- }
- }
- return toAbort;
+ const TInstant timeBorder = now - WaitCommitDelay;
+ return Summary.GetDeprecatedInsertions(timeBorder);
}
THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) {
- // Committed -> Aborted (for future cleanup)
-
auto pathInfo = Summary.ExtractPathInfo(pathId);
- if (!pathInfo) {
- return {};
- }
- for (auto& data : pathInfo->GetCommitted()) {
- dbTable.EraseCommitted(data);
- OnEraseCommitted(*pathInfo, data.BlobSize());
- TInsertedData copy = data;
- copy.Undo();
- dbTable.Abort(copy);
-
- TWriteId writeId{copy.WriteTxId};
- Counters.Aborted.Add(copy.BlobSize());
- Aborted.emplace(writeId, std::move(copy));
- }
-
- // Return not committed writes for abort. Tablet filter this list with proposed ones befor Abort().
-
- THashSet<TWriteId> toAbort;
- for (auto& [writeId, data] : Inserted) {
- if (data.PathId == pathId) {
- toAbort.insert(writeId);
+ if (!!pathInfo) {
+ for (auto& data : pathInfo->GetCommitted()) {
+ dbTable.EraseCommitted(data);
+ TInsertedData copy = data;
+ copy.Undo();
+ dbTable.Abort(copy);
+ Summary.AddAborted(std::move(copy));
}
}
- return toAbort;
+ return Summary.GetInsertedByPathId(pathId);
}
void TInsertTable::EraseCommitted(IDbWrapper& dbTable, const TInsertedData& data) {
- TPathInfo* pathInfo = Summary.GetPathInfoOptional(data.PathId);
- if (!pathInfo) {
- Counters.Committed.SkipErase(data.BlobSize());
- return;
- }
-
- dbTable.EraseCommitted(data);
- if (pathInfo->EraseCommitted(data)) {
- OnEraseCommitted(*pathInfo, data.BlobSize());
- } else {
- Counters.Committed.SkipErase(data.BlobSize());
+ if (Summary.EraseCommitted(data)) {
+ dbTable.EraseCommitted(data);
}
}
void TInsertTable::EraseAborted(IDbWrapper& dbTable, const TInsertedData& data) {
- TWriteId writeId{data.WriteTxId};
- if (!Aborted.contains(writeId)) {
- return;
+ if (Summary.EraseAborted((TWriteId)data.WriteTxId)) {
+ dbTable.EraseAborted(data);
}
-
- dbTable.EraseAborted(data);
- Counters.Aborted.Erase(data.BlobSize());
- Aborted.erase(writeId);
}
-bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) {
+bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant loadTime) {
Clear();
-
- if (!dbTable.Load(*this, loadTime)) {
- return false;
- }
-
- // update stats
-
- StatsPrepared = {};
- StatsCommitted = {};
-
- for (auto& [_, data] : Inserted) {
- OnNewInserted(Summary.GetPathInfo(data.PathId), data.BlobSize());
- }
-
- for (auto& [pathId, pathInfo] : Summary.GetPathInfo()) {
- for (auto& data : pathInfo.GetCommitted()) {
- OnNewCommitted(data.BlobSize());
- }
- }
-
- return true;
+ return dbTable.Load(*this, loadTime);
}
std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot) const {
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h
index d777b2e0a73..904991dbf30 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h
@@ -14,52 +14,47 @@ class IDbWrapper;
class TInsertTableAccessor {
protected:
- THashMap<TWriteId, TInsertedData> Inserted;
- THashMap<TWriteId, TInsertedData> Aborted;
TInsertionSummary Summary;
protected:
void Clear() {
- Inserted.clear();
Summary.Clear();
- Aborted.clear();
}
public:
const std::map<ui64, std::set<const TPathInfo*>>& GetPathPriorities() const {
return Summary.GetPathPriorities();
}
- bool AddInserted(const TWriteId& writeId, TInsertedData&& data) {
- return Inserted.emplace(writeId, std::move(data)).second;
+ bool AddInserted(TInsertedData&& data, const bool load) {
+ return Summary.AddInserted(std::move(data), load);
}
- bool AddAborted(const TWriteId& writeId, TInsertedData&& data) {
- return Aborted.emplace(writeId, std::move(data)).second;
+ bool AddAborted(TInsertedData&& data, const bool load) {
+ return Summary.AddAborted(std::move(data), load);
}
- bool AddCommitted(TInsertedData&& data) {
+ bool AddCommitted(TInsertedData&& data, const bool load) {
const ui64 pathId = data.PathId;
- return Summary.GetPathInfo(pathId).AddCommitted(std::move(data));
+ return Summary.GetPathInfo(pathId).AddCommitted(std::move(data), load);
+ }
+ const THashMap<TWriteId, TInsertedData>& GetAborted() const { return Summary.GetAborted(); }
+ const THashMap<TWriteId, TInsertedData>& GetInserted() const { return Summary.GetInserted(); }
+ const TInsertionSummary::TCounters& GetCountersPrepared() const {
+ return Summary.GetCountersPrepared();
+ }
+ const TInsertionSummary::TCounters& GetCountersCommitted() const {
+ return Summary.GetCountersCommitted();
+ }
+ bool IsOverloadedByCommitted(const ui64 pathId) const {
+ return Summary.IsOverloaded(pathId);
}
};
class TInsertTable: public TInsertTableAccessor {
-private:
- void OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load = false) noexcept;
- void OnNewCommitted(const ui64 dataSize, const bool load = false) noexcept;
- void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept;
- void OnEraseCommitted(TPathInfo& pathInfo, const ui64 dataSize) noexcept;
-
public:
- static constexpr const TDuration WaitCommitDelay = TDuration::Hours(24);
+ static constexpr const TDuration WaitCommitDelay = TDuration::Minutes(10);
static constexpr const TDuration CleanDelay = TDuration::Minutes(10);
- struct TCounters {
- ui64 Rows{};
- ui64 Bytes{};
- ui64 RawBytes{};
- };
-
bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
- TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard,
+ TInsertionSummary::TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard,
const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists);
void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds);
THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const;
@@ -67,21 +62,10 @@ public:
void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key);
void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key);
std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const;
- bool Load(IDbWrapper& dbTable, const TInstant& loadTime);
- const TCounters& GetCountersPrepared() const { return StatsPrepared; }
- const TCounters& GetCountersCommitted() const { return StatsCommitted; }
-
- size_t InsertedSize() const { return Inserted.size(); }
- const THashMap<TWriteId, TInsertedData>& GetAborted() const { return Aborted; }
- bool IsOverloadedByCommitted(const ui64 pathId) const {
- return Summary.IsOverloaded(pathId);
- }
+ bool Load(IDbWrapper& dbTable, const TInstant loadTime);
private:
mutable TInstant LastCleanup;
- TCounters StatsPrepared;
- TCounters StatsCommitted;
- const NColumnShard::TInsertTableCounters Counters;
};
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
index a68a84a2b13..e70d48ac472 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
@@ -19,16 +19,12 @@ bool TPathInfo::SetInsertedOverload(const bool value) {
void TPathInfo::AddCommittedSize(const i64 size, const ui64 overloadLimit) {
CommittedSize += size;
Y_VERIFY(CommittedSize >= 0);
- Summary->CommittedSize += size;
- Y_VERIFY(Summary->CommittedSize >= 0);
SetCommittedOverload((ui64)CommittedSize > overloadLimit);
}
void TPathInfo::AddInsertedSize(const i64 size, const ui64 overloadLimit) {
InsertedSize += size;
Y_VERIFY(InsertedSize >= 0);
- Summary->InsertedSize += size;
- Y_VERIFY(Summary->InsertedSize >= 0);
PathIdCounters.Inserted.OnPathIdDataInfo(InsertedSize, 0);
SetInsertedOverload((ui64)InsertedSize > overloadLimit);
}
@@ -39,14 +35,17 @@ bool TPathInfo::EraseCommitted(const TInsertedData& data) {
AddCommittedSize(-1 * (i64)data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
Summary->AddPriority(*this);
PathIdCounters.Committed.OnPathIdDataInfo(CommittedSize, Committed.size());
+ Summary->OnEraseCommitted(*this, data.BlobSize());
return result;
}
-bool TPathInfo::AddCommitted(TInsertedData&& data) {
+bool TPathInfo::AddCommitted(TInsertedData&& data, const bool load) {
+ const ui64 dataSize = data.BlobSize();
Summary->RemovePriority(*this);
AddCommittedSize(data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
bool result = Committed.emplace(std::move(data)).second;
Summary->AddPriority(*this);
+ Summary->OnNewCommitted(dataSize, load);
PathIdCounters.Committed.OnPathIdDataInfo(CommittedSize, Committed.size());
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.h b/ydb/core/tx/columnshard/engines/insert_table/path_info.h
index 57f19d1298a..22d033c124e 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/path_info.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.h
@@ -40,7 +40,7 @@ public:
return Committed;
}
- bool AddCommitted(TInsertedData&& data);
+ bool AddCommitted(TInsertedData&& data, const bool load = false);
bool IsOverloaded() const {
return CommittedOverload || InsertedOverload;
diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
index f76b5379379..1b50f81e899 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
@@ -1,7 +1,22 @@
#include "rt_insertion.h"
+#include <ydb/core/tx/columnshard/engines/column_engine.h>
namespace NKikimr::NOlap {
+// Accounts for one more committed blob of `dataSize` bytes.
+// The in-memory StatsCommitted totals are always updated; Counters.Committed
+// is notified only when `load` is false.
+void TInsertionSummary::OnNewCommitted(const ui64 dataSize, const bool load) noexcept {
+    const bool reportToCounters = !load;
+    if (reportToCounters) {
+        Counters.Committed.Add(dataSize);
+    }
+    StatsCommitted.Rows += 1;
+    StatsCommitted.Bytes += dataSize;
+}
+
+// Accounts for the removal of one committed blob of `dataSize` bytes:
+// notifies Counters.Committed and shrinks the StatsCommitted totals.
+// The path info argument is currently unused.
+void TInsertionSummary::OnEraseCommitted(TPathInfo& /*pathInfo*/, const ui64 dataSize) noexcept {
+    Counters.Committed.Erase(dataSize);
+    // Rows is unsigned, so `--Rows >= 0` can never fail; the underflow check
+    // has to be done before the decrement to be meaningful.
+    Y_VERIFY(StatsCommitted.Rows > 0);
+    --StatsCommitted.Rows;
+    StatsCommitted.Bytes -= dataSize;
+}
+
void TInsertionSummary::RemovePriority(const TPathInfo& pathInfo) noexcept {
const ui64 priority = pathInfo.GetIndexationPriority();
auto it = Priorities.find(priority);
@@ -35,6 +50,9 @@ std::optional<NKikimr::NOlap::TPathInfo> TInsertionSummary::ExtractPathInfo(cons
RemovePriority(it->second);
std::optional<TPathInfo> result = std::move(it->second);
PathInfo.erase(it);
+ for (auto&& i : result->GetCommitted()) {
+ OnEraseCommitted(*result, i.BlobSize());
+ }
return result;
}
@@ -63,8 +81,113 @@ bool TInsertionSummary::IsOverloaded(const ui64 pathId) const {
}
void TInsertionSummary::Clear() {
+ StatsPrepared = {};
+ StatsCommitted = {};
PathInfo.clear();
Priorities.clear();
+ Inserted.clear();
+ Aborted.clear();
+}
+
+// Accounts for one more inserted (not yet committed) blob of `dataSize` bytes
+// belonging to `pathInfo`. Counters.Inserted is notified only when `load` is false;
+// the per-path inserted size and the StatsPrepared totals are always updated.
+void TInsertionSummary::OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept {
+    const bool reportToCounters = !load;
+    if (reportToCounters) {
+        Counters.Inserted.Add(dataSize);
+    }
+    pathInfo.AddInsertedSize(dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
+    StatsPrepared.Rows += 1;
+    StatsPrepared.Bytes += dataSize;
+}
+
+// Accounts for the removal of one inserted (not yet committed) blob of `dataSize`
+// bytes belonging to `pathInfo`: notifies Counters.Inserted, shrinks the per-path
+// inserted size and the StatsPrepared totals. Mirror of OnNewInserted.
+void TInsertionSummary::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept {
+    Counters.Inserted.Erase(dataSize);
+    pathInfo.AddInsertedSize(-1 * (i64)dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
+    // Rows is unsigned, so `--Rows >= 0` can never fail; the underflow check
+    // has to be done before the decrement to be meaningful.
+    Y_VERIFY(StatsPrepared.Rows > 0);
+    --StatsPrepared.Rows;
+    // An erase must give back the bytes that OnNewInserted added, not add them again.
+    StatsPrepared.Bytes -= dataSize;
+}
+
+// Collects the write ids of every record in Inserted whose PathId equals `pathId`.
+// Returns an empty set when no record matches.
+THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetInsertedByPathId(const ui64 pathId) const {
+    THashSet<TWriteId> writeIds;
+    for (const auto& record : Inserted) {
+        if (record.second.PathId != pathId) {
+            continue;
+        }
+        writeIds.insert(record.first);
+    }
+    return writeIds;
+}
+
+// Collects the write ids of every record in Inserted whose DirtyTime is set
+// and is earlier than `timeBorder`.
+THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetDeprecatedInsertions(const TInstant timeBorder) const {
+    THashSet<TWriteId> expiredWriteIds;
+    for (const auto& record : Inserted) {
+        const TInsertedData& inserted = record.second;
+        if (!inserted.DirtyTime) {
+            // Records without a dirty time are never reported.
+            continue;
+        }
+        if (inserted.DirtyTime < timeBorder) {
+            expiredWriteIds.insert(record.first);
+        }
+    }
+    return expiredWriteIds;
+}
+
+// Removes the aborted record registered under `writeId`, notifying
+// Counters.Aborted with the size of the stored blob.
+// Returns true when a record was removed, false when none was registered.
+bool TInsertionSummary::EraseAborted(const TWriteId writeId) {
+    const auto abortedIt = Aborted.find(writeId);
+    const bool found = (abortedIt != Aborted.end());
+    if (found) {
+        Counters.Aborted.Erase(abortedIt->second.BlobSize());
+        Aborted.erase(abortedIt);
+    }
+    return found;
+}
+
+// Erases `data` from the committed set of its path.
+// Returns true when the path is known and TPathInfo::EraseCommitted reports success.
+// Otherwise the attempt is recorded via Counters.Committed.SkipErase and false is returned.
+bool TInsertionSummary::EraseCommitted(const TInsertedData& data) {
+    TPathInfo* const pathInfo = GetPathInfoOptional(data.PathId);
+    if (pathInfo && pathInfo->EraseCommitted(data)) {
+        return true;
+    }
+    Counters.Committed.SkipErase(data.BlobSize());
+    return false;
+}
+
+// Registers `data` in Aborted under its WriteTxId and returns a pointer to the stored record.
+// Counters.Aborted is notified only when `load` is false.
+// A record with the same write id must not already be registered (enforced by Y_VERIFY).
+const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) {
+    // Take the key before `data` is moved into the map.
+    const auto writeId = static_cast<TWriteId>(data.WriteTxId);
+    const bool reportToCounters = !load;
+    if (reportToCounters) {
+        Counters.Aborted.Add(data.BlobSize());
+    }
+    const auto [position, isNew] = Aborted.emplace(writeId, std::move(data));
+    Y_VERIFY(isNew);
+    return &position->second;
+}
+
+// Removes the record registered under `id` from Inserted and hands it to the caller.
+// Returns an empty optional when no such record exists.
+// Erase accounting (OnEraseInserted) is done only when the path info of the record
+// is still present; otherwise the record is extracted without touching the stats.
+std::optional<NKikimr::NOlap::TInsertedData> TInsertionSummary::ExtractInserted(const TWriteId id) {
+    const auto insertedIt = Inserted.find(id);
+    if (insertedIt == Inserted.end()) {
+        return std::nullopt;
+    }
+    if (TPathInfo* const pathInfo = GetPathInfoOptional(insertedIt->second.PathId)) {
+        OnEraseInserted(*pathInfo, insertedIt->second.BlobSize());
+    }
+    // Move the payload out before the map node is destroyed.
+    std::optional<TInsertedData> extracted(std::move(insertedIt->second));
+    Inserted.erase(insertedIt);
+    return extracted;
+}
+
+// Registers `data` in Inserted under its WriteTxId.
+// On success the insertion is accounted via OnNewInserted (forwarding `load`) and a
+// pointer to the stored record is returned. When a record with the same write id
+// already exists, the attempt is recorded via Counters.Inserted.SkipAdd and nullptr
+// is returned.
+const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedData&& data, const bool load /*= false*/) {
+    // Capture everything needed from `data` before it is moved into the map.
+    const TWriteId writeId{ data.WriteTxId };
+    const ui64 pathId = data.PathId;
+    const ui32 blobSize = data.BlobSize();
+    const auto [position, isNew] = Inserted.emplace(writeId, std::move(data));
+    if (!isNew) {
+        Counters.Inserted.SkipAdd(blobSize);
+        return nullptr;
+    }
+    OnNewInserted(GetPathInfo(pathId), blobSize, load);
+    return &position->second;
+}
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
index 56f8c4bc660..8718dd19f83 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
@@ -6,18 +6,54 @@
namespace NKikimr::NOlap {
class TInsertionSummary {
+public:
+ struct TCounters {
+ ui64 Rows{};
+ ui64 Bytes{};
+ ui64 RawBytes{};
+ };
+
private:
friend class TPathInfo;
-
+ TCounters StatsPrepared;
+ TCounters StatsCommitted;
const NColumnShard::TInsertTableCounters Counters;
- YDB_READONLY(i64, CommittedSize, 0);
- YDB_READONLY(i64, InsertedSize, 0);
+
+ THashMap<TWriteId, TInsertedData> Inserted;
+ THashMap<TWriteId, TInsertedData> Aborted;
+
std::map<ui64, std::set<const TPathInfo*>> Priorities;
THashMap<ui64, TPathInfo> PathInfo;
void RemovePriority(const TPathInfo& pathInfo) noexcept;
void AddPriority(const TPathInfo& pathInfo) noexcept;
+ void OnNewCommitted(const ui64 dataSize, const bool load = false) noexcept;
+ void OnEraseCommitted(TPathInfo& pathInfo, const ui64 dataSize) noexcept;
+ void OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept;
+ void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept;
+
public:
+ THashSet<TWriteId> GetInsertedByPathId(const ui64 pathId) const;
+
+ THashSet<TWriteId> GetDeprecatedInsertions(const TInstant timeBorder) const;
+
+ const THashMap<TWriteId, TInsertedData>& GetInserted() const {
+ return Inserted;
+ }
+ const THashMap<TWriteId, TInsertedData>& GetAborted() const {
+ return Aborted;
+ }
+
+ const TInsertedData* AddAborted(TInsertedData&& data, const bool load = false);
+ bool EraseAborted(const TWriteId writeId);
+
+ bool EraseCommitted(const TInsertedData& data);
+
+ const TInsertedData* AddInserted(TInsertedData&& data, const bool load = false);
+ std::optional<TInsertedData> ExtractInserted(const TWriteId id);
+
+ const TCounters& GetCountersPrepared() const { return StatsPrepared; }
+ const TCounters& GetCountersCommitted() const { return StatsCommitted; }
const NColumnShard::TInsertTableCounters& GetCounters() const {
return Counters;
}
@@ -34,6 +70,8 @@ public:
bool IsOverloaded(const ui64 pathId) const;
+
+
const std::map<ui64, std::set<const TPathInfo*>>& GetPathPriorities() const {
return Priorities;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/common.h b/ydb/core/tx/columnshard/engines/reader/common.h
index c1dcc4cec6b..85f684c482d 100644
--- a/ydb/core/tx/columnshard/engines/reader/common.h
+++ b/ydb/core/tx/columnshard/engines/reader/common.h
@@ -7,12 +7,20 @@ namespace NKikimr::NOlap::NIndexedReader {
class TBatchAddress {
private:
- YDB_READONLY(ui32, GranuleIdx, 0);
- YDB_READONLY(ui32, BatchGranuleIdx, 0);
+ ui32 GranuleIdx = 0;
+ ui32 BatchGranuleIdx = 0;
public:
TString ToString() const;
TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx);
+
+ ui32 GetGranuleIdx() const {
+ return GranuleIdx;
+ }
+
+ ui32 GetBatchGranuleIdx() const {
+ return BatchGranuleIdx;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index a7a58da4d02..c119d816112 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -50,15 +50,15 @@ public:
bool Load(TInsertTableAccessor& accessor,
const TInstant&) override {
for (auto&& i : Inserted) {
- accessor.AddInserted(i.first, std::move(i.second));
+ accessor.AddInserted(std::move(i.second), true);
}
for (auto&& i : Aborted) {
- accessor.AddAborted(i.first, std::move(i.second));
+ accessor.AddAborted(std::move(i.second), true);
}
for (auto&& i : Committed) {
for (auto&& c: i.second) {
auto copy = c;
- accessor.AddCommitted(std::move(copy));
+ accessor.AddCommitted(std::move(copy), true);
}
}
return true;