diff options
author | snaury <snaury@ydb.tech> | 2022-09-06 19:33:24 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-09-06 19:33:24 +0300 |
commit | 45e9274c6ef0bb06bb0859707c4d0e85174f437b (patch) | |
tree | d2fcfd9c964da6415454bca5b4b56740df9830e0 | |
parent | 54ead21458150a3f3055a5f4077cd5f5309b3818 (diff) | |
download | ydb-45e9274c6ef0bb06bb0859707c4d0e85174f437b.tar.gz |
Use simple refcount and tagged list items for TOperation
-rw-r--r-- | ydb/core/tx/datashard/datashard_dep_tracker.cpp | 100 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_dep_tracker.h | 24 | ||||
-rw-r--r-- | ydb/core/tx/datashard/operation.h | 42 |
3 files changed, 78 insertions, 88 deletions
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp index 2c099436a6..9e46cb3604 100644 --- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp +++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp @@ -107,7 +107,7 @@ void TDependencyTracker::AddImmediateWrites(const TOperation::TPtr& op, const TK void TDependencyTracker::FlushPlannedReads() noexcept { while (!DelayedPlannedReads.Empty()) { - TOperation::TPtr op = TOperation::From(DelayedPlannedReads.PopFront()); + TOperation::TPtr op = DelayedPlannedReads.PopFront(); auto reads = op->RemoveDelayedKnownReads(); AddPlannedReads(op, reads); } @@ -115,7 +115,7 @@ void TDependencyTracker::FlushPlannedReads() noexcept { void TDependencyTracker::FlushPlannedWrites() noexcept { while (!DelayedPlannedWrites.Empty()) { - TOperation::TPtr op = TOperation::From(DelayedPlannedWrites.PopFront()); + TOperation::TPtr op = DelayedPlannedWrites.PopFront(); auto writes = op->RemoveDelayedKnownWrites(); AddPlannedWrites(op, writes); } @@ -123,7 +123,7 @@ void TDependencyTracker::FlushPlannedWrites() noexcept { void TDependencyTracker::FlushImmediateReads() noexcept { while (!DelayedImmediateReads.Empty()) { - TOperation::TPtr op = TOperation::From(DelayedImmediateReads.PopFront()); + TOperation::TPtr op = DelayedImmediateReads.PopFront(); auto reads = op->RemoveDelayedKnownReads(); AddImmediateReads(op, reads); } @@ -131,7 +131,7 @@ void TDependencyTracker::FlushImmediateReads() noexcept { void TDependencyTracker::FlushImmediateWrites() noexcept { while (!DelayedImmediateWrites.Empty()) { - TOperation::TPtr op = TOperation::From(DelayedImmediateWrites.PopFront()); + TOperation::TPtr op = DelayedImmediateWrites.PopFront(); auto writes = op->RemoveDelayedKnownWrites(); AddImmediateWrites(op, writes); } @@ -289,23 +289,23 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::AddOperation(const TOp // We are potentially writing to all keys in all tables, thus we conflict with everything if (op->IsImmediate()) { for (auto& item : Parent.AllPlannedReaders) { - TOperation::From(item)->AddImmediateConflict(op); + item.AddImmediateConflict(op); } for (auto& item : Parent.AllPlannedWriters) { - TOperation::From(item)->AddImmediateConflict(op); + item.AddImmediateConflict(op); } } else { for (auto& item : Parent.AllPlannedReaders) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } for (auto& item : Parent.AllPlannedWriters) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } for (auto& item : Parent.AllImmediateReaders) { - op->AddImmediateConflict(TOperation::From(item)); + op->AddImmediateConflict(&item); } for (auto& item : Parent.AllImmediateWriters) { - op->AddImmediateConflict(TOperation::From(item)); + op->AddImmediateConflict(&item); } } } else { @@ -335,23 +335,23 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::AddOperation(const TOp // If we have any reads we conflict with global reads or writes if (op->IsImmediate()) { for (auto& item : Parent.GlobalPlannedReaders) { - TOperation::From(item)->AddImmediateConflict(op); + item.AddImmediateConflict(op); } for (auto& item : Parent.GlobalPlannedWriters) { - TOperation::From(item)->AddImmediateConflict(op); + item.AddImmediateConflict(op); } } else { for (auto& item : Parent.GlobalPlannedReaders) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } for (auto& item : Parent.GlobalPlannedWriters) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } for (auto& item : Parent.GlobalImmediateReaders) { - op->AddImmediateConflict(TOperation::From(item)); + op->AddImmediateConflict(&item); } for (auto& item : Parent.GlobalImmediateWriters) { - op->AddImmediateConflict(TOperation::From(item)); + op->AddImmediateConflict(&item); } } } @@ -360,14 +360,14 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::AddOperation(const TOp // We are potentially reading all keys in all tables, thus we conflict with all writes if (op->IsImmediate()) { for (auto& item : Parent.AllPlannedWriters) { - TOperation::From(item)->AddImmediateConflict(op); + item.AddImmediateConflict(op); } } else { for (auto& item : Parent.AllPlannedWriters) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } for (auto& item : Parent.AllImmediateWriters) { - op->AddImmediateConflict(TOperation::From(item)); + op->AddImmediateConflict(&item); } } } else if (haveReads) { @@ -393,14 +393,14 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::AddOperation(const TOp if (!haveWrites) { if (op->IsImmediate()) { for (auto& item : Parent.GlobalPlannedWriters) { - TOperation::From(item)->AddImmediateConflict(op); + item.AddImmediateConflict(op); } } else { for (auto& item : Parent.GlobalPlannedWriters) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } for (auto& item : Parent.GlobalImmediateWriters) { - op->AddImmediateConflict(TOperation::From(item)); + op->AddImmediateConflict(&item); } } } @@ -544,16 +544,11 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::RemoveOperation(const } } - TOperationAllListItem* allListItem = op.Get(); - TOperationGlobalListItem* globalListItem = op.Get(); - TOperationDelayedReadListItem* delayedReadListItem = op.Get(); - TOperationDelayedWriteListItem* delayedWriteListItem = op.Get(); + op->UnlinkFromList<TOperationAllListTag>(); + op->UnlinkFromList<TOperationGlobalListTag>(); - allListItem->Unlink(); - globalListItem->Unlink(); - - if (!delayedReadListItem->Empty()) { - delayedReadListItem->Unlink(); + if (op->IsInList<TOperationDelayedReadListTag>()) { + op->UnlinkFromList<TOperationDelayedReadListTag>(); op->RemoveDelayedKnownReads(); } else { for (auto& kv : Parent.Tables) { @@ -565,8 +560,8 @@ void TDependencyTracker::TDefaultDependencyTrackingLogic::RemoveOperation(const } } - if (!delayedWriteListItem->Empty()) { - delayedWriteListItem->Unlink(); + if (op->IsInList<TOperationDelayedWriteListTag>()) { + op->UnlinkFromList<TOperationDelayedWriteListTag>(); op->RemoveDelayedKnownWrites(); } else { for (auto& kv : Parent.Tables) { @@ -757,14 +752,14 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera // We are potentially writing to all keys in all tables, thus we conflict with everything if (op->IsImmediate()) { for (auto& item : Parent.AllPlannedWriters) { - TOperation::From(item)->AddImmediateConflict(op); + item.AddImmediateConflict(op); } } else { for (auto& item : Parent.AllPlannedWriters) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } for (auto& item : Parent.AllImmediateWriters) { - op->AddImmediateConflict(TOperation::From(item)); + op->AddImmediateConflict(&item); } // In mvcc case we skip immediate readers because they read at a fixed time point, // so that they already calculated all their dependencies and cannot have new ones, @@ -798,18 +793,18 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera // If we have any reads or writes we conflict with global writes if (op->IsImmediate()) { for (auto& item : Parent.GlobalPlannedWriters) { - TOperation::From(item)->AddImmediateConflict(op); + item.AddImmediateConflict(op); } } else { for (auto& item : Parent.GlobalPlannedWriters) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } // Here we have immediate writers who are global readers as well for (auto& item : Parent.GlobalImmediateReaders) { - op->AddImmediateConflict(TOperation::From(item)); + op->AddImmediateConflict(&item); } for (auto& item : Parent.GlobalImmediateWriters) { - op->AddImmediateConflict(TOperation::From(item)); + op->AddImmediateConflict(&item); } } } @@ -818,11 +813,11 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera // We are potentially reading all keys in all tables, thus we conflict with all writes if (op->IsImmediate()) { for (auto& item : Parent.AllPlannedWriters) { - onImmediateConflict(*TOperation::From(item)); + onImmediateConflict(item); } } else { for (auto& item : Parent.AllPlannedWriters) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } // Since each happened read moves incomplete edge, neither immediate write cannot affect // such read, that's why a planned reader cannot be a dependency for any immediate write @@ -846,11 +841,11 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera if (!haveWrites) { if (op->IsImmediate()) { for (auto& item : Parent.GlobalPlannedWriters) { - onImmediateConflict(*TOperation::From(item)); + onImmediateConflict(item); } } else { for (auto& item : Parent.GlobalPlannedWriters) { - op->AddDependency(TOperation::From(item)); + op->AddDependency(&item); } // Since each happened read moves incomplete edge, neither immediate write cannot affect // such read, that's why a planned reader cannot be a dependency for any immediate write @@ -983,16 +978,11 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::RemoveOperation(const TOp } } - TOperationAllListItem* allListItem = op.Get(); - TOperationGlobalListItem* globalListItem = op.Get(); - TOperationDelayedReadListItem* delayedReadListItem = op.Get(); - TOperationDelayedWriteListItem* delayedWriteListItem = op.Get(); - - allListItem->Unlink(); - globalListItem->Unlink(); + op->UnlinkFromList<TOperationAllListTag>(); + op->UnlinkFromList<TOperationGlobalListTag>(); - if (!delayedReadListItem->Empty()) { - delayedReadListItem->Unlink(); + if (op->IsInList<TOperationDelayedReadListTag>()) { + op->UnlinkFromList<TOperationDelayedReadListTag>(); op->RemoveDelayedKnownReads(); } else { for (auto& kv : Parent.Tables) { @@ -1004,8 +994,8 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::RemoveOperation(const TOp } } - if (!delayedWriteListItem->Empty()) { - delayedWriteListItem->Unlink(); + if (op->IsInList<TOperationDelayedWriteListTag>()) { + op->UnlinkFromList<TOperationDelayedWriteListTag>(); op->RemoveDelayedKnownWrites(); } else { for (auto& kv : Parent.Tables) { diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.h b/ydb/core/tx/datashard/datashard_dep_tracker.h index 856c71411c..22f49b2bad 100644 --- a/ydb/core/tx/datashard/datashard_dep_tracker.h +++ b/ydb/core/tx/datashard/datashard_dep_tracker.h @@ -135,20 +135,20 @@ private: // Maps table id to current ranges that have been read/written by all transactions NFH::TFlatHashMap<ui64, TTableState> Tables; // All operations that are reading or writing something - TIntrusiveList<TOperationAllListItem> AllPlannedReaders; - TIntrusiveList<TOperationAllListItem> AllPlannedWriters; - TIntrusiveList<TOperationAllListItem> AllImmediateReaders; - TIntrusiveList<TOperationAllListItem> AllImmediateWriters; + TIntrusiveList<TOperation, TOperationAllListTag> AllPlannedReaders; + TIntrusiveList<TOperation, TOperationAllListTag> AllPlannedWriters; + TIntrusiveList<TOperation, TOperationAllListTag> AllImmediateReaders; + TIntrusiveList<TOperation, TOperationAllListTag> AllImmediateWriters; // A set of operations that read/write table globally - TIntrusiveList<TOperationGlobalListItem> GlobalPlannedReaders; - TIntrusiveList<TOperationGlobalListItem> GlobalPlannedWriters; - TIntrusiveList<TOperationGlobalListItem> GlobalImmediateReaders; - TIntrusiveList<TOperationGlobalListItem> GlobalImmediateWriters; + TIntrusiveList<TOperation, TOperationGlobalListTag> GlobalPlannedReaders; + TIntrusiveList<TOperation, TOperationGlobalListTag> GlobalPlannedWriters; + TIntrusiveList<TOperation, TOperationGlobalListTag> GlobalImmediateReaders; + TIntrusiveList<TOperation, TOperationGlobalListTag> GlobalImmediateWriters; // Immediate operations that have delayed adding their keys to ranges - TIntrusiveList<TOperationDelayedReadListItem> DelayedPlannedReads; - TIntrusiveList<TOperationDelayedReadListItem> DelayedImmediateReads; - TIntrusiveList<TOperationDelayedWriteListItem> DelayedPlannedWrites; - TIntrusiveList<TOperationDelayedWriteListItem> DelayedImmediateWrites; + TIntrusiveList<TOperation, TOperationDelayedReadListTag> DelayedPlannedReads; + TIntrusiveList<TOperation, TOperationDelayedReadListTag> DelayedImmediateReads; + TIntrusiveList<TOperation, TOperationDelayedWriteListTag> DelayedPlannedWrites; + TIntrusiveList<TOperation, TOperationDelayedWriteListTag> DelayedImmediateWrites; const TDefaultDependencyTrackingLogic DefaultLogic; const TMvccDependencyTrackingLogic MvccLogic; diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 59a90ced67..888cebdc5a 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -468,18 +468,18 @@ struct TExecutionProfile { THashMap<EExecutionUnitKind, TUnitProfile> UnitProfiles; }; -class TOperationAllListItem : public TIntrusiveListItem<TOperationAllListItem> {}; -class TOperationGlobalListItem : public TIntrusiveListItem<TOperationGlobalListItem> {}; -class TOperationDelayedReadListItem : public TIntrusiveListItem<TOperationDelayedReadListItem> {}; -class TOperationDelayedWriteListItem : public TIntrusiveListItem<TOperationDelayedWriteListItem> {}; +struct TOperationAllListTag {}; +struct TOperationGlobalListTag {}; +struct TOperationDelayedReadListTag {}; +struct TOperationDelayedWriteListTag {}; class TOperation : public TBasicOpInfo - , public TThrRefBase - , public TOperationAllListItem - , public TOperationGlobalListItem - , public TOperationDelayedReadListItem - , public TOperationDelayedWriteListItem + , public TSimpleRefCount<TOperation> + , public TIntrusiveListItem<TOperation, TOperationAllListTag> + , public TIntrusiveListItem<TOperation, TOperationGlobalListTag> + , public TIntrusiveListItem<TOperation, TOperationDelayedReadListTag> + , public TIntrusiveListItem<TOperation, TOperationDelayedWriteListTag> { public: enum EDepFlag { @@ -505,9 +505,7 @@ public: using TPtr = TIntrusivePtr<TOperation>; using EResultStatus = NKikimrTxDataShard::TEvProposeTransactionResult::EStatus; - ~TOperation() - { - } + virtual ~TOperation() = default; TActorId GetTarget() const { return Target; } void SetTarget(TActorId target) { Target = target; } @@ -516,15 +514,17 @@ public: void SetCookie(ui64 cookie) { Cookie = cookie; } public: - static TOperation* From(TOperationAllListItem* item) { return static_cast<TOperation*>(item); } - static TOperation* From(TOperationGlobalListItem* item) { return static_cast<TOperation*>(item); } - static TOperation* From(TOperationDelayedReadListItem* item) { return static_cast<TOperation*>(item); } - static TOperation* From(TOperationDelayedWriteListItem* item) { return static_cast<TOperation*>(item); } - - static TOperation* From(TOperationAllListItem& item) { return static_cast<TOperation*>(&item); } - static TOperation* From(TOperationGlobalListItem& item) { return static_cast<TOperation*>(&item); } - static TOperation* From(TOperationDelayedReadListItem& item) { return static_cast<TOperation*>(&item); } - static TOperation* From(TOperationDelayedWriteListItem& item) { return static_cast<TOperation*>(&item); } + template<class TTag> + bool IsInList() const { + using TItem = TIntrusiveListItem<TOperation, TTag>; + return !static_cast<const TItem*>(this)->Empty(); + } + + template<class TTag> + void UnlinkFromList() { + using TItem = TIntrusiveListItem<TOperation, TTag>; + static_cast<TItem*>(this)->Unlink(); + } public: TDuration GetTotalElapsed() const { |