diff options
author | nalpp <nalpp@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
commit | 30d1ef3941e0dc835be7609de5ebee66958f215a (patch) | |
tree | 49e222ea1c5804306084bb3ae065bb702625360f /contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading | |
parent | 87f3eb38999df2d3c1cb77f8ffb9c52ec9c516fb (diff) | |
download | ydb-30d1ef3941e0dc835be7609de5ebee66958f215a.tar.gz |
Restoring authorship annotation for <nalpp@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading')
4 files changed, 287 insertions, 287 deletions
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Executor.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Executor.cpp index d63c66c884..4a3c4209c4 100644 --- a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Executor.cpp +++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Executor.cpp @@ -2,154 +2,154 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ - -#include <aws/core/utils/threading/Executor.h> -#include <aws/core/utils/threading/ThreadTask.h> -#include <thread> -#include <cassert> - -static const char* POOLED_CLASS_TAG = "PooledThreadExecutor"; - -using namespace Aws::Utils::Threading; - -bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx) -{ - auto main = [fx, this] { - fx(); - Detach(std::this_thread::get_id()); - }; - - State expected; - do - { - expected = State::Free; - if(m_state.compare_exchange_strong(expected, State::Locked)) - { - std::thread t(main); - const auto id = t.get_id(); // copy the id before we std::move the thread - m_threads.emplace(id, std::move(t)); - m_state = State::Free; - return true; - } + +#include <aws/core/utils/threading/Executor.h> +#include <aws/core/utils/threading/ThreadTask.h> +#include <thread> +#include <cassert> + +static const char* POOLED_CLASS_TAG = "PooledThreadExecutor"; + +using namespace Aws::Utils::Threading; + +bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx) +{ + auto main = [fx, this] { + fx(); + Detach(std::this_thread::get_id()); + }; + + State expected; + do + { + expected = State::Free; + if(m_state.compare_exchange_strong(expected, State::Locked)) + { + std::thread t(main); + const auto id = t.get_id(); // copy the id before we std::move the thread + m_threads.emplace(id, std::move(t)); + m_state = State::Free; + return true; + } + } + while(expected != State::Shutdown); + return false; +} + +void DefaultExecutor::Detach(std::thread::id id) +{ + State expected; + do + { + expected = State::Free; + if(m_state.compare_exchange_strong(expected, State::Locked)) + { + auto it = m_threads.find(id); + assert(it != m_threads.end()); + it->second.detach(); + m_threads.erase(it); + m_state = State::Free; + return; + } } - while(expected != State::Shutdown); - return false; -} - -void DefaultExecutor::Detach(std::thread::id id) -{ - State expected; - do - { + while(expected != State::Shutdown); +} + +DefaultExecutor::~DefaultExecutor() +{ + auto expected = State::Free; + while(!m_state.compare_exchange_strong(expected, State::Shutdown)) + { + //spin while currently detaching threads finish + assert(expected == State::Locked); expected = State::Free; - if(m_state.compare_exchange_strong(expected, State::Locked)) - { - auto it = m_threads.find(id); - assert(it != m_threads.end()); - it->second.detach(); - m_threads.erase(it); - m_state = State::Free; - return; - } - } - while(expected != State::Shutdown); -} - -DefaultExecutor::~DefaultExecutor() -{ - auto expected = State::Free; - while(!m_state.compare_exchange_strong(expected, State::Shutdown)) - { - //spin while currently detaching threads finish - assert(expected == State::Locked); - expected = State::Free; - } - - auto it = m_threads.begin(); - while(!m_threads.empty()) - { - it->second.join(); - it = m_threads.erase(it); - } -} - -PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) : - m_sync(0, poolSize), m_poolSize(poolSize), m_overflowPolicy(overflowPolicy) -{ - for (size_t index = 0; index < m_poolSize; ++index) - { - m_threadTaskHandles.push_back(Aws::New<ThreadTask>(POOLED_CLASS_TAG, *this)); - } -} - -PooledThreadExecutor::~PooledThreadExecutor() -{ - for(auto threadTask : m_threadTaskHandles) - { - threadTask->StopProcessingWork(); - } - - m_sync.ReleaseAll(); - - for (auto threadTask : m_threadTaskHandles) - { - Aws::Delete(threadTask); - } - - while(m_tasks.size() > 0) - { - std::function<void()>* fn = m_tasks.front(); - m_tasks.pop(); - - if(fn) - { - Aws::Delete(fn); - } - } - -} - -bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn) -{ - //avoid the need to do copies inside the lock. Instead lets do a pointer push. - std::function<void()>* fnCpy = Aws::New<std::function<void()>>(POOLED_CLASS_TAG, std::forward<std::function<void()>>(fn)); - - { - std::lock_guard<std::mutex> locker(m_queueLock); - - if (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_tasks.size() >= m_poolSize) - { - Aws::Delete(fnCpy); - return false; - } - - m_tasks.push(fnCpy); - } - - m_sync.Release(); - - return true; -} - -std::function<void()>* PooledThreadExecutor::PopTask() -{ - std::lock_guard<std::mutex> locker(m_queueLock); - - if (m_tasks.size() > 0) - { - std::function<void()>* fn = m_tasks.front(); - if (fn) - { - m_tasks.pop(); - return fn; - } - } - - return nullptr; -} - -bool PooledThreadExecutor::HasTasks() -{ - std::lock_guard<std::mutex> locker(m_queueLock); - return m_tasks.size() > 0; -} + } + + auto it = m_threads.begin(); + while(!m_threads.empty()) + { + it->second.join(); + it = m_threads.erase(it); + } +} + +PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) : + m_sync(0, poolSize), m_poolSize(poolSize), m_overflowPolicy(overflowPolicy) +{ + for (size_t index = 0; index < m_poolSize; ++index) + { + m_threadTaskHandles.push_back(Aws::New<ThreadTask>(POOLED_CLASS_TAG, *this)); + } +} + +PooledThreadExecutor::~PooledThreadExecutor() +{ + for(auto threadTask : m_threadTaskHandles) + { + threadTask->StopProcessingWork(); + } + + m_sync.ReleaseAll(); + + for (auto threadTask : m_threadTaskHandles) + { + Aws::Delete(threadTask); + } + + while(m_tasks.size() > 0) + { + std::function<void()>* fn = m_tasks.front(); + m_tasks.pop(); + + if(fn) + { + Aws::Delete(fn); + } + } + +} + +bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn) +{ + //avoid the need to do copies inside the lock. Instead lets do a pointer push. + std::function<void()>* fnCpy = Aws::New<std::function<void()>>(POOLED_CLASS_TAG, std::forward<std::function<void()>>(fn)); + + { + std::lock_guard<std::mutex> locker(m_queueLock); + + if (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_tasks.size() >= m_poolSize) + { + Aws::Delete(fnCpy); + return false; + } + + m_tasks.push(fnCpy); + } + + m_sync.Release(); + + return true; +} + +std::function<void()>* PooledThreadExecutor::PopTask() +{ + std::lock_guard<std::mutex> locker(m_queueLock); + + if (m_tasks.size() > 0) + { + std::function<void()>* fn = m_tasks.front(); + if (fn) + { + m_tasks.pop(); + return fn; + } + } + + return nullptr; +} + +bool PooledThreadExecutor::HasTasks() +{ + std::lock_guard<std::mutex> locker(m_queueLock); + return m_tasks.size() > 0; +} diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ReaderWriterLock.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ReaderWriterLock.cpp index 9daad445d8..ddb5860563 100644 --- a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ReaderWriterLock.cpp +++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ReaderWriterLock.cpp @@ -1,64 +1,64 @@ /** * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. - */ - -#include <aws/core/utils/threading/ReaderWriterLock.h> -#include <cstdint> -#include <limits> -#include <cassert> - -using namespace Aws::Utils::Threading; - + */ + +#include <aws/core/utils/threading/ReaderWriterLock.h> +#include <cstdint> +#include <limits> +#include <cassert> + +using namespace Aws::Utils::Threading; + static const int64_t MaxReaders = (std::numeric_limits<std::int32_t>::max)(); - -ReaderWriterLock::ReaderWriterLock() : - m_readers(0), - m_holdouts(0), - m_readerSem(0, static_cast<size_t>(MaxReaders)), - m_writerSem(0, 1) -{ -} - -void ReaderWriterLock::LockReader() -{ - if (++m_readers < 0) - { - m_readerSem.WaitOne(); - } -} - -void ReaderWriterLock::UnlockReader() -{ - if (--m_readers < 0 && --m_holdouts == 0) - { - m_writerSem.Release(); - } -} - -void ReaderWriterLock::LockWriter() -{ - m_writerLock.lock(); - if(const auto current = m_readers.fetch_sub(MaxReaders)) - { - assert(current > 0); - const auto holdouts = m_holdouts.fetch_add(current) + current; - assert(holdouts >= 0); - if(holdouts > 0) - { - m_writerSem.WaitOne(); - } - } -} - -void ReaderWriterLock::UnlockWriter() -{ - assert(m_holdouts == 0); - const auto current = m_readers.fetch_add(MaxReaders) + MaxReaders; - assert(current >= 0); - for(int64_t r = 0; r < current; r++) - { - m_readerSem.Release(); - } - m_writerLock.unlock(); -} + +ReaderWriterLock::ReaderWriterLock() : + m_readers(0), + m_holdouts(0), + m_readerSem(0, static_cast<size_t>(MaxReaders)), + m_writerSem(0, 1) +{ +} + +void ReaderWriterLock::LockReader() +{ + if (++m_readers < 0) + { + m_readerSem.WaitOne(); + } +} + +void ReaderWriterLock::UnlockReader() +{ + if (--m_readers < 0 && --m_holdouts == 0) + { + m_writerSem.Release(); + } +} + +void ReaderWriterLock::LockWriter() +{ + m_writerLock.lock(); + if(const auto current = m_readers.fetch_sub(MaxReaders)) + { + assert(current > 0); + const auto holdouts = m_holdouts.fetch_add(current) + current; + assert(holdouts >= 0); + if(holdouts > 0) + { + m_writerSem.WaitOne(); + } + } +} + +void ReaderWriterLock::UnlockWriter() +{ + assert(m_holdouts == 0); + const auto current = m_readers.fetch_add(MaxReaders) + MaxReaders; + assert(current >= 0); + for(int64_t r = 0; r < current; r++) + { + m_readerSem.Release(); + } + m_writerLock.unlock(); +} diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Semaphore.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Semaphore.cpp index dfc7d1a1f6..86dabc9acf 100644 --- a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Semaphore.cpp +++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Semaphore.cpp @@ -1,39 +1,39 @@ /** * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. - */ - -#include <aws/core/utils/threading/Semaphore.h> -#include <algorithm> - -using namespace Aws::Utils::Threading; - -Semaphore::Semaphore(size_t initialCount, size_t maxCount) - : m_count(initialCount), m_maxCount(maxCount) -{ -} - -void Semaphore::WaitOne() -{ - std::unique_lock<std::mutex> locker(m_mutex); - if(0 == m_count) - { - m_syncPoint.wait(locker, [this] { return m_count > 0; }); - } - --m_count; -} - -void Semaphore::Release() -{ - std::lock_guard<std::mutex> locker(m_mutex); - m_count = (std::min)(m_maxCount, m_count + 1); - m_syncPoint.notify_one(); -} - -void Semaphore::ReleaseAll() -{ - std::lock_guard<std::mutex> locker(m_mutex); - m_count = m_maxCount; - m_syncPoint.notify_all(); -} - + */ + +#include <aws/core/utils/threading/Semaphore.h> +#include <algorithm> + +using namespace Aws::Utils::Threading; + +Semaphore::Semaphore(size_t initialCount, size_t maxCount) + : m_count(initialCount), m_maxCount(maxCount) +{ +} + +void Semaphore::WaitOne() +{ + std::unique_lock<std::mutex> locker(m_mutex); + if(0 == m_count) + { + m_syncPoint.wait(locker, [this] { return m_count > 0; }); + } + --m_count; +} + +void Semaphore::Release() +{ + std::lock_guard<std::mutex> locker(m_mutex); + m_count = (std::min)(m_maxCount, m_count + 1); + m_syncPoint.notify_one(); +} + +void Semaphore::ReleaseAll() +{ + std::lock_guard<std::mutex> locker(m_mutex); + m_count = m_maxCount; + m_syncPoint.notify_all(); +} + diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp index 3f32ca2583..a899fe045d 100644 --- a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp +++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp @@ -2,45 +2,45 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ - -#include <aws/core/utils/threading/ThreadTask.h> -#include <aws/core/utils/threading/Executor.h> - -using namespace Aws::Utils; -using namespace Aws::Utils::Threading; - -ThreadTask::ThreadTask(PooledThreadExecutor& executor) : m_continue(true), m_executor(executor), m_thread(std::bind(&ThreadTask::MainTaskRunner, this)) -{ -} - -ThreadTask::~ThreadTask() -{ - StopProcessingWork(); - m_thread.join(); -} - -void ThreadTask::MainTaskRunner() -{ - while (m_continue) - { - while (m_continue && m_executor.HasTasks()) - { - auto fn = m_executor.PopTask(); - if(fn) - { - (*fn)(); - Aws::Delete(fn); - } - } - - if(m_continue) - { - m_executor.m_sync.WaitOne(); - } - } -} - -void ThreadTask::StopProcessingWork() -{ - m_continue = false; -} + +#include <aws/core/utils/threading/ThreadTask.h> +#include <aws/core/utils/threading/Executor.h> + +using namespace Aws::Utils; +using namespace Aws::Utils::Threading; + +ThreadTask::ThreadTask(PooledThreadExecutor& executor) : m_continue(true), m_executor(executor), m_thread(std::bind(&ThreadTask::MainTaskRunner, this)) +{ +} + +ThreadTask::~ThreadTask() +{ + StopProcessingWork(); + m_thread.join(); +} + +void ThreadTask::MainTaskRunner() +{ + while (m_continue) + { + while (m_continue && m_executor.HasTasks()) + { + auto fn = m_executor.PopTask(); + if(fn) + { + (*fn)(); + Aws::Delete(fn); + } + } + + if(m_continue) + { + m_executor.m_sync.WaitOne(); + } + } +} + +void ThreadTask::StopProcessingWork() +{ + m_continue = false; +} |