diff options
| author | udovichenko-r <[email protected]> | 2022-08-20 16:50:23 +0300 | 
|---|---|---|
| committer | udovichenko-r <[email protected]> | 2022-08-20 16:50:23 +0300 | 
| commit | 75d8b5d5e58445073b0260897a93a78269c3f43f (patch) | |
| tree | 865a771729e127c1d7ff74a7a5df55dbeb8cb3b6 | |
| parent | 65827f08ae1b27337d63bd10f56a660b25b28991 (diff) | |
[] Proper redirect to session pool threads
| -rw-r--r-- | ydb/library/yql/utils/threading/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | ydb/library/yql/utils/threading/async_queue.cpp | 18 | ||||
| -rw-r--r-- | ydb/library/yql/utils/threading/async_queue.h | 36 | 
3 files changed, 55 insertions, 0 deletions
| diff --git a/ydb/library/yql/utils/threading/CMakeLists.txt b/ydb/library/yql/utils/threading/CMakeLists.txt index 54c02aaac3a..117ec898ba2 100644 --- a/ydb/library/yql/utils/threading/CMakeLists.txt +++ b/ydb/library/yql/utils/threading/CMakeLists.txt @@ -14,4 +14,5 @@ target_link_libraries(yql-utils-threading PUBLIC  )  target_sources(yql-utils-threading PRIVATE    ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_semaphore.cpp +  ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_queue.cpp  ) diff --git a/ydb/library/yql/utils/threading/async_queue.cpp b/ydb/library/yql/utils/threading/async_queue.cpp new file mode 100644 index 00000000000..4021636b552 --- /dev/null +++ b/ydb/library/yql/utils/threading/async_queue.cpp @@ -0,0 +1,18 @@ +#include "async_queue.h" + +namespace NYql { + +TAsyncQueue::TAsyncQueue(size_t numThreads, const TString& poolName) { +    if (1 == numThreads) { +        MtpQueue_.Reset(new TFakeThreadPool()); +    } else { +        MtpQueue_.Reset(new TSimpleThreadPool(TThreadPoolParams{poolName})); +    } +    MtpQueue_->Start(numThreads); +} + +TAsyncQueue::TPtr TAsyncQueue::Make(size_t numThreads, const TString& poolName) { +    return new TAsyncQueue(numThreads, poolName); +} + +} diff --git a/ydb/library/yql/utils/threading/async_queue.h b/ydb/library/yql/utils/threading/async_queue.h new file mode 100644 index 00000000000..1999efc881f --- /dev/null +++ b/ydb/library/yql/utils/threading/async_queue.h @@ -0,0 +1,36 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> +#include <library/cpp/threading/future/async.h> + +#include <util/thread/pool.h> +#include <util/generic/ptr.h> +#include <util/generic/function.h> + + +namespace NYql { + +class TAsyncQueue: public TThrRefBase { +public: +    using TPtr = TIntrusivePtr<TAsyncQueue>; + +    static TPtr Make(size_t numThreads, const TString& poolName); + +    void Stop() { +        MtpQueue_->Stop(); +    } + +    template <typename TCallable> +    [[nodiscard]] +    ::NThreading::TFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>> Async(TCallable&& func) { +        return ::NThreading::Async(std::move(func), *MtpQueue_); +    } + +private: +    TAsyncQueue(size_t numThreads, const TString& poolName); + +private: +    THolder<IThreadPool> MtpQueue_; +}; + +} // NYql | 
