diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-04-07 08:39:07 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-04-07 08:39:07 +0000 |
commit | 288a443b15bc64c9676fb70884e73e6aba03e108 (patch) | |
tree | 9e6a6f10fffb8273222d14c68c0d0a79f1faf394 /library/cpp/threading | |
parent | c3301110258be632f5b73a6e348d5f9d98f3eacc (diff) | |
parent | cbea66c165a9c9d142053892cac24550a030656c (diff) | |
download | ydb-288a443b15bc64c9676fb70884e73e6aba03e108.tar.gz |
Merge pull request #16825 from ydb-platform/merge-libs-250407-0050
Diffstat (limited to 'library/cpp/threading')
4 files changed, 30 insertions, 7 deletions
diff --git a/library/cpp/threading/future/future.h b/library/cpp/threading/future/future.h index 35db9abbe2..91ade3f6e5 100644 --- a/library/cpp/threading/future/future.h +++ b/library/cpp/threading/future/future.h @@ -1,4 +1,6 @@ #pragma once +// IWYU pragma: begin_exports #include "core/future.h" #include "wait/wait.h" +// IWYU pragma: end_exports diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h index c1c824f67c..4eb67b034e 100644 --- a/library/cpp/threading/local_executor/local_executor.h +++ b/library/cpp/threading/local_executor/local_executor.h @@ -54,8 +54,12 @@ namespace NPar { // @param flags Same as for `Exec`. virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0; - // 0-based ILocalExecutor worker thread identification + // returns: + // 0 for for a thread outside the internal pool + // (because ILocalExecutor is allowed to use a calling thread to execute tasks as well), + // 1 ... GetThreadCount() for a thread inside the internal pool virtual int GetWorkerThreadId() const noexcept = 0; + virtual int GetThreadCount() const noexcept = 0; // Describes a range of tasks with parameters from integer range [FirstId, LastId). @@ -237,7 +241,6 @@ namespace NPar { int GetLPQueueSize() const noexcept; void ClearLPQueue(); - // 0-based TLocalExecutor worker thread identification int GetWorkerThreadId() const noexcept override; int GetThreadCount() const noexcept override; diff --git a/library/cpp/threading/local_executor/tbb_local_executor.cpp b/library/cpp/threading/local_executor/tbb_local_executor.cpp index 65d6659443..91a8460b0e 100644 --- a/library/cpp/threading/local_executor/tbb_local_executor.cpp +++ b/library/cpp/threading/local_executor/tbb_local_executor.cpp @@ -14,9 +14,21 @@ int NPar::TTbbLocalExecutor<RespectTls>::GetThreadCount() const noexcept { template <bool RespectTls> int NPar::TTbbLocalExecutor<RespectTls>::GetWorkerThreadId() const noexcept { - return TbbArena.execute([] { - return tbb::this_task_arena::current_thread_index(); - }); + static thread_local int WorkerThreadId = -1; + if (WorkerThreadId == -1) { + // Can't rely on return value except checking that it is 'not_initialized' because of + // "Since a thread may exit the arena at any time if it does not execute a task, the index of + // a thread may change between any two tasks" + // (https://oneapi-spec.uxlfoundation.org/specifications/oneapi/latest/elements/onetbb/source/task_scheduler/task_arena/this_task_arena_ns#_CPPv4N3tbb15this_task_arena20current_thread_indexEv) + const auto tbbThreadIndex = tbb::this_task_arena::current_thread_index(); + if (tbbThreadIndex == tbb::task_arena::not_initialized) { + // This thread does not belong to TBB worker threads + WorkerThreadId = 0; + } else { + WorkerThreadId = ++RegisteredThreadCounter; + } + } + return WorkerThreadId; } template <bool RespectTls> diff --git a/library/cpp/threading/local_executor/tbb_local_executor.h b/library/cpp/threading/local_executor/tbb_local_executor.h index 8d790db18c..f67c07349d 100644 --- a/library/cpp/threading/local_executor/tbb_local_executor.h +++ b/library/cpp/threading/local_executor/tbb_local_executor.h @@ -9,6 +9,9 @@ #include <contrib/libs/tbb/include/tbb/task_arena.h> #include <contrib/libs/tbb/include/tbb/task_group.h> +#include <atomic> + + namespace NPar { template <bool RespectTls = false> class TTbbLocalExecutor final: public ILocalExecutor { @@ -16,10 +19,11 @@ namespace NPar { TTbbLocalExecutor(int nThreads) : ILocalExecutor() , TbbArena(nThreads) - , NumberOfTbbThreads(nThreads) {} + , NumberOfTbbThreads(nThreads) + , RegisteredThreadCounter(0) + {} ~TTbbLocalExecutor() noexcept override {} - // 0-based ILocalExecutor worker thread identification virtual int GetWorkerThreadId() const noexcept override; virtual int GetThreadCount() const noexcept override; @@ -45,5 +49,7 @@ namespace NPar { mutable tbb::task_arena TbbArena; tbb::task_group Group; int NumberOfTbbThreads; + + mutable std::atomic_int RegisteredThreadCounter; }; } |