diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/actor/executor.cpp | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/actor/executor.cpp')
-rw-r--r-- | library/cpp/messagebus/actor/executor.cpp | 140 |
1 files changed, 70 insertions, 70 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp index 4c428754f3..7a2227a458 100644 --- a/library/cpp/messagebus/actor/executor.cpp +++ b/library/cpp/messagebus/actor/executor.cpp @@ -20,10 +20,10 @@ namespace { struct TRecord { TAtomic MaxQueueSize; - TRecord() - : MaxQueueSize() - { - } + TRecord() + : MaxQueueSize() + { + } TExecutorHistory::THistoryRecord Capture() { TExecutorHistory::THistoryRecord r; @@ -70,8 +70,8 @@ namespace { } -Y_POD_STATIC_THREAD(TExecutor*) -ThreadCurrentExecutor; +Y_POD_STATIC_THREAD(TExecutor*) +ThreadCurrentExecutor; static const char* NoLocation = "nowhere"; @@ -80,80 +80,80 @@ struct TExecutorWorkerThreadLocalData { }; static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData; -Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData) -WorkerThreadLocalData; +Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData) +WorkerThreadLocalData; namespace NActor { - struct TExecutorWorker { - TExecutor* const Executor; - TThread Thread; - const char** WhatThreadDoesLocation; - TExecutorWorkerThreadLocalData* ThreadLocalData; - - TExecutorWorker(TExecutor* executor) - : Executor(executor) - , Thread(RunThreadProc, this) - , WhatThreadDoesLocation(&NoLocation) - , ThreadLocalData(&::WorkerNoThreadLocalData) - { - Thread.Start(); - } - - void Run() { - WhatThreadDoesLocation = ::WhatThreadDoesLocation(); - AtomicSet(ThreadLocalData, &::WorkerThreadLocalData); - WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); - Executor->RunWorker(); - } - - static void* RunThreadProc(void* thiz0) { - TExecutorWorker* thiz = (TExecutorWorker*)thiz0; - thiz->Run(); - return nullptr; - } - }; - - struct TExecutor::TImpl { - TExecutor* const Executor; - THistoryInternal History; + struct TExecutorWorker { + TExecutor* const Executor; + TThread Thread; + const char** WhatThreadDoesLocation; + TExecutorWorkerThreadLocalData* ThreadLocalData; + + TExecutorWorker(TExecutor* executor) + : Executor(executor) + , Thread(RunThreadProc, this) + , WhatThreadDoesLocation(&NoLocation) + , ThreadLocalData(&::WorkerNoThreadLocalData) + { + Thread.Start(); + } + + void Run() { + WhatThreadDoesLocation = ::WhatThreadDoesLocation(); + AtomicSet(ThreadLocalData, &::WorkerThreadLocalData); + WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); + Executor->RunWorker(); + } + + static void* RunThreadProc(void* thiz0) { + TExecutorWorker* thiz = (TExecutorWorker*)thiz0; + thiz->Run(); + return nullptr; + } + }; + + struct TExecutor::TImpl { + TExecutor* const Executor; + THistoryInternal History; TSystemEvent HelperStopSignal; - TThread HelperThread; + TThread HelperThread; - TImpl(TExecutor* executor) - : Executor(executor) - , HelperThread(HelperThreadProc, this) - { - } + TImpl(TExecutor* executor) + : Executor(executor) + , HelperThread(HelperThreadProc, this) + { + } - void RunHelper() { - ui64 nowSeconds = TInstant::Now().Seconds(); - for (;;) { - TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000)); + void RunHelper() { + ui64 nowSeconds = TInstant::Now().Seconds(); + for (;;) { + TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000)); - if (HelperStopSignal.WaitD(nextStop)) { - return; - } + if (HelperStopSignal.WaitD(nextStop)) { + return; + } - nowSeconds = nextStop.Seconds(); + nowSeconds = nextStop.Seconds(); - THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds); + THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds); - ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear(); - if (maxQueueSize > record.MaxQueueSize) { - AtomicSet(record.MaxQueueSize, maxQueueSize); - } + ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear(); + if (maxQueueSize > record.MaxQueueSize) { + AtomicSet(record.MaxQueueSize, maxQueueSize); + } } } - static void* HelperThreadProc(void* impl0) { - TImpl* impl = (TImpl*)impl0; - impl->RunHelper(); - return nullptr; - } - }; + static void* HelperThreadProc(void* impl0) { + TImpl* impl = (TImpl*)impl0; + impl->RunHelper(); + return nullptr; + } + }; -} +} static TExecutor::TConfig MakeConfig(unsigned workerCount) { TExecutor::TConfig config; @@ -296,9 +296,9 @@ TAutoPtr<IWorkItem> TExecutor::DequeueWork() { WorkAvailable.Wait(WorkMutex); } } - - auto& wtls = TlsRef(WorkerThreadLocalData); - + + auto& wtls = TlsRef(WorkerThreadLocalData); + if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) { RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize); } @@ -334,5 +334,5 @@ void TExecutor::RunWorker() { RunWorkItem(wi); } - ThreadCurrentExecutor = (TExecutor*)nullptr; + ThreadCurrentExecutor = (TExecutor*)nullptr; } |