aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading
diff options
context:
space:
mode:
authornalpp <nalpp@yandex-team.ru>2022-02-10 16:46:46 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:46 +0300
commit30d1ef3941e0dc835be7609de5ebee66958f215a (patch)
tree49e222ea1c5804306084bb3ae065bb702625360f /contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading
parent87f3eb38999df2d3c1cb77f8ffb9c52ec9c516fb (diff)
downloadydb-30d1ef3941e0dc835be7609de5ebee66958f215a.tar.gz
Restoring authorship annotation for <nalpp@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading')
-rw-r--r--contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Executor.cpp298
-rw-r--r--contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ReaderWriterLock.cpp120
-rw-r--r--contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Semaphore.cpp72
-rw-r--r--contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp84
4 files changed, 287 insertions, 287 deletions
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Executor.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Executor.cpp
index d63c66c884..4a3c4209c4 100644
--- a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Executor.cpp
+++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Executor.cpp
@@ -2,154 +2,154 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
-
-#include <aws/core/utils/threading/Executor.h>
-#include <aws/core/utils/threading/ThreadTask.h>
-#include <thread>
-#include <cassert>
-
-static const char* POOLED_CLASS_TAG = "PooledThreadExecutor";
-
-using namespace Aws::Utils::Threading;
-
-bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx)
-{
- auto main = [fx, this] {
- fx();
- Detach(std::this_thread::get_id());
- };
-
- State expected;
- do
- {
- expected = State::Free;
- if(m_state.compare_exchange_strong(expected, State::Locked))
- {
- std::thread t(main);
- const auto id = t.get_id(); // copy the id before we std::move the thread
- m_threads.emplace(id, std::move(t));
- m_state = State::Free;
- return true;
- }
+
+#include <aws/core/utils/threading/Executor.h>
+#include <aws/core/utils/threading/ThreadTask.h>
+#include <thread>
+#include <cassert>
+
+static const char* POOLED_CLASS_TAG = "PooledThreadExecutor";
+
+using namespace Aws::Utils::Threading;
+
+bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx)
+{
+ auto main = [fx, this] {
+ fx();
+ Detach(std::this_thread::get_id());
+ };
+
+ State expected;
+ do
+ {
+ expected = State::Free;
+ if(m_state.compare_exchange_strong(expected, State::Locked))
+ {
+ std::thread t(main);
+ const auto id = t.get_id(); // copy the id before we std::move the thread
+ m_threads.emplace(id, std::move(t));
+ m_state = State::Free;
+ return true;
+ }
+ }
+ while(expected != State::Shutdown);
+ return false;
+}
+
+void DefaultExecutor::Detach(std::thread::id id)
+{
+ State expected;
+ do
+ {
+ expected = State::Free;
+ if(m_state.compare_exchange_strong(expected, State::Locked))
+ {
+ auto it = m_threads.find(id);
+ assert(it != m_threads.end());
+ it->second.detach();
+ m_threads.erase(it);
+ m_state = State::Free;
+ return;
+ }
}
- while(expected != State::Shutdown);
- return false;
-}
-
-void DefaultExecutor::Detach(std::thread::id id)
-{
- State expected;
- do
- {
+ while(expected != State::Shutdown);
+}
+
+DefaultExecutor::~DefaultExecutor()
+{
+ auto expected = State::Free;
+ while(!m_state.compare_exchange_strong(expected, State::Shutdown))
+ {
+ //spin while currently detaching threads finish
+ assert(expected == State::Locked);
expected = State::Free;
- if(m_state.compare_exchange_strong(expected, State::Locked))
- {
- auto it = m_threads.find(id);
- assert(it != m_threads.end());
- it->second.detach();
- m_threads.erase(it);
- m_state = State::Free;
- return;
- }
- }
- while(expected != State::Shutdown);
-}
-
-DefaultExecutor::~DefaultExecutor()
-{
- auto expected = State::Free;
- while(!m_state.compare_exchange_strong(expected, State::Shutdown))
- {
- //spin while currently detaching threads finish
- assert(expected == State::Locked);
- expected = State::Free;
- }
-
- auto it = m_threads.begin();
- while(!m_threads.empty())
- {
- it->second.join();
- it = m_threads.erase(it);
- }
-}
-
-PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) :
- m_sync(0, poolSize), m_poolSize(poolSize), m_overflowPolicy(overflowPolicy)
-{
- for (size_t index = 0; index < m_poolSize; ++index)
- {
- m_threadTaskHandles.push_back(Aws::New<ThreadTask>(POOLED_CLASS_TAG, *this));
- }
-}
-
-PooledThreadExecutor::~PooledThreadExecutor()
-{
- for(auto threadTask : m_threadTaskHandles)
- {
- threadTask->StopProcessingWork();
- }
-
- m_sync.ReleaseAll();
-
- for (auto threadTask : m_threadTaskHandles)
- {
- Aws::Delete(threadTask);
- }
-
- while(m_tasks.size() > 0)
- {
- std::function<void()>* fn = m_tasks.front();
- m_tasks.pop();
-
- if(fn)
- {
- Aws::Delete(fn);
- }
- }
-
-}
-
-bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn)
-{
- //avoid the need to do copies inside the lock. Instead lets do a pointer push.
- std::function<void()>* fnCpy = Aws::New<std::function<void()>>(POOLED_CLASS_TAG, std::forward<std::function<void()>>(fn));
-
- {
- std::lock_guard<std::mutex> locker(m_queueLock);
-
- if (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_tasks.size() >= m_poolSize)
- {
- Aws::Delete(fnCpy);
- return false;
- }
-
- m_tasks.push(fnCpy);
- }
-
- m_sync.Release();
-
- return true;
-}
-
-std::function<void()>* PooledThreadExecutor::PopTask()
-{
- std::lock_guard<std::mutex> locker(m_queueLock);
-
- if (m_tasks.size() > 0)
- {
- std::function<void()>* fn = m_tasks.front();
- if (fn)
- {
- m_tasks.pop();
- return fn;
- }
- }
-
- return nullptr;
-}
-
-bool PooledThreadExecutor::HasTasks()
-{
- std::lock_guard<std::mutex> locker(m_queueLock);
- return m_tasks.size() > 0;
-}
+ }
+
+ auto it = m_threads.begin();
+ while(!m_threads.empty())
+ {
+ it->second.join();
+ it = m_threads.erase(it);
+ }
+}
+
+PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) :
+ m_sync(0, poolSize), m_poolSize(poolSize), m_overflowPolicy(overflowPolicy)
+{
+ for (size_t index = 0; index < m_poolSize; ++index)
+ {
+ m_threadTaskHandles.push_back(Aws::New<ThreadTask>(POOLED_CLASS_TAG, *this));
+ }
+}
+
+PooledThreadExecutor::~PooledThreadExecutor()
+{
+ for(auto threadTask : m_threadTaskHandles)
+ {
+ threadTask->StopProcessingWork();
+ }
+
+ m_sync.ReleaseAll();
+
+ for (auto threadTask : m_threadTaskHandles)
+ {
+ Aws::Delete(threadTask);
+ }
+
+ while(m_tasks.size() > 0)
+ {
+ std::function<void()>* fn = m_tasks.front();
+ m_tasks.pop();
+
+ if(fn)
+ {
+ Aws::Delete(fn);
+ }
+ }
+
+}
+
+bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn)
+{
+ //avoid the need to do copies inside the lock. Instead lets do a pointer push.
+ std::function<void()>* fnCpy = Aws::New<std::function<void()>>(POOLED_CLASS_TAG, std::forward<std::function<void()>>(fn));
+
+ {
+ std::lock_guard<std::mutex> locker(m_queueLock);
+
+ if (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_tasks.size() >= m_poolSize)
+ {
+ Aws::Delete(fnCpy);
+ return false;
+ }
+
+ m_tasks.push(fnCpy);
+ }
+
+ m_sync.Release();
+
+ return true;
+}
+
+std::function<void()>* PooledThreadExecutor::PopTask()
+{
+ std::lock_guard<std::mutex> locker(m_queueLock);
+
+ if (m_tasks.size() > 0)
+ {
+ std::function<void()>* fn = m_tasks.front();
+ if (fn)
+ {
+ m_tasks.pop();
+ return fn;
+ }
+ }
+
+ return nullptr;
+}
+
+bool PooledThreadExecutor::HasTasks()
+{
+ std::lock_guard<std::mutex> locker(m_queueLock);
+ return m_tasks.size() > 0;
+}
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ReaderWriterLock.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ReaderWriterLock.cpp
index 9daad445d8..ddb5860563 100644
--- a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ReaderWriterLock.cpp
+++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ReaderWriterLock.cpp
@@ -1,64 +1,64 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
- */
-
-#include <aws/core/utils/threading/ReaderWriterLock.h>
-#include <cstdint>
-#include <limits>
-#include <cassert>
-
-using namespace Aws::Utils::Threading;
-
+ */
+
+#include <aws/core/utils/threading/ReaderWriterLock.h>
+#include <cstdint>
+#include <limits>
+#include <cassert>
+
+using namespace Aws::Utils::Threading;
+
static const int64_t MaxReaders = (std::numeric_limits<std::int32_t>::max)();
-
-ReaderWriterLock::ReaderWriterLock() :
- m_readers(0),
- m_holdouts(0),
- m_readerSem(0, static_cast<size_t>(MaxReaders)),
- m_writerSem(0, 1)
-{
-}
-
-void ReaderWriterLock::LockReader()
-{
- if (++m_readers < 0)
- {
- m_readerSem.WaitOne();
- }
-}
-
-void ReaderWriterLock::UnlockReader()
-{
- if (--m_readers < 0 && --m_holdouts == 0)
- {
- m_writerSem.Release();
- }
-}
-
-void ReaderWriterLock::LockWriter()
-{
- m_writerLock.lock();
- if(const auto current = m_readers.fetch_sub(MaxReaders))
- {
- assert(current > 0);
- const auto holdouts = m_holdouts.fetch_add(current) + current;
- assert(holdouts >= 0);
- if(holdouts > 0)
- {
- m_writerSem.WaitOne();
- }
- }
-}
-
-void ReaderWriterLock::UnlockWriter()
-{
- assert(m_holdouts == 0);
- const auto current = m_readers.fetch_add(MaxReaders) + MaxReaders;
- assert(current >= 0);
- for(int64_t r = 0; r < current; r++)
- {
- m_readerSem.Release();
- }
- m_writerLock.unlock();
-}
+
+ReaderWriterLock::ReaderWriterLock() :
+ m_readers(0),
+ m_holdouts(0),
+ m_readerSem(0, static_cast<size_t>(MaxReaders)),
+ m_writerSem(0, 1)
+{
+}
+
+void ReaderWriterLock::LockReader()
+{
+ if (++m_readers < 0)
+ {
+ m_readerSem.WaitOne();
+ }
+}
+
+void ReaderWriterLock::UnlockReader()
+{
+ if (--m_readers < 0 && --m_holdouts == 0)
+ {
+ m_writerSem.Release();
+ }
+}
+
+void ReaderWriterLock::LockWriter()
+{
+ m_writerLock.lock();
+ if(const auto current = m_readers.fetch_sub(MaxReaders))
+ {
+ assert(current > 0);
+ const auto holdouts = m_holdouts.fetch_add(current) + current;
+ assert(holdouts >= 0);
+ if(holdouts > 0)
+ {
+ m_writerSem.WaitOne();
+ }
+ }
+}
+
+void ReaderWriterLock::UnlockWriter()
+{
+ assert(m_holdouts == 0);
+ const auto current = m_readers.fetch_add(MaxReaders) + MaxReaders;
+ assert(current >= 0);
+ for(int64_t r = 0; r < current; r++)
+ {
+ m_readerSem.Release();
+ }
+ m_writerLock.unlock();
+}
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Semaphore.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Semaphore.cpp
index dfc7d1a1f6..86dabc9acf 100644
--- a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Semaphore.cpp
+++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/Semaphore.cpp
@@ -1,39 +1,39 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
- */
-
-#include <aws/core/utils/threading/Semaphore.h>
-#include <algorithm>
-
-using namespace Aws::Utils::Threading;
-
-Semaphore::Semaphore(size_t initialCount, size_t maxCount)
- : m_count(initialCount), m_maxCount(maxCount)
-{
-}
-
-void Semaphore::WaitOne()
-{
- std::unique_lock<std::mutex> locker(m_mutex);
- if(0 == m_count)
- {
- m_syncPoint.wait(locker, [this] { return m_count > 0; });
- }
- --m_count;
-}
-
-void Semaphore::Release()
-{
- std::lock_guard<std::mutex> locker(m_mutex);
- m_count = (std::min)(m_maxCount, m_count + 1);
- m_syncPoint.notify_one();
-}
-
-void Semaphore::ReleaseAll()
-{
- std::lock_guard<std::mutex> locker(m_mutex);
- m_count = m_maxCount;
- m_syncPoint.notify_all();
-}
-
+ */
+
+#include <aws/core/utils/threading/Semaphore.h>
+#include <algorithm>
+
+using namespace Aws::Utils::Threading;
+
+Semaphore::Semaphore(size_t initialCount, size_t maxCount)
+ : m_count(initialCount), m_maxCount(maxCount)
+{
+}
+
+void Semaphore::WaitOne()
+{
+ std::unique_lock<std::mutex> locker(m_mutex);
+ if(0 == m_count)
+ {
+ m_syncPoint.wait(locker, [this] { return m_count > 0; });
+ }
+ --m_count;
+}
+
+void Semaphore::Release()
+{
+ std::lock_guard<std::mutex> locker(m_mutex);
+ m_count = (std::min)(m_maxCount, m_count + 1);
+ m_syncPoint.notify_one();
+}
+
+void Semaphore::ReleaseAll()
+{
+ std::lock_guard<std::mutex> locker(m_mutex);
+ m_count = m_maxCount;
+ m_syncPoint.notify_all();
+}
+
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp
index 3f32ca2583..a899fe045d 100644
--- a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp
+++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp
@@ -2,45 +2,45 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
-
-#include <aws/core/utils/threading/ThreadTask.h>
-#include <aws/core/utils/threading/Executor.h>
-
-using namespace Aws::Utils;
-using namespace Aws::Utils::Threading;
-
-ThreadTask::ThreadTask(PooledThreadExecutor& executor) : m_continue(true), m_executor(executor), m_thread(std::bind(&ThreadTask::MainTaskRunner, this))
-{
-}
-
-ThreadTask::~ThreadTask()
-{
- StopProcessingWork();
- m_thread.join();
-}
-
-void ThreadTask::MainTaskRunner()
-{
- while (m_continue)
- {
- while (m_continue && m_executor.HasTasks())
- {
- auto fn = m_executor.PopTask();
- if(fn)
- {
- (*fn)();
- Aws::Delete(fn);
- }
- }
-
- if(m_continue)
- {
- m_executor.m_sync.WaitOne();
- }
- }
-}
-
-void ThreadTask::StopProcessingWork()
-{
- m_continue = false;
-}
+
+#include <aws/core/utils/threading/ThreadTask.h>
+#include <aws/core/utils/threading/Executor.h>
+
+using namespace Aws::Utils;
+using namespace Aws::Utils::Threading;
+
+ThreadTask::ThreadTask(PooledThreadExecutor& executor) : m_continue(true), m_executor(executor), m_thread(std::bind(&ThreadTask::MainTaskRunner, this))
+{
+}
+
+ThreadTask::~ThreadTask()
+{
+ StopProcessingWork();
+ m_thread.join();
+}
+
+void ThreadTask::MainTaskRunner()
+{
+ while (m_continue)
+ {
+ while (m_continue && m_executor.HasTasks())
+ {
+ auto fn = m_executor.PopTask();
+ if(fn)
+ {
+ (*fn)();
+ Aws::Delete(fn);
+ }
+ }
+
+ if(m_continue)
+ {
+ m_executor.m_sync.WaitOne();
+ }
+ }
+}
+
+void ThreadTask::StopProcessingWork()
+{
+ m_continue = false;
+}