aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/actor
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/actor
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/actor')
-rw-r--r--library/cpp/messagebus/actor/actor.h100
-rw-r--r--library/cpp/messagebus/actor/actor_ut.cpp260
-rw-r--r--library/cpp/messagebus/actor/executor.cpp474
-rw-r--r--library/cpp/messagebus/actor/executor.h74
-rw-r--r--library/cpp/messagebus/actor/queue_for_actor.h50
-rw-r--r--library/cpp/messagebus/actor/queue_in_actor.h44
-rw-r--r--library/cpp/messagebus/actor/ring_buffer.h236
-rw-r--r--library/cpp/messagebus/actor/ring_buffer_ut.cpp100
-rw-r--r--library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h146
-rw-r--r--library/cpp/messagebus/actor/tasks.h26
-rw-r--r--library/cpp/messagebus/actor/tasks_ut.cpp66
-rw-r--r--library/cpp/messagebus/actor/temp_tls_vector.h38
-rw-r--r--library/cpp/messagebus/actor/thread_extra.cpp36
-rw-r--r--library/cpp/messagebus/actor/thread_extra.h18
-rw-r--r--library/cpp/messagebus/actor/what_thread_does.cpp20
-rw-r--r--library/cpp/messagebus/actor/what_thread_does.h56
-rw-r--r--library/cpp/messagebus/actor/what_thread_does_guard.h28
-rw-r--r--library/cpp/messagebus/actor/what_thread_does_guard_ut.cpp16
-rw-r--r--library/cpp/messagebus/actor/ya.make20
19 files changed, 904 insertions, 904 deletions
diff --git a/library/cpp/messagebus/actor/actor.h b/library/cpp/messagebus/actor/actor.h
index 9b8f20298a..6c05d474ae 100644
--- a/library/cpp/messagebus/actor/actor.h
+++ b/library/cpp/messagebus/actor/actor.h
@@ -1,100 +1,100 @@
-#pragma once
-
+#pragma once
+
#include "executor.h"
-#include "tasks.h"
-#include "what_thread_does.h"
-
+#include "tasks.h"
+#include "what_thread_does.h"
+
#include <util/system/yassert.h>
-namespace NActor {
+namespace NActor {
class IActor: protected IWorkItem {
public:
// TODO: make private
TTasks Tasks;
-
+
public:
virtual void ScheduleHereV() = 0;
virtual void ScheduleV() = 0;
virtual void ScheduleHereAtMostOnceV() = 0;
-
+
// TODO: make private
virtual void RefV() = 0;
virtual void UnRefV() = 0;
-
+
// mute warnings
~IActor() override {
}
};
-
+
struct TDefaultTag {};
-
+
template <typename TThis, typename TTag = TDefaultTag>
class TActor: public IActor {
private:
TExecutor* const Executor;
-
+
public:
TActor(TExecutor* executor)
: Executor(executor)
{
}
-
+
void AddTaskFromActorLoop() {
bool schedule = Tasks.AddTask();
// TODO: check thread id
Y_ASSERT(!schedule);
}
-
+
/**
- * Schedule actor.
- *
- * If actor is sleeping, then actor will be executed right now.
- * If actor is executing right now, it will be executed one more time.
- * If this method is called multiple time, actor will be re-executed no more than one more time.
- */
+ * Schedule actor.
+ *
+ * If actor is sleeping, then actor will be executed right now.
+ * If actor is executing right now, it will be executed one more time.
+ * If this method is called multiple time, actor will be re-executed no more than one more time.
+ */
void Schedule() {
if (Tasks.AddTask()) {
EnqueueWork();
}
- }
-
+ }
+
/**
- * Schedule actor, execute it in current thread.
- *
- * If actor is running, continue executing where it is executing.
- * If actor is sleeping, execute it in current thread.
- *
- * Operation is useful for tasks that are likely to complete quickly.
- */
+ * Schedule actor, execute it in current thread.
+ *
+ * If actor is running, continue executing where it is executing.
+ * If actor is sleeping, execute it in current thread.
+ *
+ * Operation is useful for tasks that are likely to complete quickly.
+ */
void ScheduleHere() {
if (Tasks.AddTask()) {
Loop();
}
- }
-
+ }
+
/**
- * Schedule actor, execute in current thread no more than once.
- *
- * If actor is running, continue executing where it is executing.
- * If actor is sleeping, execute one iteration here, and if actor got new tasks,
- * reschedule it in worker pool.
- */
+ * Schedule actor, execute in current thread no more than once.
+ *
+ * If actor is running, continue executing where it is executing.
+ * If actor is sleeping, execute one iteration here, and if actor got new tasks,
+ * reschedule it in worker pool.
+ */
void ScheduleHereAtMostOnce() {
if (Tasks.AddTask()) {
bool fetched = Tasks.FetchTask();
Y_VERIFY(fetched, "happens");
-
+
DoAct();
-
+
// if someone added more tasks, schedule them
if (Tasks.FetchTask()) {
bool added = Tasks.AddTask();
Y_VERIFY(!added, "happens");
EnqueueWork();
}
- }
- }
-
+ }
+ }
+
void ScheduleHereV() override {
ScheduleHere();
}
@@ -110,35 +110,35 @@ namespace NActor {
void UnRefV() override {
GetThis()->UnRef();
}
-
+
private:
TThis* GetThis() {
return static_cast<TThis*>(this);
}
-
+
void EnqueueWork() {
GetThis()->Ref();
Executor->EnqueueWork({this});
}
-
+
void DoAct() {
WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
-
+
GetThis()->Act(TTag());
}
-
+
void Loop() {
// TODO: limit number of iterations
while (Tasks.FetchTask()) {
DoAct();
}
- }
-
+ }
+
void DoWork() override {
Y_ASSERT(GetThis()->RefCount() >= 1);
Loop();
GetThis()->UnRef();
}
};
-
+
}
diff --git a/library/cpp/messagebus/actor/actor_ut.cpp b/library/cpp/messagebus/actor/actor_ut.cpp
index b76ab55bfa..3f6d72ccdc 100644
--- a/library/cpp/messagebus/actor/actor_ut.cpp
+++ b/library/cpp/messagebus/actor/actor_ut.cpp
@@ -1,157 +1,157 @@
#include <library/cpp/testing/unittest/registar.h>
-
-#include "actor.h"
-#include "queue_in_actor.h"
-
+
+#include "actor.h"
+#include "queue_in_actor.h"
+
#include <library/cpp/messagebus/misc/test_sync.h>
#include <util/generic/object_counter.h>
#include <util/system/event.h>
-using namespace NActor;
-
-template <typename TThis>
+using namespace NActor;
+
+template <typename TThis>
struct TTestActorBase: public TAtomicRefCount<TThis>, public TActor<TThis> {
- TTestSync Started;
- TTestSync Acted;
-
- TTestActorBase(TExecutor* executor)
- : TActor<TThis>(executor)
+ TTestSync Started;
+ TTestSync Acted;
+
+ TTestActorBase(TExecutor* executor)
+ : TActor<TThis>(executor)
{
}
-
- void Act(TDefaultTag) {
- Started.Inc();
- static_cast<TThis*>(this)->Act2();
- Acted.Inc();
- }
-};
-
+
+ void Act(TDefaultTag) {
+ Started.Inc();
+ static_cast<TThis*>(this)->Act2();
+ Acted.Inc();
+ }
+};
+
struct TNopActor: public TTestActorBase<TNopActor> {
- TObjectCounter<TNopActor> AllocCounter;
-
- TNopActor(TExecutor* executor)
- : TTestActorBase<TNopActor>(executor)
+ TObjectCounter<TNopActor> AllocCounter;
+
+ TNopActor(TExecutor* executor)
+ : TTestActorBase<TNopActor>(executor)
{
}
-
- void Act2() {
- }
-};
-
+
+ void Act2() {
+ }
+};
+
struct TWaitForSignalActor: public TTestActorBase<TWaitForSignalActor> {
- TWaitForSignalActor(TExecutor* executor)
- : TTestActorBase<TWaitForSignalActor>(executor)
+ TWaitForSignalActor(TExecutor* executor)
+ : TTestActorBase<TWaitForSignalActor>(executor)
{
}
-
+
TSystemEvent WaitFor;
-
- void Act2() {
- WaitFor.Wait();
- }
-};
-
+
+ void Act2() {
+ WaitFor.Wait();
+ }
+};
+
struct TDecrementAndSendActor: public TTestActorBase<TDecrementAndSendActor>, public TQueueInActor<TDecrementAndSendActor, int> {
TSystemEvent Done;
-
- TDecrementAndSendActor* Next;
-
- TDecrementAndSendActor(TExecutor* executor)
- : TTestActorBase<TDecrementAndSendActor>(executor)
+
+ TDecrementAndSendActor* Next;
+
+ TDecrementAndSendActor(TExecutor* executor)
+ : TTestActorBase<TDecrementAndSendActor>(executor)
, Next(nullptr)
{
}
-
- void ProcessItem(TDefaultTag, TDefaultTag, int n) {
- if (n == 0) {
- Done.Signal();
- } else {
- Next->EnqueueAndSchedule(n - 1);
- }
- }
-
- void Act(TDefaultTag) {
- DequeueAll();
- }
-};
-
-struct TObjectCountChecker {
- TObjectCountChecker() {
- CheckCounts();
- }
-
- ~TObjectCountChecker() {
- CheckCounts();
- }
-
- void CheckCounts() {
- UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TNopActor>::ObjectCount());
- UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TWaitForSignalActor>::ObjectCount());
- UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TDecrementAndSendActor>::ObjectCount());
- }
-};
-
+
+ void ProcessItem(TDefaultTag, TDefaultTag, int n) {
+ if (n == 0) {
+ Done.Signal();
+ } else {
+ Next->EnqueueAndSchedule(n - 1);
+ }
+ }
+
+ void Act(TDefaultTag) {
+ DequeueAll();
+ }
+};
+
+struct TObjectCountChecker {
+ TObjectCountChecker() {
+ CheckCounts();
+ }
+
+ ~TObjectCountChecker() {
+ CheckCounts();
+ }
+
+ void CheckCounts() {
+ UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TNopActor>::ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TWaitForSignalActor>::ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TDecrementAndSendActor>::ObjectCount());
+ }
+};
+
Y_UNIT_TEST_SUITE(TActor) {
Y_UNIT_TEST(Simple) {
- TObjectCountChecker objectCountChecker;
-
- TExecutor executor(4);
-
- TIntrusivePtr<TNopActor> actor(new TNopActor(&executor));
-
- actor->Schedule();
-
- actor->Acted.WaitFor(1u);
- }
-
+ TObjectCountChecker objectCountChecker;
+
+ TExecutor executor(4);
+
+ TIntrusivePtr<TNopActor> actor(new TNopActor(&executor));
+
+ actor->Schedule();
+
+ actor->Acted.WaitFor(1u);
+ }
+
Y_UNIT_TEST(ScheduleAfterStart) {
- TObjectCountChecker objectCountChecker;
-
- TExecutor executor(4);
-
- TIntrusivePtr<TWaitForSignalActor> actor(new TWaitForSignalActor(&executor));
-
- actor->Schedule();
-
- actor->Started.WaitFor(1);
-
- actor->Schedule();
-
- actor->WaitFor.Signal();
-
- // make sure Act is called second time
- actor->Acted.WaitFor(2u);
- }
-
- void ComplexImpl(int queueSize, int actorCount) {
- TObjectCountChecker objectCountChecker;
-
- TExecutor executor(queueSize);
-
+ TObjectCountChecker objectCountChecker;
+
+ TExecutor executor(4);
+
+ TIntrusivePtr<TWaitForSignalActor> actor(new TWaitForSignalActor(&executor));
+
+ actor->Schedule();
+
+ actor->Started.WaitFor(1);
+
+ actor->Schedule();
+
+ actor->WaitFor.Signal();
+
+ // make sure Act is called second time
+ actor->Acted.WaitFor(2u);
+ }
+
+ void ComplexImpl(int queueSize, int actorCount) {
+ TObjectCountChecker objectCountChecker;
+
+ TExecutor executor(queueSize);
+
TVector<TIntrusivePtr<TDecrementAndSendActor>> actors;
- for (int i = 0; i < actorCount; ++i) {
- actors.push_back(new TDecrementAndSendActor(&executor));
- }
-
- for (int i = 0; i < actorCount; ++i) {
- actors.at(i)->Next = &*actors.at((i + 1) % actorCount);
- }
-
- for (int i = 0; i < actorCount; ++i) {
- actors.at(i)->EnqueueAndSchedule(10000);
- }
-
- for (int i = 0; i < actorCount; ++i) {
- actors.at(i)->Done.WaitI();
- }
- }
-
+ for (int i = 0; i < actorCount; ++i) {
+ actors.push_back(new TDecrementAndSendActor(&executor));
+ }
+
+ for (int i = 0; i < actorCount; ++i) {
+ actors.at(i)->Next = &*actors.at((i + 1) % actorCount);
+ }
+
+ for (int i = 0; i < actorCount; ++i) {
+ actors.at(i)->EnqueueAndSchedule(10000);
+ }
+
+ for (int i = 0; i < actorCount; ++i) {
+ actors.at(i)->Done.WaitI();
+ }
+ }
+
Y_UNIT_TEST(ComplexContention) {
- ComplexImpl(4, 6);
- }
-
+ ComplexImpl(4, 6);
+ }
+
Y_UNIT_TEST(ComplexNoContention) {
- ComplexImpl(6, 4);
- }
-}
+ ComplexImpl(6, 4);
+ }
+}
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp
index 7a2227a458..fbd76a86ba 100644
--- a/library/cpp/messagebus/actor/executor.cpp
+++ b/library/cpp/messagebus/actor/executor.cpp
@@ -1,95 +1,95 @@
#include "executor.h"
-
-#include "thread_extra.h"
-#include "what_thread_does.h"
-#include "what_thread_does_guard.h"
-
+
+#include "thread_extra.h"
+#include "what_thread_does.h"
+#include "what_thread_does_guard.h"
+
#include <util/generic/utility.h>
#include <util/random/random.h>
#include <util/stream/str.h>
#include <util/system/tls.h>
#include <util/system/yassert.h>
-
+
#include <array>
-using namespace NActor;
-using namespace NActor::NPrivate;
-
-namespace {
- struct THistoryInternal {
- struct TRecord {
+using namespace NActor;
+using namespace NActor::NPrivate;
+
+namespace {
+ struct THistoryInternal {
+ struct TRecord {
TAtomic MaxQueueSize;
-
+
TRecord()
: MaxQueueSize()
{
}
-
- TExecutorHistory::THistoryRecord Capture() {
- TExecutorHistory::THistoryRecord r;
+
+ TExecutorHistory::THistoryRecord Capture() {
+ TExecutorHistory::THistoryRecord r;
r.MaxQueueSize = AtomicGet(MaxQueueSize);
- return r;
- }
- };
-
- ui64 Start;
- ui64 LastTime;
-
+ return r;
+ }
+ };
+
+ ui64 Start;
+ ui64 LastTime;
+
std::array<TRecord, 3600> Records;
-
- THistoryInternal() {
- Start = TInstant::Now().Seconds();
- LastTime = Start - 1;
- }
-
- TRecord& GetRecordForTime(ui64 time) {
- return Records[time % Records.size()];
- }
-
- TRecord& GetNowRecord(ui64 now) {
- for (ui64 t = LastTime + 1; t <= now; ++t) {
- GetRecordForTime(t) = TRecord();
- }
- LastTime = now;
- return GetRecordForTime(now);
- }
-
- TExecutorHistory Capture() {
- TExecutorHistory history;
- ui64 now = TInstant::Now().Seconds();
- ui64 lastHistoryRecord = now - 1;
- ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1);
- history.HistoryRecords.resize(historySize);
- for (ui32 i = 0; i < historySize; ++i) {
- history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture();
- }
- history.LastHistoryRecordSecond = lastHistoryRecord;
- return history;
- }
- };
-
-}
-
+
+ THistoryInternal() {
+ Start = TInstant::Now().Seconds();
+ LastTime = Start - 1;
+ }
+
+ TRecord& GetRecordForTime(ui64 time) {
+ return Records[time % Records.size()];
+ }
+
+ TRecord& GetNowRecord(ui64 now) {
+ for (ui64 t = LastTime + 1; t <= now; ++t) {
+ GetRecordForTime(t) = TRecord();
+ }
+ LastTime = now;
+ return GetRecordForTime(now);
+ }
+
+ TExecutorHistory Capture() {
+ TExecutorHistory history;
+ ui64 now = TInstant::Now().Seconds();
+ ui64 lastHistoryRecord = now - 1;
+ ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1);
+ history.HistoryRecords.resize(historySize);
+ for (ui32 i = 0; i < historySize; ++i) {
+ history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture();
+ }
+ history.LastHistoryRecordSecond = lastHistoryRecord;
+ return history;
+ }
+ };
+
+}
+
Y_POD_STATIC_THREAD(TExecutor*)
ThreadCurrentExecutor;
-
-static const char* NoLocation = "nowhere";
-
-struct TExecutorWorkerThreadLocalData {
- ui32 MaxQueueSize;
-};
-
-static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData;
+
+static const char* NoLocation = "nowhere";
+
+struct TExecutorWorkerThreadLocalData {
+ ui32 MaxQueueSize;
+};
+
+static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData;
Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData)
WorkerThreadLocalData;
-
-namespace NActor {
+
+namespace NActor {
struct TExecutorWorker {
TExecutor* const Executor;
TThread Thread;
const char** WhatThreadDoesLocation;
TExecutorWorkerThreadLocalData* ThreadLocalData;
-
+
TExecutorWorker(TExecutor* executor)
: Executor(executor)
, Thread(RunThreadProc, this)
@@ -98,241 +98,241 @@ namespace NActor {
{
Thread.Start();
}
-
+
void Run() {
WhatThreadDoesLocation = ::WhatThreadDoesLocation();
AtomicSet(ThreadLocalData, &::WorkerThreadLocalData);
WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
Executor->RunWorker();
}
-
+
static void* RunThreadProc(void* thiz0) {
TExecutorWorker* thiz = (TExecutorWorker*)thiz0;
thiz->Run();
return nullptr;
}
};
-
+
struct TExecutor::TImpl {
TExecutor* const Executor;
THistoryInternal History;
-
+
TSystemEvent HelperStopSignal;
TThread HelperThread;
-
+
TImpl(TExecutor* executor)
: Executor(executor)
, HelperThread(HelperThreadProc, this)
{
}
-
+
void RunHelper() {
ui64 nowSeconds = TInstant::Now().Seconds();
for (;;) {
TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000));
-
+
if (HelperStopSignal.WaitD(nextStop)) {
return;
}
-
+
nowSeconds = nextStop.Seconds();
-
+
THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds);
-
+
ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear();
if (maxQueueSize > record.MaxQueueSize) {
AtomicSet(record.MaxQueueSize, maxQueueSize);
}
- }
- }
-
+ }
+ }
+
static void* HelperThreadProc(void* impl0) {
TImpl* impl = (TImpl*)impl0;
impl->RunHelper();
return nullptr;
}
};
-
-}
-
-static TExecutor::TConfig MakeConfig(unsigned workerCount) {
- TExecutor::TConfig config;
- config.WorkerCount = workerCount;
- return config;
-}
-
-TExecutor::TExecutor(size_t workerCount)
- : Config(MakeConfig(workerCount))
-{
- Init();
-}
-
-TExecutor::TExecutor(const TExecutor::TConfig& config)
- : Config(config)
-{
- Init();
+
}
-
-void TExecutor::Init() {
- Impl.Reset(new TImpl(this));
-
+
+static TExecutor::TConfig MakeConfig(unsigned workerCount) {
+ TExecutor::TConfig config;
+ config.WorkerCount = workerCount;
+ return config;
+}
+
+TExecutor::TExecutor(size_t workerCount)
+ : Config(MakeConfig(workerCount))
+{
+ Init();
+}
+
+TExecutor::TExecutor(const TExecutor::TConfig& config)
+ : Config(config)
+{
+ Init();
+}
+
+void TExecutor::Init() {
+ Impl.Reset(new TImpl(this));
+
AtomicSet(ExitWorkers, 0);
-
+
Y_VERIFY(Config.WorkerCount > 0);
-
- for (size_t i = 0; i < Config.WorkerCount; i++) {
- WorkerThreads.push_back(new TExecutorWorker(this));
- }
-
- Impl->HelperThread.Start();
-}
-
-TExecutor::~TExecutor() {
- Stop();
-}
-
-void TExecutor::Stop() {
+
+ for (size_t i = 0; i < Config.WorkerCount; i++) {
+ WorkerThreads.push_back(new TExecutorWorker(this));
+ }
+
+ Impl->HelperThread.Start();
+}
+
+TExecutor::~TExecutor() {
+ Stop();
+}
+
+void TExecutor::Stop() {
AtomicSet(ExitWorkers, 1);
-
- Impl->HelperStopSignal.Signal();
- Impl->HelperThread.Join();
-
- {
- TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop");
- WorkAvailable.BroadCast();
- }
-
- for (size_t i = 0; i < WorkerThreads.size(); i++) {
- WorkerThreads[i]->Thread.Join();
- }
-
- // TODO: make queue empty at this point
- ProcessWorkQueueHere();
-}
-
+
+ Impl->HelperStopSignal.Signal();
+ Impl->HelperThread.Join();
+
+ {
+ TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop");
+ WorkAvailable.BroadCast();
+ }
+
+ for (size_t i = 0; i < WorkerThreads.size(); i++) {
+ WorkerThreads[i]->Thread.Join();
+ }
+
+ // TODO: make queue empty at this point
+ ProcessWorkQueueHere();
+}
+
void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) {
- if (wis.empty())
- return;
-
+ if (wis.empty())
+ return;
+
if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) {
Y_VERIFY(WorkItems.Empty(), "executor %s: cannot add tasks after queue shutdown", Config.Name);
- }
-
- TWhatThreadDoesPushPop pp("executor: EnqueueWork");
-
- WorkItems.PushAll(wis);
-
- {
- if (wis.size() == 1) {
- TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
- WorkAvailable.Signal();
- } else {
- TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
- WorkAvailable.BroadCast();
- }
- }
-}
-
+ }
+
+ TWhatThreadDoesPushPop pp("executor: EnqueueWork");
+
+ WorkItems.PushAll(wis);
+
+ {
+ if (wis.size() == 1) {
+ TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
+ WorkAvailable.Signal();
+ } else {
+ TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
+ WorkAvailable.BroadCast();
+ }
+ }
+}
+
size_t TExecutor::GetWorkQueueSize() const {
- return WorkItems.Size();
-}
-
+ return WorkItems.Size();
+}
+
using namespace NTSAN;
-ui32 TExecutor::GetMaxQueueSizeAndClear() const {
- ui32 max = 0;
- for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
+ui32 TExecutor::GetMaxQueueSizeAndClear() const {
+ ui32 max = 0;
+ for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
TExecutorWorkerThreadLocalData* wtls = RelaxedLoad(&WorkerThreads[i]->ThreadLocalData);
max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize));
RelaxedStore<ui32>(&wtls->MaxQueueSize, 0);
- }
- return max;
-}
-
+ }
+ return max;
+}
+
TString TExecutor::GetStatus() const {
- return GetStatusRecordInternal().Status;
-}
-
+ return GetStatusRecordInternal().Status;
+}
+
TString TExecutor::GetStatusSingleLine() const {
- TStringStream ss;
- ss << "work items: " << GetWorkQueueSize();
- return ss.Str();
-}
-
+ TStringStream ss;
+ ss << "work items: " << GetWorkQueueSize();
+ return ss.Str();
+}
+
TExecutorStatus TExecutor::GetStatusRecordInternal() const {
- TExecutorStatus r;
-
- r.WorkQueueSize = GetWorkQueueSize();
-
- {
- TStringStream ss;
- ss << "work items: " << GetWorkQueueSize() << "\n";
- ss << "workers:\n";
- for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
+ TExecutorStatus r;
+
+ r.WorkQueueSize = GetWorkQueueSize();
+
+ {
+ TStringStream ss;
+ ss << "work items: " << GetWorkQueueSize() << "\n";
+ ss << "workers:\n";
+ for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n";
- }
- r.Status = ss.Str();
- }
-
- r.History = Impl->History.Capture();
-
- return r;
-}
-
+ }
+ r.Status = ss.Str();
+ }
+
+ r.History = Impl->History.Capture();
+
+ return r;
+}
+
bool TExecutor::IsInExecutorThread() const {
- return ThreadCurrentExecutor == this;
-}
-
-TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
- IWorkItem* wi = reinterpret_cast<IWorkItem*>(1);
- size_t queueSize = Max<size_t>();
- if (!WorkItems.TryPop(&wi, &queueSize)) {
- TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork");
- while (!WorkItems.TryPop(&wi, &queueSize)) {
+ return ThreadCurrentExecutor == this;
+}
+
+TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
+ IWorkItem* wi = reinterpret_cast<IWorkItem*>(1);
+ size_t queueSize = Max<size_t>();
+ if (!WorkItems.TryPop(&wi, &queueSize)) {
+ TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork");
+ while (!WorkItems.TryPop(&wi, &queueSize)) {
if (AtomicGet(ExitWorkers) != 0)
return nullptr;
-
- TWhatThreadDoesPushPop pp("waiting for work on condvar");
- WorkAvailable.Wait(WorkMutex);
- }
- }
+
+ TWhatThreadDoesPushPop pp("waiting for work on condvar");
+ WorkAvailable.Wait(WorkMutex);
+ }
+ }
auto& wtls = TlsRef(WorkerThreadLocalData);
if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) {
RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize);
- }
-
- return wi;
-}
-
-void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) {
- WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
- wi.Release()->DoWork();
-}
-
-void TExecutor::ProcessWorkQueueHere() {
- IWorkItem* wi;
- while (WorkItems.TryPop(&wi)) {
- RunWorkItem(wi);
- }
-}
-
-void TExecutor::RunWorker() {
+ }
+
+ return wi;
+}
+
+void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) {
+ WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
+ wi.Release()->DoWork();
+}
+
+void TExecutor::ProcessWorkQueueHere() {
+ IWorkItem* wi;
+ while (WorkItems.TryPop(&wi)) {
+ RunWorkItem(wi);
+ }
+}
+
+void TExecutor::RunWorker() {
Y_VERIFY(!ThreadCurrentExecutor, "state check");
- ThreadCurrentExecutor = this;
-
+ ThreadCurrentExecutor = this;
+
SetCurrentThreadName("wrkr");
-
- for (;;) {
- TAutoPtr<IWorkItem> wi = DequeueWork();
- if (!wi) {
- break;
- }
- // Note for messagebus users: make sure program crashes
- // on uncaught exception in thread, otherewise messagebus may just hang on error.
- RunWorkItem(wi);
- }
-
+
+ for (;;) {
+ TAutoPtr<IWorkItem> wi = DequeueWork();
+ if (!wi) {
+ break;
+ }
+ // Note for messagebus users: make sure program crashes
+ // on uncaught exception in thread, otherewise messagebus may just hang on error.
+ RunWorkItem(wi);
+ }
+
ThreadCurrentExecutor = (TExecutor*)nullptr;
-}
+}
diff --git a/library/cpp/messagebus/actor/executor.h b/library/cpp/messagebus/actor/executor.h
index 7292d8be53..2a30580ff1 100644
--- a/library/cpp/messagebus/actor/executor.h
+++ b/library/cpp/messagebus/actor/executor.h
@@ -1,16 +1,16 @@
-#pragma once
-
+#pragma once
+
#include "ring_buffer_with_spin_lock.h"
#include <util/generic/array_ref.h>
-#include <util/generic/vector.h>
-#include <util/system/condvar.h>
-#include <util/system/event.h>
+#include <util/generic/vector.h>
+#include <util/system/condvar.h>
+#include <util/system/event.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>
-#include <util/thread/lfqueue.h>
-
-namespace NActor {
+#include <util/thread/lfqueue.h>
+
+namespace NActor {
namespace NPrivate {
struct TExecutorHistory {
struct THistoryRecord {
@@ -18,88 +18,88 @@ namespace NActor {
};
TVector<THistoryRecord> HistoryRecords;
ui64 LastHistoryRecordSecond;
-
+
ui64 FirstHistoryRecordSecond() const {
return LastHistoryRecordSecond - HistoryRecords.size() + 1;
}
};
-
+
struct TExecutorStatus {
size_t WorkQueueSize = 0;
TExecutorHistory History;
TString Status;
- };
+ };
}
-
+
class IWorkItem {
public:
virtual ~IWorkItem() {
- }
+ }
virtual void DoWork(/* must release this */) = 0;
- };
-
+ };
+
struct TExecutorWorker;
-
+
class TExecutor: public TAtomicRefCount<TExecutor> {
friend struct TExecutorWorker;
-
+
public:
struct TConfig {
size_t WorkerCount;
const char* Name;
-
+
TConfig()
: WorkerCount(1)
, Name()
{
}
};
-
+
private:
struct TImpl;
THolder<TImpl> Impl;
-
+
const TConfig Config;
-
+
TAtomic ExitWorkers;
-
+
TVector<TAutoPtr<TExecutorWorker>> WorkerThreads;
-
+
TRingBufferWithSpinLock<IWorkItem*> WorkItems;
-
+
TMutex WorkMutex;
TCondVar WorkAvailable;
-
+
public:
explicit TExecutor(size_t workerCount);
TExecutor(const TConfig& config);
~TExecutor();
-
+
void Stop();
-
+
void EnqueueWork(TArrayRef<IWorkItem* const> w);
-
+
size_t GetWorkQueueSize() const;
TString GetStatus() const;
TString GetStatusSingleLine() const;
NPrivate::TExecutorStatus GetStatusRecordInternal() const;
-
+
bool IsInExecutorThread() const;
-
+
private:
void Init();
-
+
TAutoPtr<IWorkItem> DequeueWork();
-
+
void ProcessWorkQueueHere();
-
+
inline void RunWorkItem(TAutoPtr<IWorkItem>);
-
+
void RunWorker();
-
+
ui32 GetMaxQueueSizeAndClear() const;
};
-
+
using TExecutorPtr = TIntrusivePtr<TExecutor>;
-
+
}
diff --git a/library/cpp/messagebus/actor/queue_for_actor.h b/library/cpp/messagebus/actor/queue_for_actor.h
index 40fa536b82..b64a2a27a2 100644
--- a/library/cpp/messagebus/actor/queue_for_actor.h
+++ b/library/cpp/messagebus/actor/queue_for_actor.h
@@ -1,65 +1,65 @@
-#pragma once
-
-#include <util/generic/vector.h>
-#include <util/system/yassert.h>
-#include <util/thread/lfstack.h>
-#include <util/thread/singleton.h>
-
-// TODO: include from correct directory
-#include "temp_tls_vector.h"
-
-namespace NActor {
+#pragma once
+
+#include <util/generic/vector.h>
+#include <util/system/yassert.h>
+#include <util/thread/lfstack.h>
+#include <util/thread/singleton.h>
+
+// TODO: include from correct directory
+#include "temp_tls_vector.h"
+
+namespace NActor {
namespace NPrivate {
struct TTagForTl {};
-
+
}
-
+
template <typename T>
class TQueueForActor {
private:
TLockFreeStack<T> Queue;
-
+
public:
~TQueueForActor() {
Y_VERIFY(Queue.IsEmpty());
}
-
+
bool IsEmpty() {
return Queue.IsEmpty();
}
-
+
void Enqueue(const T& value) {
Queue.Enqueue(value);
}
-
+
template <typename TCollection>
void EnqueueAll(const TCollection& all) {
Queue.EnqueueAll(all);
}
-
+
void Clear() {
TVector<T> tmp;
Queue.DequeueAll(&tmp);
}
-
+
template <typename TFunc>
void DequeueAll(const TFunc& func
// TODO: , std::enable_if_t<TFunctionParamCount<TFunc>::Value == 1>* = 0
) {
TTempTlsVector<T> temp;
-
+
Queue.DequeueAllSingleConsumer(temp.GetVector());
-
+
for (typename TVector<T>::reverse_iterator i = temp.GetVector()->rbegin(); i != temp.GetVector()->rend(); ++i) {
func(*i);
}
-
+
temp.Clear();
-
+
if (temp.Capacity() * sizeof(T) > 64 * 1024) {
temp.Shrink();
}
- }
+ }
template <typename TFunc>
void DequeueAllLikelyEmpty(const TFunc& func) {
@@ -70,5 +70,5 @@ namespace NActor {
DequeueAll(func);
}
};
-
+
}
diff --git a/library/cpp/messagebus/actor/queue_in_actor.h b/library/cpp/messagebus/actor/queue_in_actor.h
index 9865996532..f93eb03070 100644
--- a/library/cpp/messagebus/actor/queue_in_actor.h
+++ b/library/cpp/messagebus/actor/queue_in_actor.h
@@ -1,80 +1,80 @@
-#pragma once
-
-#include "actor.h"
-#include "queue_for_actor.h"
-
+#pragma once
+
+#include "actor.h"
+#include "queue_for_actor.h"
+
#include <functional>
-namespace NActor {
+namespace NActor {
template <typename TItem>
class IQueueInActor {
public:
virtual void EnqueueAndScheduleV(const TItem& item) = 0;
virtual void DequeueAllV() = 0;
virtual void DequeueAllLikelyEmptyV() = 0;
-
+
virtual ~IQueueInActor() {
}
};
-
+
template <typename TThis, typename TItem, typename TActorTag = TDefaultTag, typename TQueueTag = TDefaultTag>
class TQueueInActor: public IQueueInActor<TItem> {
typedef TQueueInActor<TThis, TItem, TActorTag, TQueueTag> TSelf;
-
+
public:
// TODO: make protected
TQueueForActor<TItem> QueueInActor;
-
+
private:
TActor<TThis, TActorTag>* GetActor() {
return GetThis();
}
-
+
TThis* GetThis() {
return static_cast<TThis*>(this);
}
-
+
void ProcessItem(const TItem& item) {
GetThis()->ProcessItem(TActorTag(), TQueueTag(), item);
}
-
+
public:
void EnqueueAndNoSchedule(const TItem& item) {
QueueInActor.Enqueue(item);
}
-
+
void EnqueueAndSchedule(const TItem& item) {
EnqueueAndNoSchedule(item);
GetActor()->Schedule();
}
-
+
void EnqueueAndScheduleV(const TItem& item) override {
EnqueueAndSchedule(item);
}
-
+
void Clear() {
QueueInActor.Clear();
}
-
+
void DequeueAll() {
QueueInActor.DequeueAll(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1));
}
-
+
void DequeueAllV() override {
return DequeueAll();
}
-
+
void DequeueAllLikelyEmpty() {
QueueInActor.DequeueAllLikelyEmpty(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1));
}
-
+
void DequeueAllLikelyEmptyV() override {
return DequeueAllLikelyEmpty();
}
-
+
bool IsEmpty() {
return QueueInActor.IsEmpty();
}
};
-
+
}
diff --git a/library/cpp/messagebus/actor/ring_buffer.h b/library/cpp/messagebus/actor/ring_buffer.h
index ec5706f7c7..a0e2f481bf 100644
--- a/library/cpp/messagebus/actor/ring_buffer.h
+++ b/library/cpp/messagebus/actor/ring_buffer.h
@@ -1,135 +1,135 @@
-#pragma once
-
+#pragma once
+
#include <util/generic/array_ref.h>
#include <util/generic/maybe.h>
#include <util/generic/utility.h>
#include <util/generic/vector.h>
-#include <util/system/yassert.h>
-
-template <typename T>
-struct TRingBuffer {
-private:
- ui32 CapacityPow;
- ui32 CapacityMask;
- ui32 Capacity;
- ui32 WritePos;
- ui32 ReadPos;
+#include <util/system/yassert.h>
+
+template <typename T>
+struct TRingBuffer {
+private:
+ ui32 CapacityPow;
+ ui32 CapacityMask;
+ ui32 Capacity;
+ ui32 WritePos;
+ ui32 ReadPos;
TVector<T> Data;
-
- void StateCheck() const {
+
+ void StateCheck() const {
Y_ASSERT(Capacity == Data.size());
Y_ASSERT(Capacity == (1u << CapacityPow));
Y_ASSERT((Capacity & CapacityMask) == 0u);
Y_ASSERT(Capacity - CapacityMask == 1u);
Y_ASSERT(WritePos < Capacity);
Y_ASSERT(ReadPos < Capacity);
- }
-
- size_t Writable() const {
- return (Capacity + ReadPos - WritePos - 1) & CapacityMask;
- }
-
- void ReserveWritable(ui32 sz) {
- if (sz <= Writable())
- return;
-
- ui32 newCapacityPow = CapacityPow;
+ }
+
+ size_t Writable() const {
+ return (Capacity + ReadPos - WritePos - 1) & CapacityMask;
+ }
+
+ void ReserveWritable(ui32 sz) {
+ if (sz <= Writable())
+ return;
+
+ ui32 newCapacityPow = CapacityPow;
while ((1u << newCapacityPow) < sz + ui32(Size()) + 1u) {
- ++newCapacityPow;
- }
- ui32 newCapacity = 1u << newCapacityPow;
- ui32 newCapacityMask = newCapacity - 1u;
+ ++newCapacityPow;
+ }
+ ui32 newCapacity = 1u << newCapacityPow;
+ ui32 newCapacityMask = newCapacity - 1u;
TVector<T> newData(newCapacity);
- ui32 oldSize = Size();
- // Copy old elements
- for (size_t i = 0; i < oldSize; ++i) {
- newData[i] = Get(i);
- }
-
- CapacityPow = newCapacityPow;
- Capacity = newCapacity;
- CapacityMask = newCapacityMask;
- Data.swap(newData);
- ReadPos = 0;
- WritePos = oldSize;
-
- StateCheck();
- }
-
- const T& Get(ui32 i) const {
- return Data[(ReadPos + i) & CapacityMask];
- }
-
-public:
- TRingBuffer()
- : CapacityPow(0)
- , CapacityMask(0)
- , Capacity(1 << CapacityPow)
- , WritePos(0)
- , ReadPos(0)
- , Data(Capacity)
- {
- StateCheck();
- }
-
- size_t Size() const {
- return (Capacity + WritePos - ReadPos) & CapacityMask;
- }
-
- bool Empty() const {
- return WritePos == ReadPos;
- }
-
+ ui32 oldSize = Size();
+ // Copy old elements
+ for (size_t i = 0; i < oldSize; ++i) {
+ newData[i] = Get(i);
+ }
+
+ CapacityPow = newCapacityPow;
+ Capacity = newCapacity;
+ CapacityMask = newCapacityMask;
+ Data.swap(newData);
+ ReadPos = 0;
+ WritePos = oldSize;
+
+ StateCheck();
+ }
+
+ const T& Get(ui32 i) const {
+ return Data[(ReadPos + i) & CapacityMask];
+ }
+
+public:
+ TRingBuffer()
+ : CapacityPow(0)
+ , CapacityMask(0)
+ , Capacity(1 << CapacityPow)
+ , WritePos(0)
+ , ReadPos(0)
+ , Data(Capacity)
+ {
+ StateCheck();
+ }
+
+ size_t Size() const {
+ return (Capacity + WritePos - ReadPos) & CapacityMask;
+ }
+
+ bool Empty() const {
+ return WritePos == ReadPos;
+ }
+
void PushAll(TArrayRef<const T> value) {
- ReserveWritable(value.size());
-
- ui32 secondSize;
- ui32 firstSize;
-
- if (WritePos + value.size() <= Capacity) {
- firstSize = value.size();
- secondSize = 0;
- } else {
- firstSize = Capacity - WritePos;
- secondSize = value.size() - firstSize;
- }
-
- for (size_t i = 0; i < firstSize; ++i) {
+ ReserveWritable(value.size());
+
+ ui32 secondSize;
+ ui32 firstSize;
+
+ if (WritePos + value.size() <= Capacity) {
+ firstSize = value.size();
+ secondSize = 0;
+ } else {
+ firstSize = Capacity - WritePos;
+ secondSize = value.size() - firstSize;
+ }
+
+ for (size_t i = 0; i < firstSize; ++i) {
Data[WritePos + i] = value[i];
- }
-
- for (size_t i = 0; i < secondSize; ++i) {
+ }
+
+ for (size_t i = 0; i < secondSize; ++i) {
Data[i] = value[firstSize + i];
- }
-
- WritePos = (WritePos + value.size()) & CapacityMask;
- StateCheck();
- }
-
- void Push(const T& t) {
+ }
+
+ WritePos = (WritePos + value.size()) & CapacityMask;
+ StateCheck();
+ }
+
+ void Push(const T& t) {
PushAll(MakeArrayRef(&t, 1));
- }
-
- bool TryPop(T* r) {
- StateCheck();
- if (Empty()) {
- return false;
- }
- *r = Data[ReadPos];
- ReadPos = (ReadPos + 1) & CapacityMask;
- return true;
- }
-
- TMaybe<T> TryPop() {
- T tmp;
- if (TryPop(&tmp)) {
- return tmp;
- } else {
- return TMaybe<T>();
- }
- }
-
- T Pop() {
- return *TryPop();
- }
-};
+ }
+
+ bool TryPop(T* r) {
+ StateCheck();
+ if (Empty()) {
+ return false;
+ }
+ *r = Data[ReadPos];
+ ReadPos = (ReadPos + 1) & CapacityMask;
+ return true;
+ }
+
+ TMaybe<T> TryPop() {
+ T tmp;
+ if (TryPop(&tmp)) {
+ return tmp;
+ } else {
+ return TMaybe<T>();
+ }
+ }
+
+ T Pop() {
+ return *TryPop();
+ }
+};
diff --git a/library/cpp/messagebus/actor/ring_buffer_ut.cpp b/library/cpp/messagebus/actor/ring_buffer_ut.cpp
index bdb379b3a9..9b2f46957b 100644
--- a/library/cpp/messagebus/actor/ring_buffer_ut.cpp
+++ b/library/cpp/messagebus/actor/ring_buffer_ut.cpp
@@ -1,60 +1,60 @@
#include <library/cpp/testing/unittest/registar.h>
-
-#include "ring_buffer.h"
-
+
+#include "ring_buffer.h"
+
#include <util/random/random.h>
Y_UNIT_TEST_SUITE(RingBuffer) {
- struct TRingBufferTester {
- TRingBuffer<unsigned> RingBuffer;
-
- unsigned NextPush;
- unsigned NextPop;
-
+ struct TRingBufferTester {
+ TRingBuffer<unsigned> RingBuffer;
+
+ unsigned NextPush;
+ unsigned NextPop;
+
TRingBufferTester()
: NextPush()
, NextPop()
{
}
-
- void Push() {
- //Cerr << "push " << NextPush << "\n";
- RingBuffer.Push(NextPush);
- NextPush += 1;
- }
-
- void Pop() {
- //Cerr << "pop " << NextPop << "\n";
- unsigned popped = RingBuffer.Pop();
- UNIT_ASSERT_VALUES_EQUAL(NextPop, popped);
- NextPop += 1;
- }
-
- bool Empty() const {
- UNIT_ASSERT_VALUES_EQUAL(RingBuffer.Size(), NextPush - NextPop);
- UNIT_ASSERT_VALUES_EQUAL(RingBuffer.Empty(), RingBuffer.Size() == 0);
- return RingBuffer.Empty();
- }
- };
-
- void Iter() {
- TRingBufferTester rb;
-
- while (rb.NextPush < 1000) {
- rb.Push();
- while (!rb.Empty() && RandomNumber<bool>()) {
- rb.Pop();
- }
- }
-
- while (!rb.Empty()) {
- rb.Pop();
- }
- }
-
+
+ void Push() {
+ //Cerr << "push " << NextPush << "\n";
+ RingBuffer.Push(NextPush);
+ NextPush += 1;
+ }
+
+ void Pop() {
+ //Cerr << "pop " << NextPop << "\n";
+ unsigned popped = RingBuffer.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(NextPop, popped);
+ NextPop += 1;
+ }
+
+ bool Empty() const {
+ UNIT_ASSERT_VALUES_EQUAL(RingBuffer.Size(), NextPush - NextPop);
+ UNIT_ASSERT_VALUES_EQUAL(RingBuffer.Empty(), RingBuffer.Size() == 0);
+ return RingBuffer.Empty();
+ }
+ };
+
+ void Iter() {
+ TRingBufferTester rb;
+
+ while (rb.NextPush < 1000) {
+ rb.Push();
+ while (!rb.Empty() && RandomNumber<bool>()) {
+ rb.Pop();
+ }
+ }
+
+ while (!rb.Empty()) {
+ rb.Pop();
+ }
+ }
+
Y_UNIT_TEST(Random) {
- for (unsigned i = 0; i < 100; ++i) {
- Iter();
- }
- }
-}
+ for (unsigned i = 0; i < 100; ++i) {
+ Iter();
+ }
+ }
+}
diff --git a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
index f0b7cd90e4..d05dec8577 100644
--- a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
+++ b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
@@ -1,91 +1,91 @@
-#pragma once
-
+#pragma once
+
#include "ring_buffer.h"
-#include <util/system/spinlock.h>
-
-template <typename T>
-class TRingBufferWithSpinLock {
-private:
- TRingBuffer<T> RingBuffer;
- TSpinLock SpinLock;
+#include <util/system/spinlock.h>
+
+template <typename T>
+class TRingBufferWithSpinLock {
+private:
+ TRingBuffer<T> RingBuffer;
+ TSpinLock SpinLock;
TAtomic CachedSize;
-public:
- TRingBufferWithSpinLock()
+public:
+ TRingBufferWithSpinLock()
: CachedSize(0)
{
}
-
- void Push(const T& t) {
- PushAll(t);
- }
-
+
+ void Push(const T& t) {
+ PushAll(t);
+ }
+
void PushAll(TArrayRef<const T> collection) {
- if (collection.empty()) {
- return;
- }
-
- TGuard<TSpinLock> Guard(SpinLock);
- RingBuffer.PushAll(collection);
+ if (collection.empty()) {
+ return;
+ }
+
+ TGuard<TSpinLock> Guard(SpinLock);
+ RingBuffer.PushAll(collection);
AtomicSet(CachedSize, RingBuffer.Size());
- }
-
+ }
+
bool TryPop(T* r, size_t* sizePtr = nullptr) {
if (AtomicGet(CachedSize) == 0) {
- return false;
- }
-
- bool ok;
- size_t size;
- {
- TGuard<TSpinLock> Guard(SpinLock);
- ok = RingBuffer.TryPop(r);
- size = RingBuffer.Size();
+ return false;
+ }
+
+ bool ok;
+ size_t size;
+ {
+ TGuard<TSpinLock> Guard(SpinLock);
+ ok = RingBuffer.TryPop(r);
+ size = RingBuffer.Size();
AtomicSet(CachedSize, size);
- }
- if (!!sizePtr) {
- *sizePtr = size;
- }
- return ok;
- }
-
- TMaybe<T> TryPop() {
- T tmp;
- if (TryPop(&tmp)) {
- return tmp;
- } else {
- return TMaybe<T>();
- }
- }
-
+ }
+ if (!!sizePtr) {
+ *sizePtr = size;
+ }
+ return ok;
+ }
+
+ TMaybe<T> TryPop() {
+ T tmp;
+ if (TryPop(&tmp)) {
+ return tmp;
+ } else {
+ return TMaybe<T>();
+ }
+ }
+
bool PushAllAndTryPop(TArrayRef<const T> collection, T* r) {
- if (collection.size() == 0) {
- return TryPop(r);
- } else {
+ if (collection.size() == 0) {
+ return TryPop(r);
+ } else {
if (AtomicGet(CachedSize) == 0) {
- *r = collection[0];
- if (collection.size() > 1) {
- TGuard<TSpinLock> guard(SpinLock);
+ *r = collection[0];
+ if (collection.size() > 1) {
+ TGuard<TSpinLock> guard(SpinLock);
RingBuffer.PushAll(MakeArrayRef(collection.data() + 1, collection.size() - 1));
AtomicSet(CachedSize, RingBuffer.Size());
- }
- } else {
- TGuard<TSpinLock> guard(SpinLock);
- RingBuffer.PushAll(collection);
- *r = RingBuffer.Pop();
+ }
+ } else {
+ TGuard<TSpinLock> guard(SpinLock);
+ RingBuffer.PushAll(collection);
+ *r = RingBuffer.Pop();
AtomicSet(CachedSize, RingBuffer.Size());
- }
- return true;
- }
- }
-
- bool Empty() const {
+ }
+ return true;
+ }
+ }
+
+ bool Empty() const {
return AtomicGet(CachedSize) == 0;
- }
-
- size_t Size() const {
- TGuard<TSpinLock> Guard(SpinLock);
- return RingBuffer.Size();
- }
-};
+ }
+
+ size_t Size() const {
+ TGuard<TSpinLock> Guard(SpinLock);
+ return RingBuffer.Size();
+ }
+};
diff --git a/library/cpp/messagebus/actor/tasks.h b/library/cpp/messagebus/actor/tasks.h
index 31d35931d2..3eab200b38 100644
--- a/library/cpp/messagebus/actor/tasks.h
+++ b/library/cpp/messagebus/actor/tasks.h
@@ -1,9 +1,9 @@
-#pragma once
-
+#pragma once
+
#include <util/system/atomic.h>
-#include <util/system/yassert.h>
-
-namespace NActor {
+#include <util/system/yassert.h>
+
+namespace NActor {
class TTasks {
enum {
// order of values is important
@@ -11,27 +11,27 @@ namespace NActor {
E_RUNNING_NO_TASKS,
E_RUNNING_GOT_TASKS,
};
-
+
private:
TAtomic State;
-
+
public:
TTasks()
: State(E_WAITING)
{
- }
-
+ }
+
// @return true iff caller have to either schedule task or execute it
bool AddTask() {
// High contention case optimization: AtomicGet is cheaper than AtomicSwap.
if (E_RUNNING_GOT_TASKS == AtomicGet(State)) {
return false;
}
-
+
TAtomicBase oldState = AtomicSwap(&State, E_RUNNING_GOT_TASKS);
return oldState == E_WAITING;
- }
-
+ }
+
// called by executor
// @return true iff we have to recheck queues
bool FetchTask() {
@@ -44,5 +44,5 @@ namespace NActor {
Y_FAIL("unknown");
}
};
-
+
}
diff --git a/library/cpp/messagebus/actor/tasks_ut.cpp b/library/cpp/messagebus/actor/tasks_ut.cpp
index d80e8451a5..8468109a7d 100644
--- a/library/cpp/messagebus/actor/tasks_ut.cpp
+++ b/library/cpp/messagebus/actor/tasks_ut.cpp
@@ -1,37 +1,37 @@
#include <library/cpp/testing/unittest/registar.h>
-
-#include "tasks.h"
-
-using namespace NActor;
-
+
+#include "tasks.h"
+
+using namespace NActor;
+
Y_UNIT_TEST_SUITE(TTasks) {
Y_UNIT_TEST(AddTask_FetchTask_Simple) {
- TTasks tasks;
-
- UNIT_ASSERT(tasks.AddTask());
- UNIT_ASSERT(!tasks.AddTask());
- UNIT_ASSERT(!tasks.AddTask());
-
- UNIT_ASSERT(tasks.FetchTask());
- UNIT_ASSERT(!tasks.FetchTask());
-
- UNIT_ASSERT(tasks.AddTask());
- }
-
+ TTasks tasks;
+
+ UNIT_ASSERT(tasks.AddTask());
+ UNIT_ASSERT(!tasks.AddTask());
+ UNIT_ASSERT(!tasks.AddTask());
+
+ UNIT_ASSERT(tasks.FetchTask());
+ UNIT_ASSERT(!tasks.FetchTask());
+
+ UNIT_ASSERT(tasks.AddTask());
+ }
+
Y_UNIT_TEST(AddTask_FetchTask_AddTask) {
- TTasks tasks;
-
- UNIT_ASSERT(tasks.AddTask());
- UNIT_ASSERT(!tasks.AddTask());
-
- UNIT_ASSERT(tasks.FetchTask());
- UNIT_ASSERT(!tasks.AddTask());
- UNIT_ASSERT(tasks.FetchTask());
- UNIT_ASSERT(!tasks.AddTask());
- UNIT_ASSERT(!tasks.AddTask());
- UNIT_ASSERT(tasks.FetchTask());
- UNIT_ASSERT(!tasks.FetchTask());
-
- UNIT_ASSERT(tasks.AddTask());
- }
-}
+ TTasks tasks;
+
+ UNIT_ASSERT(tasks.AddTask());
+ UNIT_ASSERT(!tasks.AddTask());
+
+ UNIT_ASSERT(tasks.FetchTask());
+ UNIT_ASSERT(!tasks.AddTask());
+ UNIT_ASSERT(tasks.FetchTask());
+ UNIT_ASSERT(!tasks.AddTask());
+ UNIT_ASSERT(!tasks.AddTask());
+ UNIT_ASSERT(tasks.FetchTask());
+ UNIT_ASSERT(!tasks.FetchTask());
+
+ UNIT_ASSERT(tasks.AddTask());
+ }
+}
diff --git a/library/cpp/messagebus/actor/temp_tls_vector.h b/library/cpp/messagebus/actor/temp_tls_vector.h
index 675d92f5b0..5c535dd07c 100644
--- a/library/cpp/messagebus/actor/temp_tls_vector.h
+++ b/library/cpp/messagebus/actor/temp_tls_vector.h
@@ -1,34 +1,34 @@
-#pragma once
-
+#pragma once
+
#include "thread_extra.h"
#include <util/generic/vector.h>
-#include <util/system/yassert.h>
-
+#include <util/system/yassert.h>
+
template <typename T, typename TTag = void, template <typename, class> class TVectorType = TVector>
-class TTempTlsVector {
-private:
- struct TTagForTls {};
-
+class TTempTlsVector {
+private:
+ struct TTagForTls {};
+
TVectorType<T, std::allocator<T>>* Vector;
-public:
+public:
TVectorType<T, std::allocator<T>>* GetVector() {
- return Vector;
- }
-
- TTempTlsVector() {
+ return Vector;
+ }
+
+ TTempTlsVector() {
Vector = FastTlsSingletonWithTag<TVectorType<T, std::allocator<T>>, TTagForTls>();
Y_ASSERT(Vector->empty());
- }
-
- ~TTempTlsVector() {
+ }
+
+ ~TTempTlsVector() {
Clear();
}
void Clear() {
- Vector->clear();
- }
+ Vector->clear();
+ }
size_t Capacity() const noexcept {
return Vector->capacity();
@@ -37,4 +37,4 @@ public:
void Shrink() {
Vector->shrink_to_fit();
}
-};
+};
diff --git a/library/cpp/messagebus/actor/thread_extra.cpp b/library/cpp/messagebus/actor/thread_extra.cpp
index 048480f255..6472dd92f4 100644
--- a/library/cpp/messagebus/actor/thread_extra.cpp
+++ b/library/cpp/messagebus/actor/thread_extra.cpp
@@ -2,29 +2,29 @@
#include <util/stream/str.h>
#include <util/system/execpath.h>
-#include <util/system/platform.h>
-#include <util/system/thread.h>
-
-namespace {
+#include <util/system/platform.h>
+#include <util/system/thread.h>
+
+namespace {
#ifdef _linux_
TString GetExecName() {
TString execPath = GetExecPath();
- size_t lastSlash = execPath.find_last_of('/');
+ size_t lastSlash = execPath.find_last_of('/');
if (lastSlash == TString::npos) {
- return execPath;
- } else {
- return execPath.substr(lastSlash + 1);
- }
- }
+ return execPath;
+ } else {
+ return execPath.substr(lastSlash + 1);
+ }
+ }
#endif
-}
-
+}
+
void SetCurrentThreadName(const char* name) {
-#ifdef _linux_
- TStringStream linuxName;
- linuxName << GetExecName() << "." << name;
+#ifdef _linux_
+ TStringStream linuxName;
+ linuxName << GetExecName() << "." << name;
TThread::SetCurrentThreadName(linuxName.Str().data());
-#else
+#else
TThread::SetCurrentThreadName(name);
-#endif
-}
+#endif
+}
diff --git a/library/cpp/messagebus/actor/thread_extra.h b/library/cpp/messagebus/actor/thread_extra.h
index b5aa151618..002b2d8d5f 100644
--- a/library/cpp/messagebus/actor/thread_extra.h
+++ b/library/cpp/messagebus/actor/thread_extra.h
@@ -1,7 +1,7 @@
-#pragma once
-
-#include <util/thread/singleton.h>
-
+#pragma once
+
+#include <util/thread/singleton.h>
+
namespace NTSAN {
template <typename T>
inline void RelaxedStore(volatile T* a, T x) {
@@ -25,7 +25,7 @@ namespace NTSAN {
}
void SetCurrentThreadName(const char* name);
-
+
namespace NThreadExtra {
namespace NPrivate {
template <typename TValue, typename TTag>
@@ -34,8 +34,8 @@ namespace NThreadExtra {
};
}
}
-
-template <typename TValue, typename TTag>
-static inline TValue* FastTlsSingletonWithTag() {
+
+template <typename TValue, typename TTag>
+static inline TValue* FastTlsSingletonWithTag() {
return &FastTlsSingleton< ::NThreadExtra::NPrivate::TValueHolder<TValue, TTag>>()->Value;
-}
+}
diff --git a/library/cpp/messagebus/actor/what_thread_does.cpp b/library/cpp/messagebus/actor/what_thread_does.cpp
index bebb6a888c..94e0c0f64f 100644
--- a/library/cpp/messagebus/actor/what_thread_does.cpp
+++ b/library/cpp/messagebus/actor/what_thread_does.cpp
@@ -1,22 +1,22 @@
#include "what_thread_does.h"
-
+
#include "thread_extra.h"
-
+
#include <util/system/tls.h>
Y_POD_STATIC_THREAD(const char*)
WhatThreadDoes;
-
+
const char* PushWhatThreadDoes(const char* what) {
const char* r = NTSAN::RelaxedLoad(&WhatThreadDoes);
NTSAN::RelaxedStore(&WhatThreadDoes, what);
- return r;
-}
-
+ return r;
+}
+
void PopWhatThreadDoes(const char* prev) {
NTSAN::RelaxedStore(&WhatThreadDoes, prev);
-}
-
+}
+
const char** WhatThreadDoesLocation() {
- return &WhatThreadDoes;
-}
+ return &WhatThreadDoes;
+}
diff --git a/library/cpp/messagebus/actor/what_thread_does.h b/library/cpp/messagebus/actor/what_thread_does.h
index 235d2c3700..325528fc55 100644
--- a/library/cpp/messagebus/actor/what_thread_does.h
+++ b/library/cpp/messagebus/actor/what_thread_does.h
@@ -1,28 +1,28 @@
-#pragma once
-
-const char* PushWhatThreadDoes(const char* what);
-void PopWhatThreadDoes(const char* prev);
-const char** WhatThreadDoesLocation();
-
-struct TWhatThreadDoesPushPop {
-private:
- const char* Prev;
-
-public:
- TWhatThreadDoesPushPop(const char* what) {
- Prev = PushWhatThreadDoes(what);
- }
-
- ~TWhatThreadDoesPushPop() {
- PopWhatThreadDoes(Prev);
- }
-};
-
-#ifdef __GNUC__
-#define WHAT_THREAD_DOES_FUNCTION __PRETTY_FUNCTION__
-#else
-#define WHAT_THREAD_DOES_FUNCTION __FUNCTION__
-#endif
-
-#define WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC() \
- TWhatThreadDoesPushPop whatThreadDoesPushPopCurrentFunc(WHAT_THREAD_DOES_FUNCTION)
+#pragma once
+
+const char* PushWhatThreadDoes(const char* what);
+void PopWhatThreadDoes(const char* prev);
+const char** WhatThreadDoesLocation();
+
+struct TWhatThreadDoesPushPop {
+private:
+ const char* Prev;
+
+public:
+ TWhatThreadDoesPushPop(const char* what) {
+ Prev = PushWhatThreadDoes(what);
+ }
+
+ ~TWhatThreadDoesPushPop() {
+ PopWhatThreadDoes(Prev);
+ }
+};
+
+#ifdef __GNUC__
+#define WHAT_THREAD_DOES_FUNCTION __PRETTY_FUNCTION__
+#else
+#define WHAT_THREAD_DOES_FUNCTION __FUNCTION__
+#endif
+
+#define WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC() \
+ TWhatThreadDoesPushPop whatThreadDoesPushPopCurrentFunc(WHAT_THREAD_DOES_FUNCTION)
diff --git a/library/cpp/messagebus/actor/what_thread_does_guard.h b/library/cpp/messagebus/actor/what_thread_does_guard.h
index f104e9e173..f0888f0a8d 100644
--- a/library/cpp/messagebus/actor/what_thread_does_guard.h
+++ b/library/cpp/messagebus/actor/what_thread_does_guard.h
@@ -1,40 +1,40 @@
-#pragma once
-
-#include "what_thread_does.h"
-
-template <class T>
-class TWhatThreadDoesAcquireGuard: public TNonCopyable {
+#pragma once
+
+#include "what_thread_does.h"
+
+template <class T>
+class TWhatThreadDoesAcquireGuard: public TNonCopyable {
public:
inline TWhatThreadDoesAcquireGuard(const T& t, const char* acquire) noexcept {
Init(&t, acquire);
}
-
+
inline TWhatThreadDoesAcquireGuard(const T* t, const char* acquire) noexcept {
Init(t, acquire);
}
-
+
inline ~TWhatThreadDoesAcquireGuard() {
Release();
}
-
+
inline void Release() noexcept {
if (WasAcquired()) {
const_cast<T*>(T_)->Release();
T_ = nullptr;
- }
+ }
}
-
+
inline bool WasAcquired() const noexcept {
return T_ != nullptr;
}
-
+
private:
inline void Init(const T* t, const char* acquire) noexcept {
T_ = const_cast<T*>(t);
TWhatThreadDoesPushPop pp(acquire);
T_->Acquire();
}
-
+
private:
T* T_;
-};
+};
diff --git a/library/cpp/messagebus/actor/what_thread_does_guard_ut.cpp b/library/cpp/messagebus/actor/what_thread_does_guard_ut.cpp
index e4b218a7ca..74137f8f90 100644
--- a/library/cpp/messagebus/actor/what_thread_does_guard_ut.cpp
+++ b/library/cpp/messagebus/actor/what_thread_does_guard_ut.cpp
@@ -1,13 +1,13 @@
#include <library/cpp/testing/unittest/registar.h>
-
-#include "what_thread_does_guard.h"
-
+
+#include "what_thread_does_guard.h"
+
#include <util/system/mutex.h>
Y_UNIT_TEST_SUITE(WhatThreadDoesGuard) {
Y_UNIT_TEST(Simple) {
- TMutex mutex;
-
- TWhatThreadDoesAcquireGuard<TMutex> guard(mutex, "acquiring my mutex");
- }
-}
+ TMutex mutex;
+
+ TWhatThreadDoesAcquireGuard<TMutex> guard(mutex, "acquiring my mutex");
+ }
+}
diff --git a/library/cpp/messagebus/actor/ya.make b/library/cpp/messagebus/actor/ya.make
index 59bd1b0b99..1ea37f5b48 100644
--- a/library/cpp/messagebus/actor/ya.make
+++ b/library/cpp/messagebus/actor/ya.make
@@ -1,11 +1,11 @@
-LIBRARY(messagebus_actor)
-
+LIBRARY(messagebus_actor)
+
OWNER(g:messagebus)
-
-SRCS(
- executor.cpp
- thread_extra.cpp
- what_thread_does.cpp
-)
-
-END()
+
+SRCS(
+ executor.cpp
+ thread_extra.cpp
+ what_thread_does.cpp
+)
+
+END()