aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-09-06 19:33:24 +0300
committersnaury <snaury@ydb.tech>2022-09-06 19:33:24 +0300
commit45e9274c6ef0bb06bb0859707c4d0e85174f437b (patch)
treed2fcfd9c964da6415454bca5b4b56740df9830e0
parent54ead21458150a3f3055a5f4077cd5f5309b3818 (diff)
downloadydb-45e9274c6ef0bb06bb0859707c4d0e85174f437b.tar.gz
Use simple refcount and tagged list items for TOperation
-rw-r--r--ydb/core/tx/datashard/datashard_dep_tracker.cpp100
-rw-r--r--ydb/core/tx/datashard/datashard_dep_tracker.h24
-rw-r--r--ydb/core/tx/datashard/operation.h42
3 files changed, 78 insertions, 88 deletions
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
index 2c099436a6..9e46cb3604 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
@@ -107,7 +107,7 @@ void TDependencyTracker::AddImmediateWrites(const TOperation::TPtr& op, const TK
void TDependencyTracker::FlushPlannedReads() noexcept {
while (!DelayedPlannedReads.Empty()) {
- TOperation::TPtr op = TOperation::From(DelayedPlannedReads.PopFront());
+ TOperation::TPtr op = DelayedPlannedReads.PopFront();
auto reads = op->RemoveDelayedKnownReads();
AddPlannedReads(op, reads);
}
@@ -115,7 +115,7 @@ void TDependencyTracker::FlushPlannedReads() noexcept {
void TDependencyTracker::FlushPlannedWrites() noexcept {
while (!DelayedPlannedWrites.Empty()) {
- TOperation::TPtr op = TOperation::From(DelayedPlannedWrites.PopFront());
+ TOperation::TPtr op = DelayedPlannedWrites.PopFront();
auto writes = op->RemoveDelayedKnownWrites();
AddPlannedWrites(op, writes);
}
@@ -123,7 +123,7 @@ void TDependencyTracker::FlushPlannedWrites() noexcept {
void TDependencyTracker::FlushImmediateReads() noexcept {
while (!DelayedImmediateReads.Empty()) {
- TOperation::TPtr op = TOperation::From(DelayedImmediateReads.PopFront());
+ TOperation::TPtr op = DelayedImmediateReads.PopFront();
auto reads = op->RemoveDelayedKnownReads();
AddImmediateReads(op, reads);
}
@@ -131,7 +131,7 @@ void TDependencyTracker::FlushImmediateReads() noexcept {
void TDependencyTracker::FlushImmediateWrites() noexcept {
while (!DelayedImmediateWrites.Empty()) {
- TOperation::TPtr op = TOperation::From(DelayedImmediateWrites.PopFront());
+ TOperation::TPtr op = DelayedImmediateWrites.PopFront();
auto writes = op->RemoveDelayedKnownWrites();
AddImmediateWrites(op, writes);
}
@@ -289,23 +289,23 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::AddOperation(const TOp
// We are potentially writing to all keys in all tables, thus we conflict with everything
if (op->IsImmediate()) {
for (auto& item : Parent.AllPlannedReaders) {
- TOperation::From(item)->AddImmediateConflict(op);
+ item.AddImmediateConflict(op);
}
for (auto& item : Parent.AllPlannedWriters) {
- TOperation::From(item)->AddImmediateConflict(op);
+ item.AddImmediateConflict(op);
}
} else {
for (auto& item : Parent.AllPlannedReaders) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
for (auto& item : Parent.AllPlannedWriters) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
for (auto& item : Parent.AllImmediateReaders) {
- op->AddImmediateConflict(TOperation::From(item));
+ op->AddImmediateConflict(&item);
}
for (auto& item : Parent.AllImmediateWriters) {
- op->AddImmediateConflict(TOperation::From(item));
+ op->AddImmediateConflict(&item);
}
}
} else {
@@ -335,23 +335,23 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::AddOperation(const TOp
// If we have any reads we conflict with global reads or writes
if (op->IsImmediate()) {
for (auto& item : Parent.GlobalPlannedReaders) {
- TOperation::From(item)->AddImmediateConflict(op);
+ item.AddImmediateConflict(op);
}
for (auto& item : Parent.GlobalPlannedWriters) {
- TOperation::From(item)->AddImmediateConflict(op);
+ item.AddImmediateConflict(op);
}
} else {
for (auto& item : Parent.GlobalPlannedReaders) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
for (auto& item : Parent.GlobalPlannedWriters) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
for (auto& item : Parent.GlobalImmediateReaders) {
- op->AddImmediateConflict(TOperation::From(item));
+ op->AddImmediateConflict(&item);
}
for (auto& item : Parent.GlobalImmediateWriters) {
- op->AddImmediateConflict(TOperation::From(item));
+ op->AddImmediateConflict(&item);
}
}
}
@@ -360,14 +360,14 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::AddOperation(const TOp
// We are potentially reading all keys in all tables, thus we conflict with all writes
if (op->IsImmediate()) {
for (auto& item : Parent.AllPlannedWriters) {
- TOperation::From(item)->AddImmediateConflict(op);
+ item.AddImmediateConflict(op);
}
} else {
for (auto& item : Parent.AllPlannedWriters) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
for (auto& item : Parent.AllImmediateWriters) {
- op->AddImmediateConflict(TOperation::From(item));
+ op->AddImmediateConflict(&item);
}
}
} else if (haveReads) {
@@ -393,14 +393,14 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::AddOperation(const TOp
if (!haveWrites) {
if (op->IsImmediate()) {
for (auto& item : Parent.GlobalPlannedWriters) {
- TOperation::From(item)->AddImmediateConflict(op);
+ item.AddImmediateConflict(op);
}
} else {
for (auto& item : Parent.GlobalPlannedWriters) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
for (auto& item : Parent.GlobalImmediateWriters) {
- op->AddImmediateConflict(TOperation::From(item));
+ op->AddImmediateConflict(&item);
}
}
}
@@ -544,16 +544,11 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::RemoveOperation(const
}
}
- TOperationAllListItem* allListItem = op.Get();
- TOperationGlobalListItem* globalListItem = op.Get();
- TOperationDelayedReadListItem* delayedReadListItem = op.Get();
- TOperationDelayedWriteListItem* delayedWriteListItem = op.Get();
+ op->UnlinkFromList<TOperationAllListTag>();
+ op->UnlinkFromList<TOperationGlobalListTag>();
- allListItem->Unlink();
- globalListItem->Unlink();
-
- if (!delayedReadListItem->Empty()) {
- delayedReadListItem->Unlink();
+ if (op->IsInList<TOperationDelayedReadListTag>()) {
+ op->UnlinkFromList<TOperationDelayedReadListTag>();
op->RemoveDelayedKnownReads();
} else {
for (auto& kv : Parent.Tables) {
@@ -565,8 +560,8 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::RemoveOperation(const
}
}
- if (!delayedWriteListItem->Empty()) {
- delayedWriteListItem->Unlink();
+ if (op->IsInList<TOperationDelayedWriteListTag>()) {
+ op->UnlinkFromList<TOperationDelayedWriteListTag>();
op->RemoveDelayedKnownWrites();
} else {
for (auto& kv : Parent.Tables) {
@@ -757,14 +752,14 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
// We are potentially writing to all keys in all tables, thus we conflict with everything
if (op->IsImmediate()) {
for (auto& item : Parent.AllPlannedWriters) {
- TOperation::From(item)->AddImmediateConflict(op);
+ item.AddImmediateConflict(op);
}
} else {
for (auto& item : Parent.AllPlannedWriters) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
for (auto& item : Parent.AllImmediateWriters) {
- op->AddImmediateConflict(TOperation::From(item));
+ op->AddImmediateConflict(&item);
}
// In mvcc case we skip immediate readers because they read at a fixed time point,
// so that they already calculated all their dependencies and cannot have new ones,
@@ -798,18 +793,18 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
// If we have any reads or writes we conflict with global writes
if (op->IsImmediate()) {
for (auto& item : Parent.GlobalPlannedWriters) {
- TOperation::From(item)->AddImmediateConflict(op);
+ item.AddImmediateConflict(op);
}
} else {
for (auto& item : Parent.GlobalPlannedWriters) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
// Here we have immediate writers who are global readers as well
for (auto& item : Parent.GlobalImmediateReaders) {
- op->AddImmediateConflict(TOperation::From(item));
+ op->AddImmediateConflict(&item);
}
for (auto& item : Parent.GlobalImmediateWriters) {
- op->AddImmediateConflict(TOperation::From(item));
+ op->AddImmediateConflict(&item);
}
}
}
@@ -818,11 +813,11 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
// We are potentially reading all keys in all tables, thus we conflict with all writes
if (op->IsImmediate()) {
for (auto& item : Parent.AllPlannedWriters) {
- onImmediateConflict(*TOperation::From(item));
+ onImmediateConflict(item);
}
} else {
for (auto& item : Parent.AllPlannedWriters) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
// Since each happened read moves incomplete edge, neither immediate write cannot affect
// such read, that's why a planned reader cannot be a dependency for any immediate write
@@ -846,11 +841,11 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
if (!haveWrites) {
if (op->IsImmediate()) {
for (auto& item : Parent.GlobalPlannedWriters) {
- onImmediateConflict(*TOperation::From(item));
+ onImmediateConflict(item);
}
} else {
for (auto& item : Parent.GlobalPlannedWriters) {
- op->AddDependency(TOperation::From(item));
+ op->AddDependency(&item);
}
// Since each happened read moves incomplete edge, neither immediate write cannot affect
// such read, that's why a planned reader cannot be a dependency for any immediate write
@@ -983,16 +978,11 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::RemoveOperation(const TOp
}
}
- TOperationAllListItem* allListItem = op.Get();
- TOperationGlobalListItem* globalListItem = op.Get();
- TOperationDelayedReadListItem* delayedReadListItem = op.Get();
- TOperationDelayedWriteListItem* delayedWriteListItem = op.Get();
-
- allListItem->Unlink();
- globalListItem->Unlink();
+ op->UnlinkFromList<TOperationAllListTag>();
+ op->UnlinkFromList<TOperationGlobalListTag>();
- if (!delayedReadListItem->Empty()) {
- delayedReadListItem->Unlink();
+ if (op->IsInList<TOperationDelayedReadListTag>()) {
+ op->UnlinkFromList<TOperationDelayedReadListTag>();
op->RemoveDelayedKnownReads();
} else {
for (auto& kv : Parent.Tables) {
@@ -1004,8 +994,8 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::RemoveOperation(const TOp
}
}
- if (!delayedWriteListItem->Empty()) {
- delayedWriteListItem->Unlink();
+ if (op->IsInList<TOperationDelayedWriteListTag>()) {
+ op->UnlinkFromList<TOperationDelayedWriteListTag>();
op->RemoveDelayedKnownWrites();
} else {
for (auto& kv : Parent.Tables) {
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.h b/ydb/core/tx/datashard/datashard_dep_tracker.h
index 856c71411c..22f49b2bad 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.h
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.h
@@ -135,20 +135,20 @@ private:
// Maps table id to current ranges that have been read/written by all transactions
NFH::TFlatHashMap<ui64, TTableState> Tables;
// All operations that are reading or writing something
- TIntrusiveList<TOperationAllListItem> AllPlannedReaders;
- TIntrusiveList<TOperationAllListItem> AllPlannedWriters;
- TIntrusiveList<TOperationAllListItem> AllImmediateReaders;
- TIntrusiveList<TOperationAllListItem> AllImmediateWriters;
+ TIntrusiveList<TOperation, TOperationAllListTag> AllPlannedReaders;
+ TIntrusiveList<TOperation, TOperationAllListTag> AllPlannedWriters;
+ TIntrusiveList<TOperation, TOperationAllListTag> AllImmediateReaders;
+ TIntrusiveList<TOperation, TOperationAllListTag> AllImmediateWriters;
// A set of operations that read/write table globally
- TIntrusiveList<TOperationGlobalListItem> GlobalPlannedReaders;
- TIntrusiveList<TOperationGlobalListItem> GlobalPlannedWriters;
- TIntrusiveList<TOperationGlobalListItem> GlobalImmediateReaders;
- TIntrusiveList<TOperationGlobalListItem> GlobalImmediateWriters;
+ TIntrusiveList<TOperation, TOperationGlobalListTag> GlobalPlannedReaders;
+ TIntrusiveList<TOperation, TOperationGlobalListTag> GlobalPlannedWriters;
+ TIntrusiveList<TOperation, TOperationGlobalListTag> GlobalImmediateReaders;
+ TIntrusiveList<TOperation, TOperationGlobalListTag> GlobalImmediateWriters;
// Immediate operations that have delayed adding their keys to ranges
- TIntrusiveList<TOperationDelayedReadListItem> DelayedPlannedReads;
- TIntrusiveList<TOperationDelayedReadListItem> DelayedImmediateReads;
- TIntrusiveList<TOperationDelayedWriteListItem> DelayedPlannedWrites;
- TIntrusiveList<TOperationDelayedWriteListItem> DelayedImmediateWrites;
+ TIntrusiveList<TOperation, TOperationDelayedReadListTag> DelayedPlannedReads;
+ TIntrusiveList<TOperation, TOperationDelayedReadListTag> DelayedImmediateReads;
+ TIntrusiveList<TOperation, TOperationDelayedWriteListTag> DelayedPlannedWrites;
+ TIntrusiveList<TOperation, TOperationDelayedWriteListTag> DelayedImmediateWrites;
const TDefaultDependencyTrackingLogic DefaultLogic;
const TMvccDependencyTrackingLogic MvccLogic;
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 59a90ced67..888cebdc5a 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -468,18 +468,18 @@ struct TExecutionProfile {
THashMap<EExecutionUnitKind, TUnitProfile> UnitProfiles;
};
-class TOperationAllListItem : public TIntrusiveListItem<TOperationAllListItem> {};
-class TOperationGlobalListItem : public TIntrusiveListItem<TOperationGlobalListItem> {};
-class TOperationDelayedReadListItem : public TIntrusiveListItem<TOperationDelayedReadListItem> {};
-class TOperationDelayedWriteListItem : public TIntrusiveListItem<TOperationDelayedWriteListItem> {};
+struct TOperationAllListTag {};
+struct TOperationGlobalListTag {};
+struct TOperationDelayedReadListTag {};
+struct TOperationDelayedWriteListTag {};
class TOperation
: public TBasicOpInfo
- , public TThrRefBase
- , public TOperationAllListItem
- , public TOperationGlobalListItem
- , public TOperationDelayedReadListItem
- , public TOperationDelayedWriteListItem
+ , public TSimpleRefCount<TOperation>
+ , public TIntrusiveListItem<TOperation, TOperationAllListTag>
+ , public TIntrusiveListItem<TOperation, TOperationGlobalListTag>
+ , public TIntrusiveListItem<TOperation, TOperationDelayedReadListTag>
+ , public TIntrusiveListItem<TOperation, TOperationDelayedWriteListTag>
{
public:
enum EDepFlag {
@@ -505,9 +505,7 @@ public:
using TPtr = TIntrusivePtr<TOperation>;
using EResultStatus = NKikimrTxDataShard::TEvProposeTransactionResult::EStatus;
- ~TOperation()
- {
- }
+ virtual ~TOperation() = default;
TActorId GetTarget() const { return Target; }
void SetTarget(TActorId target) { Target = target; }
@@ -516,15 +514,17 @@ public:
void SetCookie(ui64 cookie) { Cookie = cookie; }
public:
- static TOperation* From(TOperationAllListItem* item) { return static_cast<TOperation*>(item); }
- static TOperation* From(TOperationGlobalListItem* item) { return static_cast<TOperation*>(item); }
- static TOperation* From(TOperationDelayedReadListItem* item) { return static_cast<TOperation*>(item); }
- static TOperation* From(TOperationDelayedWriteListItem* item) { return static_cast<TOperation*>(item); }
-
- static TOperation* From(TOperationAllListItem& item) { return static_cast<TOperation*>(&item); }
- static TOperation* From(TOperationGlobalListItem& item) { return static_cast<TOperation*>(&item); }
- static TOperation* From(TOperationDelayedReadListItem& item) { return static_cast<TOperation*>(&item); }
- static TOperation* From(TOperationDelayedWriteListItem& item) { return static_cast<TOperation*>(&item); }
+ template<class TTag>
+ bool IsInList() const {
+ using TItem = TIntrusiveListItem<TOperation, TTag>;
+ return !static_cast<const TItem*>(this)->Empty();
+ }
+
+ template<class TTag>
+ void UnlinkFromList() {
+ using TItem = TIntrusiveListItem<TOperation, TTag>;
+ static_cast<TItem*>(this)->Unlink();
+ }
public:
TDuration GetTotalElapsed() const {