diff options
author | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-06-29 10:00:50 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-06-29 10:00:50 +0300 |
commit | 6ffe9e53658409f212834330e13564e4952558f6 (patch) | |
tree | 85b1e00183517648b228aafa7c8fb07f5276f419 /contrib/libs/llvm16/lib/Support/ThreadPool.cpp | |
parent | 726057070f9c5a91fc10fde0d5024913d10f1ab9 (diff) | |
download | ydb-6ffe9e53658409f212834330e13564e4952558f6.tar.gz |
YQ Connector: support managed ClickHouse
Со стороны dqrun можно обратиться к инстансу коннектора, который работает на streaming стенде, и извлечь данные из облачного CH.
Diffstat (limited to 'contrib/libs/llvm16/lib/Support/ThreadPool.cpp')
-rw-r--r-- | contrib/libs/llvm16/lib/Support/ThreadPool.cpp | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/contrib/libs/llvm16/lib/Support/ThreadPool.cpp b/contrib/libs/llvm16/lib/Support/ThreadPool.cpp new file mode 100644 index 0000000000..31461e31c6 --- /dev/null +++ b/contrib/libs/llvm16/lib/Support/ThreadPool.cpp @@ -0,0 +1,221 @@ +//==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// +// +// This file implements a crude C++11 based thread pool. +// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/ThreadPool.h" + +#include "llvm/Config/llvm-config.h" + +#if LLVM_ENABLE_THREADS +#include "llvm/Support/Threading.h" +#else +#include "llvm/Support/raw_ostream.h" +#endif + +using namespace llvm; + +#if LLVM_ENABLE_THREADS + +// A note on thread groups: Tasks are by default in no group (represented +// by nullptr ThreadPoolTaskGroup pointer in the Tasks queue) and functionality +// here normally works on all tasks regardless of their group (functions +// in that case receive nullptr ThreadPoolTaskGroup pointer as argument). +// A task in a group has a pointer to that ThreadPoolTaskGroup in the Tasks +// queue, and functions called to work only on tasks from one group take that +// pointer. + +ThreadPool::ThreadPool(ThreadPoolStrategy S) + : Strategy(S), MaxThreadCount(S.compute_thread_count()) {} + +void ThreadPool::grow(int requested) { + llvm::sys::ScopedWriter LockGuard(ThreadsLock); + if (Threads.size() >= MaxThreadCount) + return; // Already hit the max thread pool size. + int newThreadCount = std::min<int>(requested, MaxThreadCount); + while (static_cast<int>(Threads.size()) < newThreadCount) { + int ThreadID = Threads.size(); + Threads.emplace_back([this, ThreadID] { + Strategy.apply_thread_strategy(ThreadID); + processTasks(nullptr); + }); + } +} + +#ifndef NDEBUG +// The group of the tasks run by the current thread. +static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *> + *CurrentThreadTaskGroups = nullptr; +#endif + +// WaitingForGroup == nullptr means all tasks regardless of their group. +void ThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { + while (true) { + std::function<void()> Task; + ThreadPoolTaskGroup *GroupOfTask; + { + std::unique_lock<std::mutex> LockGuard(QueueLock); + bool workCompletedForGroup = false; // Result of workCompletedUnlocked() + // Wait for tasks to be pushed in the queue + QueueCondition.wait(LockGuard, [&] { + return !EnableFlag || !Tasks.empty() || + (WaitingForGroup != nullptr && + (workCompletedForGroup = + workCompletedUnlocked(WaitingForGroup))); + }); + // Exit condition + if (!EnableFlag && Tasks.empty()) + return; + if (WaitingForGroup != nullptr && workCompletedForGroup) + return; + // Yeah, we have a task, grab it and release the lock on the queue + + // We first need to signal that we are active before popping the queue + // in order for wait() to properly detect that even if the queue is + // empty, there is still a task in flight. + ++ActiveThreads; + Task = std::move(Tasks.front().first); + GroupOfTask = Tasks.front().second; + // Need to count active threads in each group separately, ActiveThreads + // would never be 0 if waiting for another group inside a wait. + if (GroupOfTask != nullptr) + ++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item + Tasks.pop_front(); + } +#ifndef NDEBUG + if (CurrentThreadTaskGroups == nullptr) + CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>; + CurrentThreadTaskGroups->push_back(GroupOfTask); +#endif + + // Run the task we just grabbed + Task(); + +#ifndef NDEBUG + CurrentThreadTaskGroups->pop_back(); + if (CurrentThreadTaskGroups->empty()) { + delete CurrentThreadTaskGroups; + CurrentThreadTaskGroups = nullptr; + } +#endif + + bool Notify; + bool NotifyGroup; + { + // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() + std::lock_guard<std::mutex> LockGuard(QueueLock); + --ActiveThreads; + if (GroupOfTask != nullptr) { + auto A = ActiveGroups.find(GroupOfTask); + if (--(A->second) == 0) + ActiveGroups.erase(A); + } + Notify = workCompletedUnlocked(GroupOfTask); + NotifyGroup = GroupOfTask != nullptr && Notify; + } + // Notify task completion if this is the last active thread, in case + // someone waits on ThreadPool::wait(). + if (Notify) + CompletionCondition.notify_all(); + // If this was a task in a group, notify also threads waiting for tasks + // in this function on QueueCondition, to make a recursive wait() return + // after the group it's been waiting for has finished. + if (NotifyGroup) + QueueCondition.notify_all(); + } +} + +bool ThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const { + if (Group == nullptr) + return !ActiveThreads && Tasks.empty(); + return ActiveGroups.count(Group) == 0 && + !llvm::any_of(Tasks, + [Group](const auto &T) { return T.second == Group; }); +} + +void ThreadPool::wait() { + assert(!isWorkerThread()); // Would deadlock waiting for itself. + // Wait for all threads to complete and the queue to be empty + std::unique_lock<std::mutex> LockGuard(QueueLock); + CompletionCondition.wait(LockGuard, + [&] { return workCompletedUnlocked(nullptr); }); +} + +void ThreadPool::wait(ThreadPoolTaskGroup &Group) { + // Wait for all threads in the group to complete. + if (!isWorkerThread()) { + std::unique_lock<std::mutex> LockGuard(QueueLock); + CompletionCondition.wait(LockGuard, + [&] { return workCompletedUnlocked(&Group); }); + return; + } + // Make sure to not deadlock waiting for oneself. + assert(CurrentThreadTaskGroups == nullptr || + !llvm::is_contained(*CurrentThreadTaskGroups, &Group)); + // Handle the case of recursive call from another task in a different group, + // in which case process tasks while waiting to keep the thread busy and avoid + // possible deadlock. + processTasks(&Group); +} + +bool ThreadPool::isWorkerThread() const { + llvm::sys::ScopedReader LockGuard(ThreadsLock); + llvm::thread::id CurrentThreadId = llvm::this_thread::get_id(); + for (const llvm::thread &Thread : Threads) + if (CurrentThreadId == Thread.get_id()) + return true; + return false; +} + +// The destructor joins all threads, waiting for completion. +ThreadPool::~ThreadPool() { + { + std::unique_lock<std::mutex> LockGuard(QueueLock); + EnableFlag = false; + } + QueueCondition.notify_all(); + llvm::sys::ScopedReader LockGuard(ThreadsLock); + for (auto &Worker : Threads) + Worker.join(); +} + +#else // LLVM_ENABLE_THREADS Disabled + +// No threads are launched, issue a warning if ThreadCount is not 0 +ThreadPool::ThreadPool(ThreadPoolStrategy S) : MaxThreadCount(1) { + int ThreadCount = S.compute_thread_count(); + if (ThreadCount != 1) { + errs() << "Warning: request a ThreadPool with " << ThreadCount + << " threads, but LLVM_ENABLE_THREADS has been turned off\n"; + } +} + +void ThreadPool::wait() { + // Sequential implementation running the tasks + while (!Tasks.empty()) { + auto Task = std::move(Tasks.front().first); + Tasks.pop_front(); + Task(); + } +} + +void ThreadPool::wait(ThreadPoolTaskGroup &) { + // Simply wait for all, this works even if recursive (the running task + // is already removed from the queue). + wait(); +} + +bool ThreadPool::isWorkerThread() const { + report_fatal_error("LLVM compiled without multithreading"); +} + +ThreadPool::~ThreadPool() { wait(); } + +#endif |