summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <[email protected]>2025-07-23 18:26:39 +0300
committerGitHub <[email protected]>2025-07-23 18:26:39 +0300
commit2dffb9996a88ae9f28c6c66db4ce9c5bc98f57a9 (patch)
tree79d56e0ab191fd6c7743b37b6fb9dbd51e68f637
parent674119ec63510afcccbc4284e9b682f7deb125f6 (diff)
Make TQueueInplace and TOneOneQueueInplace usable without an extra pointer (#21488)
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp17
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp3
-rw-r--r--ydb/core/tablet/tablet_pipe_client.cpp13
-rw-r--r--ydb/core/tablet/tablet_resolver.cpp42
-rw-r--r--ydb/core/tablet_flat/logic_redo_entry.h10
-rw-r--r--ydb/core/tablet_flat/logic_redo_queue.h19
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h2
-rw-r--r--ydb/core/tx/datashard/progress_queue.h13
-rw-r--r--ydb/core/tx/tx_proxy/proxy_impl.cpp23
-rw-r--r--ydb/core/util/queue_inplace.h197
-rw-r--r--ydb/core/util/queue_inplace_ut.cpp126
-rw-r--r--ydb/core/util/queue_oneone_inplace_ut.cpp21
-rw-r--r--ydb/library/actors/testlib/test_runtime.cpp21
-rw-r--r--ydb/library/actors/util/queue_oneone_inplace.h84
14 files changed, 381 insertions, 210 deletions
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
index b080dbb9c79..eb3871877b0 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
@@ -108,8 +108,6 @@ namespace NKikimr {
std::shared_ptr<TVDiskSkeletonTrace> Trace;
ui64 InternalMessageId;
- TRecord() = default;
-
TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId,
ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId,
const NBackpressure::TQueueClientId& clientId, TString name, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
@@ -161,7 +159,7 @@ namespace NKikimr {
private:
const TString VDiskLogPrefix;
- std::unique_ptr<TQueueType, TQueueType::TCleanDestructor> Queue;
+ TQueueType Queue;
ui64 InFlightCount;
ui64 InFlightCost;
ui64 InFlightBytes;
@@ -203,7 +201,6 @@ namespace NKikimr {
ui64 maxInFlightCost,
TIntrusivePtr<::NMonitoring::TDynamicCounters> skeletonFrontGroup)
: VDiskLogPrefix(logPrefix)
- , Queue(new TQueueType())
, InFlightCount(0)
, InFlightCost(0)
, InFlightBytes(0)
@@ -231,7 +228,7 @@ namespace NKikimr {
}
ui64 GetSize() const {
- return Queue->GetSize();
+ return Queue.GetSize();
}
template<typename TFront>
@@ -240,7 +237,7 @@ namespace NKikimr {
NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& /*front*/,
const NBackpressure::TQueueClientId& clientId, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
ui64 internalMessageId) {
- if (!Queue->Head() && CanSendToSkeleton(cost)) {
+ if (!Queue.Head() && CanSendToSkeleton(cost)) {
// send to Skeleton for further processing
ctx.Send(converted.release());
++InFlightCount;
@@ -262,8 +259,8 @@ namespace NKikimr {
*SkeletonFrontDelayedBytes += recByteSize;
TInstant now = TAppData::TimeProvider->Now();
- Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
- clientId, Name, std::move(trace), internalMessageId));
+ Queue.Emplace(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
+ clientId, Name, std::move(trace), internalMessageId);
}
}
@@ -276,7 +273,7 @@ namespace NKikimr {
template <class TFront>
void ProcessNext(const TActorContext &ctx, TFront &front, bool forceError) {
// we can send next element to Skeleton if any
- while (TRecord *rec = Queue->Head()) {
+ while (TRecord *rec = Queue.Head()) {
const ui64 cost = rec->Cost;
if (CanSendToSkeleton(cost) || forceError) {
ui32 recByteSize = rec->ByteSize;
@@ -314,7 +311,7 @@ namespace NKikimr {
Msgs.emplace(rec->InternalMessageId, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace)));
UpdateState();
}
- Queue->Pop();
+ Queue.Pop();
} else {
break; // stop sending requests to skeleton
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp
index c338f1a239e..772d39b9f7a 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp
@@ -55,9 +55,6 @@ namespace NKikimr {
{}
~TEmergencyQueue() {
- while (Queue.Head()) {
- Queue.Pop();
- }
}
void Push(TEvBlobStorage::TEvVMovedPatch::TPtr ev) {
diff --git a/ydb/core/tablet/tablet_pipe_client.cpp b/ydb/core/tablet/tablet_pipe_client.cpp
index 879c574da4e..8eabef158af 100644
--- a/ydb/core/tablet/tablet_pipe_client.cpp
+++ b/ydb/core/tablet/tablet_pipe_client.cpp
@@ -9,7 +9,7 @@
#include <ydb/core/base/hive.h>
#include <ydb/core/base/domain.h>
#include <ydb/core/base/appdata.h>
-#include <ydb/library/actors/util/queue_oneone_inplace.h>
+#include <ydb/core/util/queue_inplace.h>
#include <library/cpp/random_provider/random_provider.h>
@@ -40,7 +40,6 @@ namespace NTabletPipe {
, TabletId(tabletId)
, Config(config)
, IsShutdown(false)
- , PayloadQueue(new TPayloadQueue())
, Leader(true)
{
Y_ABORT_UNLESS(tabletId != 0);
@@ -148,7 +147,7 @@ namespace NTabletPipe {
void HandleSendQueued(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
BLOG_D("queue send");
Y_ABORT_UNLESS(!IsShutdown);
- PayloadQueue->Push(ev.Release());
+ PayloadQueue.Push(std::move(ev));
}
void HandleSend(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
@@ -322,10 +321,11 @@ namespace NTabletPipe {
Leader, false, Generation, std::move(versionInfo)));
BLOG_D("send queued");
- while (TAutoPtr<IEventHandle> x = PayloadQueue->Pop())
+ while (TAutoPtr<IEventHandle> x = PayloadQueue.PopDefault())
Push(ctx, x);
- PayloadQueue.Destroy();
+ // Free buffer memory
+ PayloadQueue.Clear();
if (IsShutdown) {
BLOG_D("shutdown pipe due to pending shutdown request");
@@ -719,8 +719,7 @@ namespace NTabletPipe {
TActorId InterconnectProxyId;
TActorId InterconnectSessionId;
TActorId ServerId;
- typedef TOneOneQueueInplace<IEventHandle*, 32> TPayloadQueue;
- TAutoPtr<TPayloadQueue, TPayloadQueue::TPtrCleanDestructor> PayloadQueue;
+ TQueueInplace<TAutoPtr<IEventHandle>, 32> PayloadQueue;
TClientRetryState RetryState;
bool Leader;
ui64 Generation = 0;
diff --git a/ydb/core/tablet/tablet_resolver.cpp b/ydb/core/tablet/tablet_resolver.cpp
index d3eb050c22f..15d0aa9c269 100644
--- a/ydb/core/tablet/tablet_resolver.cpp
+++ b/ydb/core/tablet/tablet_resolver.cpp
@@ -11,7 +11,7 @@
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/interconnect/interconnect.h>
#include <ydb/core/util/cache.h>
-#include <ydb/core/util/queue_oneone_inplace.h>
+#include <ydb/core/util/queue_inplace.h>
#include <util/generic/map.h>
#include <util/generic/deque.h>
#include <library/cpp/random_provider/random_provider.h>
@@ -94,17 +94,15 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
TInstant AddInstant;
TEvTabletResolver::TEvForward::TPtr Ev;
- TQueueEntry(TInstant instant, TEvTabletResolver::TEvForward::TPtr &ev)
+ TQueueEntry(TInstant instant, TEvTabletResolver::TEvForward::TPtr&& ev)
: AddInstant(instant)
- , Ev(ev)
+ , Ev(std::move(ev))
{}
};
- typedef TOneOneQueueInplace<TQueueEntry *, 64> TQueueType;
-
EState State = StInit;
- TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
+ TQueueInplace<TQueueEntry, 128> Queue;
TActorId KnownLeader;
TActorId KnownLeaderTablet;
@@ -200,9 +198,7 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
}
bool PushQueue(TEvTabletResolver::TEvForward::TPtr &ev, TEntry &entry, const TActorContext &ctx) {
- if (!entry.Queue)
- entry.Queue.Reset(new TEntry::TQueueType());
- entry.Queue->Push(new TEntry::TQueueEntry(ctx.Now(), ev));
+ entry.Queue.Emplace(ctx.Now(), std::move(ev));
return true;
}
@@ -324,14 +320,15 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
}
void SendQueued(ui64 tabletId, TEntry &entry, const TActorContext &ctx) {
- if (TEntry::TQueueType *queue = entry.Queue.Get()) {
- for (TAutoPtr<TEntry::TQueueEntry> x = queue->Pop(); !!x; x.Reset(queue->Pop())) {
- TEvTabletResolver::TEvForward *msg = x->Ev->Get();
- if (!SendForward(x->Ev->Sender, entry, msg, ctx))
- ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
+ while (TEntry::TQueueEntry* x = entry.Queue.Head()) {
+ TEvTabletResolver::TEvForward *msg = x->Ev->Get();
+ if (!SendForward(x->Ev->Sender, entry, msg, ctx)) {
+ ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
}
- entry.Queue.Destroy();
+ entry.Queue.Pop();
}
+ // Free buffer memory
+ entry.Queue.Clear();
}
void SendPing(ui64 tabletId, TEntry &entry, const TActorContext &ctx) {
@@ -359,10 +356,9 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
LOG_DEBUG(ctx, NKikimrServices::TABLET_RESOLVER,
"DropEntry tabletId: %" PRIu64 " followers: %" PRIu64,
tabletId, entry.KnownFollowers.size());
- if (TEntry::TQueueType *queue = entry.Queue.Get()) {
- for (TAutoPtr<TEntry::TQueueEntry> x = queue->Pop(); !!x; x.Reset(queue->Pop())) {
- ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
- }
+ while (TEntry::TQueueEntry* x = entry.Queue.Head()) {
+ ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
+ entry.Queue.Pop();
}
ResolvedTablets.Erase(tabletId);
UnresolvedTablets.Erase(tabletId);
@@ -840,10 +836,10 @@ public:
if (!value)
return;
- if (TEntry::TQueueType *queue = value->Queue.Get()) {
- for (TAutoPtr<TEntry::TQueueEntry> x = queue->Pop(); !!x; x.Reset(queue->Pop())) {
- ActorSystem->Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::RACE, key));
- }
+ auto& queue = value->Queue;
+ while (TEntry::TQueueEntry* x = queue.Head()) {
+ ActorSystem->Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::RACE, key));
+ queue.Pop();
}
});
}
diff --git a/ydb/core/tablet_flat/logic_redo_entry.h b/ydb/core/tablet_flat/logic_redo_entry.h
index ec947f04d32..c8d7741cc71 100644
--- a/ydb/core/tablet_flat/logic_redo_entry.h
+++ b/ydb/core/tablet_flat/logic_redo_entry.h
@@ -11,16 +11,16 @@ namespace NRedo {
struct TEntry {
template<typename ... Args>
- static TEntry* Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
+ static std::unique_ptr<TEntry> Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
{
- auto *ptr = malloc(sizeof(TEntry) + affects.size() * sizeof(ui32));
+ void* ptr = ::operator new(sizeof(TEntry) + affects.size() * sizeof(ui32));
- return ::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...);
+ return std::unique_ptr<TEntry>(::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...));
}
- void operator delete (void *p)
+ void operator delete(void* p)
{
- free(p);
+ ::operator delete(p);
}
void Describe(IOutputStream &out) const
diff --git a/ydb/core/tablet_flat/logic_redo_queue.h b/ydb/core/tablet_flat/logic_redo_queue.h
index 648047c560b..529dd5a6454 100644
--- a/ydb/core/tablet_flat/logic_redo_queue.h
+++ b/ydb/core/tablet_flat/logic_redo_queue.h
@@ -19,10 +19,10 @@ namespace NRedo {
struct TQueue {
using TStamp = NTable::TTxStamp;
using TAffects = TArrayRef<const ui32>;
+ using TLog = TQueueInplace<std::unique_ptr<TEntry>, 4096>;
TQueue(THashMap<ui32, NTable::TSnapEdge> edges)
- : Log(new TLog)
- , Edges(std::move(edges))
+ : Edges(std::move(edges))
{
}
@@ -45,13 +45,14 @@ namespace NRedo {
<< " (" << Memory << " mem" << ", " << LargeGlobIdsBytes << " raw)b }";
}
- void Push(TEntry *entry)
+ void Push(std::unique_ptr<TEntry>&& entryPtr)
{
+ TEntry* entry = entryPtr.get();
if (bool(entry->Embedded) == bool(entry->LargeGlobId)) {
Y_TABLET_ERROR(NFmt::Do(*entry) << " has incorrect payload");
}
- Log->Push(entry);
+ Log.Push(std::move(entryPtr));
Items++;
Memory += entry->BytesMem();
@@ -92,7 +93,7 @@ namespace NRedo {
for (auto &it : Overhead)
it.second.Clear();
- auto was = std::exchange(Log, new TLog);
+ TLog was = std::exchange(Log, TLog{});
Items = 0;
Memory = 0;
@@ -100,7 +101,7 @@ namespace NRedo {
auto logos = snap.MutableNonSnapLogBodies();
- while (TAutoPtr<TEntry> entry = was->Pop()) {
+ while (auto entry = was.PopDefault()) {
if (entry->FilterTables(Edges)) {
for (const auto& blobId : entry->LargeGlobId.Blobs()) {
LogoBlobIDFromLogoBlobID(blobId, logos->Add());
@@ -116,7 +117,7 @@ namespace NRedo {
entry->References = 0;
- Push(entry.Release());
+ Push(std::move(entry));
} else {
Y_ENSURE(entry->References == 0);
}
@@ -134,9 +135,7 @@ namespace NRedo {
return Usage;
}
- using TLog = TOneOneQueueInplace<NRedo::TEntry *, 4096>;
-
- TAutoPtr<TLog, TLog::TPtrCleanInplaceMallocDestructor> Log;
+ TLog Log;
THashMap<ui32, NTable::TSnapEdge> Edges;
THashMap<ui32, TOverhead> Overhead;
TIntrusiveList<TOverhead> Changes;
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 587ecd5927d..1b1a235adec 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -2523,7 +2523,7 @@ private:
TTxProgressIdempotentScalarQueue<TEvPrivate::TEvProgressTransaction> PlanQueue;
TTxProgressIdempotentScalarScheduleQueue<TEvPrivate::TEvCleanupTransaction> CleanupQueue;
- TTxProgressQueue<ui64, TNoOpDestroy, TEvPrivate::TEvProgressResendReadSet> ResendReadSetQueue;
+ TTxProgressQueue<ui64, TEvPrivate::TEvProgressResendReadSet> ResendReadSetQueue;
struct TPipeServerInfoOverloadSubscribersTag {};
diff --git a/ydb/core/tx/datashard/progress_queue.h b/ydb/core/tx/datashard/progress_queue.h
index 0e99651853a..a02ecb663a0 100644
--- a/ydb/core/tx/datashard/progress_queue.h
+++ b/ydb/core/tx/datashard/progress_queue.h
@@ -1,21 +1,20 @@
#pragma once
#include "defs.h"
-#include <ydb/core/util/queue_oneone_inplace.h>
+#include <ydb/core/util/queue_inplace.h>
namespace NKikimr {
-template <typename T, typename TDestruct, typename TEvent>
+template <typename T, typename TEvent>
class TTxProgressQueue {
bool HasInFly;
- TOneOneQueueInplace<T, 32> Queue;
+ TQueueInplace<T, 32> Queue;
+
public:
TTxProgressQueue()
: HasInFly(false)
{}
~TTxProgressQueue() {
- while (T head = Queue.Pop())
- TDestruct::Destroy(head);
}
void Progress(T x, const TActorContext &ctx) {
@@ -24,13 +23,13 @@ public:
ctx.Send(ctx.SelfID, new TEvent(x));
HasInFly = true;
} else {
- Queue.Push(x);
+ Queue.Push(std::move(x));
}
}
void Reset(const TActorContext &ctx) {
Y_DEBUG_ABORT_UNLESS(HasInFly);
- if (T x = Queue.Pop())
+ if (T x = Queue.PopDefault())
ctx.Send(ctx.SelfID, new TEvent(x));
else
HasInFly = false;
diff --git a/ydb/core/tx/tx_proxy/proxy_impl.cpp b/ydb/core/tx/tx_proxy/proxy_impl.cpp
index 26be5bd0577..c430fe132ae 100644
--- a/ydb/core/tx/tx_proxy/proxy_impl.cpp
+++ b/ydb/core/tx/tx_proxy/proxy_impl.cpp
@@ -12,6 +12,7 @@
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/protos/counters_tx_proxy.pb.h>
+#include <ydb/core/util/queue_inplace.h>
namespace NKikimr {
using namespace NTabletFlatExecutor;
@@ -34,12 +35,10 @@ struct TDelayedQueue {
}
};
typedef TAutoPtr<TRequest> TRequestPtr;
- typedef TOneOneQueueInplace<TRequest*, 64> TQueueType;
- typedef TAutoPtr<TQueueType, typename TQueueType::TPtrCleanDestructor> TSafeQueue;
- TSafeQueue Queue;
+ typedef TQueueInplace<TRequestPtr, 64> TQueueType;
+ TQueueType Queue;
TDelayedQueue()
- : Queue(new TQueueType())
{}
};
@@ -129,30 +128,30 @@ class TTxProxy : public TActorBootstrapped<TTxProxy> {
void DelayRequest(TEvTxUserProxy::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
auto request = new TDelayedProposal::TRequest(ev, ctx.Now() + TimeoutDelayedRequest);
- DelayedProposal.Queue->Push(request);
+ DelayedProposal.Queue.Emplace(request);
}
void DelayRequest(TEvTxUserProxy::TEvProposeKqpTransaction::TPtr &ev, const TActorContext &ctx) {
auto request = new TDelayedKqpProposal::TRequest(ev, ctx.Now() + TimeoutDelayedRequest);
- DelayedKqpProposal.Queue->Push(request);
+ DelayedKqpProposal.Queue.Emplace(request);
}
void DelayRequest(TEvTxUserProxy::TEvAllocateTxId::TPtr &ev, const TActorContext &ctx) {
auto request = new TDelayedAllocateTxId::TRequest(ev, ctx.Now() + TimeoutDelayedRequest);
- DelayedAllocateTxId.Queue->Push(request);
+ DelayedAllocateTxId.Queue.Emplace(request);
}
template<class EventType>
void PlayQueue(TDelayedQueue<EventType> &delayed, const TActorContext &ctx) {
typedef typename TDelayedQueue<EventType>::TRequestPtr TRequestPtr;
- while (delayed.Queue->Head()) {
+ while (delayed.Queue.Head()) {
TVector<ui64> txIds = TxAllocatorClient.AllocateTxIds(1, ctx);
if (!txIds) {
return;
}
- TRequestPtr extracted = delayed.Queue->Pop();
+ TRequestPtr extracted = delayed.Queue.PopDefault();
ProcessRequest(extracted->GetRequest(), ctx, txIds.front());
}
}
@@ -167,12 +166,12 @@ class TTxProxy : public TActorBootstrapped<TTxProxy> {
void CheckTimeout(TDelayedQueue<EventType> &delayed, const TActorContext &ctx) {
typedef typename TDelayedQueue<EventType>::TRequestPtr TRequestPtr;
- while (const auto head = delayed.Queue->Head()) {
- const TInstant &expireAt = head->GetExpireMoment();
+ while (const auto head = delayed.Queue.Head()) {
+ const TInstant &expireAt = (*head)->GetExpireMoment();
if (expireAt > ctx.Now()) {
break;
}
- TRequestPtr extracted = delayed.Queue->Pop();
+ TRequestPtr extracted = delayed.Queue.PopDefault();
Decline(extracted->GetRequest(), ctx);
}
}
diff --git a/ydb/core/util/queue_inplace.h b/ydb/core/util/queue_inplace.h
index 3d2cd472ffc..03aaab9f7ba 100644
--- a/ydb/core/util/queue_inplace.h
+++ b/ydb/core/util/queue_inplace.h
@@ -1,117 +1,186 @@
#pragma once
#include "defs.h"
+#include <memory>
+#include <type_traits>
template<typename T, ui32 TSize>
struct TSimpleQueueChunk {
static const ui32 EntriesCount = (TSize - sizeof(TSimpleQueueChunk*)) / sizeof(T);
static_assert(EntriesCount > 0, "expect EntriesCount > 0");
- T Entries[EntriesCount];
- TSimpleQueueChunk * volatile Next;
+ union {
+ T Entries[EntriesCount];
+ char Data[EntriesCount * sizeof(T)];
+ };
+ TSimpleQueueChunk* Next = nullptr;
- TSimpleQueueChunk() {
- }
+ TSimpleQueueChunk() {}
+ ~TSimpleQueueChunk() {}
};
-
template<typename T, ui32 TSize, typename TChunk = TSimpleQueueChunk<T, TSize>>
-class TQueueInplace : TNonCopyable {
- TChunk * ReadFrom;
+class TQueueInplace {
+ TChunk* ReadFrom;
ui32 ReadPosition;
ui32 WritePosition;
- TChunk * WriteTo;
+ TChunk* WriteTo;
size_t Size;
public:
- TQueueInplace()
- : ReadFrom(new TChunk())
+ TQueueInplace() noexcept
+ : ReadFrom(nullptr)
, ReadPosition(0)
, WritePosition(0)
- , WriteTo(ReadFrom)
+ , WriteTo(nullptr)
, Size(0)
{}
- ~TQueueInplace() {
- Y_DEBUG_ABORT_UNLESS(Head() == nullptr && Size == 0);
- delete ReadFrom;
+ TQueueInplace(TQueueInplace&& rhs) noexcept
+ : ReadFrom(rhs.ReadFrom)
+ , ReadPosition(rhs.ReadPosition)
+ , WritePosition(rhs.WritePosition)
+ , WriteTo(rhs.WriteTo)
+ , Size(rhs.Size)
+ {
+ rhs.ReadFrom = nullptr;
+ rhs.ReadPosition = 0;
+ rhs.WritePosition = 0;
+ rhs.WriteTo = nullptr;
+ rhs.Size = 0;
}
- struct TPtrCleanDestructor {
- static inline void Destroy(TQueueInplace<T, TSize> *x) noexcept {
- while (const T *head = x->Head()) {
- delete *head;
- x->Pop();
- }
- delete x;
+ TQueueInplace& operator=(TQueueInplace&& rhs) noexcept {
+ if (this != &rhs) [[likely]] {
+ Clear();
+ ReadFrom = rhs.ReadFrom;
+ ReadPosition = rhs.ReadPosition;
+ WritePosition = rhs.WritePosition;
+ WriteTo = rhs.WriteTo;
+ Size = rhs.Size;
+ rhs.ReadFrom = nullptr;
+ rhs.ReadPosition = 0;
+ rhs.WritePosition = 0;
+ rhs.WriteTo = nullptr;
+ rhs.Size = 0;
}
- };
+ return *this;
+ }
+
+ ~TQueueInplace() {
+ Clear();
+ }
- struct TCleanDestructor {
- static inline void Destroy(TQueueInplace<T, TSize> *x) noexcept {
- while (x->Head()) {
- x->Pop();
+ void Clear() noexcept {
+ TChunk* head = ReadFrom;
+ if (head) {
+ if constexpr (std::is_trivially_destructible_v<T>) {
+ do {
+ TChunk* next = head->Next;
+ delete head;
+ head = next;
+ } while (head);
+ } else {
+ ui32 start = ReadPosition;
+ do {
+ TChunk* next = head->Next;
+ ui32 end = next ? TChunk::EntriesCount : WritePosition;
+ for (ui32 index = start; index != end; ++index) {
+ std::destroy_at(&head->Entries[index]);
+ }
+ delete head;
+ head = next;
+ start = 0;
+ } while (head);
}
- delete x;
+ ReadFrom = nullptr;
+ ReadPosition = 0;
+ WritePosition = 0;
+ WriteTo = nullptr;
+ Size = 0;
}
+ }
- void operator ()(TQueueInplace<T, TSize> *x) const noexcept {
- Destroy(x);
- }
- };
+ void Push(const T& x) {
+ ::new (NewEntry()) T(x);
+ ++WritePosition;
+ ++Size;
+ }
- void Push(const T &x) noexcept {
+ void Push(T&& x) {
+ ::new (NewEntry()) T(std::move(x));
+ ++WritePosition;
++Size;
- if (WritePosition != TChunk::EntriesCount) {
- WriteTo->Entries[WritePosition] = x;
- ++WritePosition;
- } else {
- TChunk *next = new TChunk();
- next->Entries[0] = x;
- WriteTo->Next = next;
- WriteTo = next;
- WritePosition = 1;
- }
}
- void Push(T &&x) noexcept {
+ template<class... TArgs>
+ T& Emplace(TArgs&&... args) {
+ T& result = *::new (NewEntry()) T(std::forward<TArgs>(args)...);
+ ++WritePosition;
++Size;
- if (WritePosition != TChunk::EntriesCount) {
- WriteTo->Entries[WritePosition] = std::move(x);
- ++WritePosition;
- } else {
- TChunk *next = new TChunk();
- next->Entries[0] = std::move(x);
- WriteTo->Next = next;
- WriteTo = next;
- WritePosition = 1;
- }
+ return result;
}
- T *Head() {
- TChunk *head = ReadFrom;
- if (ReadFrom == WriteTo && ReadPosition == WritePosition) {
+ T* Head() noexcept {
+ TChunk* head = ReadFrom;
+ if (head == WriteTo && ReadPosition == WritePosition) {
+ // Note: this also handles ReadFrom == WriteTo == nullptr
return nullptr;
- } else if (ReadPosition != TChunk::EntriesCount) {
- return &(head->Entries[ReadPosition]);
- } else if (TChunk *next = head->Next) {
- ReadFrom = next;
+ }
+ if (ReadPosition == TChunk::EntriesCount) [[unlikely]] {
+ TChunk* next = head->Next;
+ if (!next) {
+ return nullptr;
+ }
delete head;
+ head = next;
+ ReadFrom = next;
ReadPosition = 0;
- return Head();
}
- return nullptr;
+ return &head->Entries[ReadPosition];
}
void Pop() {
- const T *ret = Head();
- if (ret) {
+ if (T* x = Head()) [[likely]] {
+ std::destroy_at(x);
+ ++ReadPosition;
+ --Size;
+ }
+ }
+
+ T PopDefault() {
+ if (T* x = Head()) [[likely]] {
+ T result(std::move(*x));
+ std::destroy_at(x);
++ReadPosition;
--Size;
+ return result;
+ } else {
+ return T{};
}
}
+ explicit operator bool() const {
+ return Size > 0;
+ }
+
size_t GetSize() const {
return Size;
}
+
+private:
+ void* NewEntry() {
+ if (WriteTo) [[likely]] {
+ if (WritePosition == TChunk::EntriesCount) [[unlikely]] {
+ TChunk* next = new TChunk;
+ WriteTo->Next = next;
+ WriteTo = next;
+ WritePosition = 0;
+ }
+ } else {
+ // Note: ReadPosition == WritePosition == 0
+ ReadFrom = WriteTo = new TChunk;
+ }
+ return &WriteTo->Entries[WritePosition];
+ }
};
diff --git a/ydb/core/util/queue_inplace_ut.cpp b/ydb/core/util/queue_inplace_ut.cpp
index fb2d511d048..2cc9d08eb91 100644
--- a/ydb/core/util/queue_inplace_ut.cpp
+++ b/ydb/core/util/queue_inplace_ut.cpp
@@ -11,16 +11,18 @@ Y_UNIT_TEST_SUITE(TQueueInplaceTests) {
struct TStruct {
ui32 X;
ui32 Y;
- TStruct(ui32 i = 0)
+ TStruct(ui32 i)
: X(i)
, Y(i)
{}
+ TStruct(TStruct&& s)
+ : X(s.X)
+ , Y(s.Y)
+ {}
+ TStruct(const TStruct&) = delete;
- TStruct &operator = (const TStruct &s) {
- X = s.X;
- Y = s.Y;
- return *this;
- }
+ TStruct& operator=(TStruct&& s) = delete;
+ TStruct& operator=(const TStruct&) = delete;
bool operator == (ui32 i) const {
return X == i && Y == i;
@@ -53,23 +55,119 @@ Y_UNIT_TEST_SUITE(TQueueInplaceTests) {
UNIT_ASSERT(queue.Head() == nullptr);
}
- Y_UNIT_TEST(CleanInDestructor) {
- using TQueueType = TQueueInplace<std::shared_ptr<bool> *, 32>;
+ Y_UNIT_TEST(DestroyInDestructor) {
+ using TQueueType = TQueueInplace<std::shared_ptr<bool>, 32>;
std::shared_ptr<bool> p(new bool(true));
UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count());
{
- TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> queue(new TQueueType());
- queue->Push(new std::shared_ptr<bool>(p));
- queue->Push(new std::shared_ptr<bool>(p));
- queue->Push(new std::shared_ptr<bool>(p));
- queue->Push(new std::shared_ptr<bool>(p));
+ TQueueType queue;
+ queue.Push(p);
+ queue.Push(p);
+ queue.Push(p);
+ queue.Push(p);
UNIT_ASSERT_VALUES_EQUAL(5u, p.use_count());
+
+ queue.Pop();
+
+ UNIT_ASSERT_VALUES_EQUAL(4u, p.use_count());
}
- UNIT_ASSERT_VALUES_EQUAL(1, p.use_count());
+ UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count());
+ }
+
+ Y_UNIT_TEST(EmplacePopDefault) {
+ using TQueueType = TQueueInplace<std::unique_ptr<int>, 32>;
+
+ TQueueType queue;
+ queue.Push(std::make_unique<int>(10));
+ queue.Emplace(new int(11));
+ queue.Emplace(new int(12));
+ queue.Emplace(std::make_unique<int>(13));
+
+ auto a = queue.PopDefault();
+ UNIT_ASSERT(a && *a == 10);
+ auto b = queue.PopDefault();
+ UNIT_ASSERT(b && *b == 11);
+ auto c = queue.PopDefault();
+ UNIT_ASSERT(c && *c == 12);
+ auto d = queue.PopDefault();
+ UNIT_ASSERT(d && *d == 13);
+ auto e = queue.PopDefault();
+ UNIT_ASSERT(!e);
+ }
+
+ Y_UNIT_TEST(PopTooManyTimes) {
+ using TQueueType = TQueueInplace<std::unique_ptr<int>, 32>;
+
+ TQueueType queue;
+ queue.Push(std::make_unique<int>(10));
+ queue.Push(std::make_unique<int>(11));
+ queue.Push(std::make_unique<int>(12));
+ queue.Push(std::make_unique<int>(13));
+ UNIT_ASSERT(queue.GetSize() == 4);
+
+ queue.Pop();
+ queue.Pop();
+ queue.Pop();
+ queue.Pop();
+ queue.Pop();
+ UNIT_ASSERT(queue.GetSize() == 0);
+ }
+
+ Y_UNIT_TEST(MoveConstructor) {
+ using TQueueType = TQueueInplace<ui64, 32>;
+
+ TQueueType a;
+ a.Push(10);
+ a.Push(11);
+ a.Push(12);
+ a.Push(13);
+ UNIT_ASSERT(a.GetSize() == 4);
+
+ TQueueType b(std::move(a));
+
+ UNIT_ASSERT(a.GetSize() == 0);
+ UNIT_ASSERT(a.PopDefault() == 0u);
+
+ UNIT_ASSERT(b.GetSize() == 4);
+ UNIT_ASSERT(b.PopDefault() == 10u);
+ UNIT_ASSERT(b.PopDefault() == 11u);
+ UNIT_ASSERT(b.PopDefault() == 12u);
+ UNIT_ASSERT(b.PopDefault() == 13u);
+ UNIT_ASSERT(b.PopDefault() == 0u);
+ }
+
+ Y_UNIT_TEST(MoveAssignment) {
+ using TQueueType = TQueueInplace<ui64, 32>;
+
+ TQueueType a;
+ a.Push(10);
+ a.Push(11);
+ a.Push(12);
+ a.Push(13);
+ UNIT_ASSERT(a.GetSize() == 4);
+
+ TQueueType b;
+ b.Push(20);
+ b.Push(21);
+ b.Push(22);
+ b.Push(23);
+ UNIT_ASSERT(b.GetSize() == 4);
+
+ a = std::move(b);
+
+ UNIT_ASSERT(a.GetSize() == 4);
+ UNIT_ASSERT(a.PopDefault() == 20u);
+ UNIT_ASSERT(a.PopDefault() == 21u);
+ UNIT_ASSERT(a.PopDefault() == 22u);
+ UNIT_ASSERT(a.PopDefault() == 23u);
+ UNIT_ASSERT(a.PopDefault() == 0u);
+
+ UNIT_ASSERT(b.GetSize() == 0);
+ UNIT_ASSERT(b.PopDefault() == 0u);
}
}
diff --git a/ydb/core/util/queue_oneone_inplace_ut.cpp b/ydb/core/util/queue_oneone_inplace_ut.cpp
index ac60a69f3dd..0ec514ecd96 100644
--- a/ydb/core/util/queue_oneone_inplace_ut.cpp
+++ b/ydb/core/util/queue_oneone_inplace_ut.cpp
@@ -52,7 +52,26 @@ Y_UNIT_TEST_SUITE(TOneOneQueueTests) {
UNIT_ASSERT_VALUES_EQUAL(5u, p.use_count());
}
- UNIT_ASSERT_VALUES_EQUAL(1, p.use_count());
+ UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count());
+ }
+
+ Y_UNIT_TEST(DeleteInDestructor) {
+ using TQueueType = TOneOneQueueInplace<std::shared_ptr<bool> *, 32, TDelete>;
+
+ std::shared_ptr<bool> p(new bool(true));
+ UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count());
+
+ {
+ TQueueType queue;
+ queue.Push(new std::shared_ptr<bool>(p));
+ queue.Push(new std::shared_ptr<bool>(p));
+ queue.Push(new std::shared_ptr<bool>(p));
+ queue.Push(new std::shared_ptr<bool>(p));
+
+ UNIT_ASSERT_VALUES_EQUAL(5u, p.use_count());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count());
}
Y_UNIT_TEST(ReadIterator) {
diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp
index b57c92e8aac..951b35910a2 100644
--- a/ydb/library/actors/testlib/test_runtime.cpp
+++ b/ydb/library/actors/testlib/test_runtime.cpp
@@ -1896,12 +1896,11 @@ namespace NActors {
struct TStrandingActorDecoratorContext : public TThrRefBase {
TStrandingActorDecoratorContext()
- : Queue(new TQueueType)
{
}
- typedef TOneOneQueueInplace<IEventHandle*, 32> TQueueType;
- TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
+ typedef TOneOneQueueInplace<IEventHandle*, 32, TDelete> TQueueType;
+ TQueueType Queue;
};
class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
@@ -1958,8 +1957,8 @@ namespace NActors {
}
STFUNC(StateFunc) {
- bool wasEmpty = !Context->Queue->Head();
- Context->Queue->Push(ev.Release());
+ bool wasEmpty = !Context->Queue.Head();
+ Context->Queue.Push(ev.Release());
if (wasEmpty) {
SendHead(ActorContext());
}
@@ -1967,15 +1966,15 @@ namespace NActors {
STFUNC(Reply) {
Y_ABORT_UNLESS(!HasReply);
- IEventHandle *requestEv = Context->Queue->Head();
+ IEventHandle *requestEv = Context->Queue.Head();
TActorId originalSender = requestEv->Sender;
HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get());
if (HasReply) {
- delete Context->Queue->Pop();
+ delete Context->Queue.Pop();
}
auto ctx(ActorContext());
ctx.Send(IEventHandle::Forward(ev, originalSender));
- if (!IsSync && Context->Queue->Head()) {
+ if (!IsSync && Context->Queue.Head()) {
SendHead(ctx);
}
}
@@ -1985,7 +1984,7 @@ namespace NActors {
if (!IsSync) {
ctx.Send(GetForwardedEvent().Release());
} else {
- while (Context->Queue->Head()) {
+ while (Context->Queue.Head()) {
ctx.Send(GetForwardedEvent().Release());
int count = 100;
while (!HasReply && count > 0) {
@@ -2003,7 +2002,7 @@ namespace NActors {
}
TAutoPtr<IEventHandle> GetForwardedEvent() {
- IEventHandle* ev = Context->Queue->Head();
+ IEventHandle* ev = Context->Queue.Head();
RequestType = ev->GetTypeRewrite();
HasReply = !ReplyChecker->OnRequest(ev);
TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
@@ -2011,7 +2010,7 @@ namespace NActors {
: new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);
if (HasReply) {
- delete Context->Queue->Pop();
+ delete Context->Queue.Pop();
}
return forwardedEv;
}
diff --git a/ydb/library/actors/util/queue_oneone_inplace.h b/ydb/library/actors/util/queue_oneone_inplace.h
index 288011955a8..00708cad8b2 100644
--- a/ydb/library/actors/util/queue_oneone_inplace.h
+++ b/ydb/library/actors/util/queue_oneone_inplace.h
@@ -3,17 +3,15 @@
#include "defs.h"
#include "queue_chunk.h"
-template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>>
+template <typename T, ui32 TSize, typename D = TNoAction, typename TChunk = TQueueChunk<T, TSize>>
class TOneOneQueueInplace : TNonCopyable {
- static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::valuer");
+ static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
TChunk* ReadFrom;
ui32 ReadPosition;
ui32 WritePosition;
TChunk* WriteTo;
- friend class TReadIterator;
-
public:
class TReadIterator {
TChunk* ReadFrom;
@@ -28,14 +26,15 @@ public:
inline T Next() {
TChunk* head = ReadFrom;
- if (ReadPosition != TChunk::EntriesCount) {
- return AtomicLoad(&head->Entries[ReadPosition++]);
- } else if (TChunk* next = AtomicLoad(&head->Next)) {
- ReadFrom = next;
+ if (ReadPosition == TChunk::EntriesCount) [[unlikely]] {
+ head = AtomicLoad(&head->Next);
+ if (!head) {
+ return T{};
+ }
+ ReadFrom = head;
ReadPosition = 0;
- return Next();
}
- return T{};
+ return AtomicLoad(&head->Entries[ReadPosition++]);
}
};
@@ -48,67 +47,68 @@ public:
}
~TOneOneQueueInplace() {
- Y_DEBUG_ABORT_UNLESS(Head() == 0);
- delete ReadFrom;
+ if constexpr (!std::is_same_v<D, TNoAction>) {
+ while (T x = Pop()) {
+ D::Destroy(x);
+ }
+ delete ReadFrom;
+ } else {
+ TChunk* next = ReadFrom;
+ do {
+ TChunk* head = next;
+ next = AtomicLoad(&head->Next);
+ delete head;
+ } while (next);
+ }
}
struct TPtrCleanDestructor {
- static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
- while (T head = x->Pop())
- delete head;
+ static inline void Destroy(TOneOneQueueInplace* x) noexcept {
+ while (T head = x->Pop()) {
+ ::CheckedDelete(head);
+ }
delete x;
}
};
struct TCleanDestructor {
- static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
- while (x->Pop() != nullptr)
- continue;
- delete x;
- }
- };
-
- struct TPtrCleanInplaceMallocDestructor {
- template <typename TPtrVal>
- static inline void Destroy(TOneOneQueueInplace<TPtrVal*, TSize>* x) noexcept {
- while (TPtrVal* head = x->Pop()) {
- head->~TPtrVal();
- free(head);
- }
+ static inline void Destroy(TOneOneQueueInplace* x) noexcept {
delete x;
}
};
- void Push(T x) noexcept {
- if (WritePosition != TChunk::EntriesCount) {
- AtomicStore(&WriteTo->Entries[WritePosition], x);
- ++WritePosition;
- } else {
+ void Push(T x) {
+ if (WritePosition == TChunk::EntriesCount) [[unlikely]] {
TChunk* next = new TChunk();
- next->Entries[0] = x;
+ AtomicStore(&next->Entries[0], x);
AtomicStore(&WriteTo->Next, next);
WriteTo = next;
WritePosition = 1;
+ } else {
+ AtomicStore(&WriteTo->Entries[WritePosition++], x);
}
}
T Head() {
TChunk* head = ReadFrom;
- if (ReadPosition != TChunk::EntriesCount) {
- return AtomicLoad(&head->Entries[ReadPosition]);
- } else if (TChunk* next = AtomicLoad(&head->Next)) {
- ReadFrom = next;
+ if (ReadPosition == TChunk::EntriesCount) [[unlikely]] {
+ TChunk* next = AtomicLoad(&head->Next);
+ if (!next) {
+ return T{};
+ }
delete head;
+ head = next;
+ ReadFrom = next;
ReadPosition = 0;
- return Head();
}
- return T{};
+ return AtomicLoad(&head->Entries[ReadPosition]);
}
T Pop() {
T ret = Head();
- if (ret)
+ if (ret) {
++ReadPosition;
+ }
return ret;
}