diff options
author | udovichenko-r <[email protected]> | 2022-08-20 16:50:23 +0300 |
---|---|---|
committer | udovichenko-r <[email protected]> | 2022-08-20 16:50:23 +0300 |
commit | 75d8b5d5e58445073b0260897a93a78269c3f43f (patch) | |
tree | 865a771729e127c1d7ff74a7a5df55dbeb8cb3b6 | |
parent | 65827f08ae1b27337d63bd10f56a660b25b28991 (diff) |
[] Proper redirect to session pool threads
-rw-r--r-- | ydb/library/yql/utils/threading/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/utils/threading/async_queue.cpp | 18 | ||||
-rw-r--r-- | ydb/library/yql/utils/threading/async_queue.h | 36 |
3 files changed, 55 insertions, 0 deletions
diff --git a/ydb/library/yql/utils/threading/CMakeLists.txt b/ydb/library/yql/utils/threading/CMakeLists.txt index 54c02aaac3a..117ec898ba2 100644 --- a/ydb/library/yql/utils/threading/CMakeLists.txt +++ b/ydb/library/yql/utils/threading/CMakeLists.txt @@ -14,4 +14,5 @@ target_link_libraries(yql-utils-threading PUBLIC ) target_sources(yql-utils-threading PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_semaphore.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_queue.cpp ) diff --git a/ydb/library/yql/utils/threading/async_queue.cpp b/ydb/library/yql/utils/threading/async_queue.cpp new file mode 100644 index 00000000000..4021636b552 --- /dev/null +++ b/ydb/library/yql/utils/threading/async_queue.cpp @@ -0,0 +1,18 @@ +#include "async_queue.h" + +namespace NYql { + +TAsyncQueue::TAsyncQueue(size_t numThreads, const TString& poolName) { + if (1 == numThreads) { + MtpQueue_.Reset(new TFakeThreadPool()); + } else { + MtpQueue_.Reset(new TSimpleThreadPool(TThreadPoolParams{poolName})); + } + MtpQueue_->Start(numThreads); +} + +TAsyncQueue::TPtr TAsyncQueue::Make(size_t numThreads, const TString& poolName) { + return new TAsyncQueue(numThreads, poolName); +} + +} diff --git a/ydb/library/yql/utils/threading/async_queue.h b/ydb/library/yql/utils/threading/async_queue.h new file mode 100644 index 00000000000..1999efc881f --- /dev/null +++ b/ydb/library/yql/utils/threading/async_queue.h @@ -0,0 +1,36 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> +#include <library/cpp/threading/future/async.h> + +#include <util/thread/pool.h> +#include <util/generic/ptr.h> +#include <util/generic/function.h> + + +namespace NYql { + +class TAsyncQueue: public TThrRefBase { +public: + using TPtr = TIntrusivePtr<TAsyncQueue>; + + static TPtr Make(size_t numThreads, const TString& poolName); + + void Stop() { + MtpQueue_->Stop(); + } + + template <typename TCallable> + [[nodiscard]] + ::NThreading::TFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>> Async(TCallable&& func) { + return ::NThreading::Async(std::move(func), *MtpQueue_); + } + +private: + TAsyncQueue(size_t numThreads, const TString& poolName); + +private: + THolder<IThreadPool> MtpQueue_; +}; + +} // NYql |