aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/actor/executor.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/actor/executor.cpp
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/actor/executor.cpp')
-rw-r--r--library/cpp/messagebus/actor/executor.cpp140
1 files changed, 70 insertions, 70 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp
index 4c428754f3..7a2227a458 100644
--- a/library/cpp/messagebus/actor/executor.cpp
+++ b/library/cpp/messagebus/actor/executor.cpp
@@ -20,10 +20,10 @@ namespace {
struct TRecord {
TAtomic MaxQueueSize;
- TRecord()
- : MaxQueueSize()
- {
- }
+ TRecord()
+ : MaxQueueSize()
+ {
+ }
TExecutorHistory::THistoryRecord Capture() {
TExecutorHistory::THistoryRecord r;
@@ -70,8 +70,8 @@ namespace {
}
-Y_POD_STATIC_THREAD(TExecutor*)
-ThreadCurrentExecutor;
+Y_POD_STATIC_THREAD(TExecutor*)
+ThreadCurrentExecutor;
static const char* NoLocation = "nowhere";
@@ -80,80 +80,80 @@ struct TExecutorWorkerThreadLocalData {
};
static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData;
-Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData)
-WorkerThreadLocalData;
+Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData)
+WorkerThreadLocalData;
namespace NActor {
- struct TExecutorWorker {
- TExecutor* const Executor;
- TThread Thread;
- const char** WhatThreadDoesLocation;
- TExecutorWorkerThreadLocalData* ThreadLocalData;
-
- TExecutorWorker(TExecutor* executor)
- : Executor(executor)
- , Thread(RunThreadProc, this)
- , WhatThreadDoesLocation(&NoLocation)
- , ThreadLocalData(&::WorkerNoThreadLocalData)
- {
- Thread.Start();
- }
-
- void Run() {
- WhatThreadDoesLocation = ::WhatThreadDoesLocation();
- AtomicSet(ThreadLocalData, &::WorkerThreadLocalData);
- WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
- Executor->RunWorker();
- }
-
- static void* RunThreadProc(void* thiz0) {
- TExecutorWorker* thiz = (TExecutorWorker*)thiz0;
- thiz->Run();
- return nullptr;
- }
- };
-
- struct TExecutor::TImpl {
- TExecutor* const Executor;
- THistoryInternal History;
+ struct TExecutorWorker {
+ TExecutor* const Executor;
+ TThread Thread;
+ const char** WhatThreadDoesLocation;
+ TExecutorWorkerThreadLocalData* ThreadLocalData;
+
+ TExecutorWorker(TExecutor* executor)
+ : Executor(executor)
+ , Thread(RunThreadProc, this)
+ , WhatThreadDoesLocation(&NoLocation)
+ , ThreadLocalData(&::WorkerNoThreadLocalData)
+ {
+ Thread.Start();
+ }
+
+ void Run() {
+ WhatThreadDoesLocation = ::WhatThreadDoesLocation();
+ AtomicSet(ThreadLocalData, &::WorkerThreadLocalData);
+ WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
+ Executor->RunWorker();
+ }
+
+ static void* RunThreadProc(void* thiz0) {
+ TExecutorWorker* thiz = (TExecutorWorker*)thiz0;
+ thiz->Run();
+ return nullptr;
+ }
+ };
+
+ struct TExecutor::TImpl {
+ TExecutor* const Executor;
+ THistoryInternal History;
TSystemEvent HelperStopSignal;
- TThread HelperThread;
+ TThread HelperThread;
- TImpl(TExecutor* executor)
- : Executor(executor)
- , HelperThread(HelperThreadProc, this)
- {
- }
+ TImpl(TExecutor* executor)
+ : Executor(executor)
+ , HelperThread(HelperThreadProc, this)
+ {
+ }
- void RunHelper() {
- ui64 nowSeconds = TInstant::Now().Seconds();
- for (;;) {
- TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000));
+ void RunHelper() {
+ ui64 nowSeconds = TInstant::Now().Seconds();
+ for (;;) {
+ TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000));
- if (HelperStopSignal.WaitD(nextStop)) {
- return;
- }
+ if (HelperStopSignal.WaitD(nextStop)) {
+ return;
+ }
- nowSeconds = nextStop.Seconds();
+ nowSeconds = nextStop.Seconds();
- THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds);
+ THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds);
- ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear();
- if (maxQueueSize > record.MaxQueueSize) {
- AtomicSet(record.MaxQueueSize, maxQueueSize);
- }
+ ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear();
+ if (maxQueueSize > record.MaxQueueSize) {
+ AtomicSet(record.MaxQueueSize, maxQueueSize);
+ }
}
}
- static void* HelperThreadProc(void* impl0) {
- TImpl* impl = (TImpl*)impl0;
- impl->RunHelper();
- return nullptr;
- }
- };
+ static void* HelperThreadProc(void* impl0) {
+ TImpl* impl = (TImpl*)impl0;
+ impl->RunHelper();
+ return nullptr;
+ }
+ };
-}
+}
static TExecutor::TConfig MakeConfig(unsigned workerCount) {
TExecutor::TConfig config;
@@ -296,9 +296,9 @@ TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
WorkAvailable.Wait(WorkMutex);
}
}
-
- auto& wtls = TlsRef(WorkerThreadLocalData);
-
+
+ auto& wtls = TlsRef(WorkerThreadLocalData);
+
if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) {
RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize);
}
@@ -334,5 +334,5 @@ void TExecutor::RunWorker() {
RunWorkItem(wi);
}
- ThreadCurrentExecutor = (TExecutor*)nullptr;
+ ThreadCurrentExecutor = (TExecutor*)nullptr;
}