summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <[email protected]>2022-08-20 16:50:23 +0300
committerudovichenko-r <[email protected]>2022-08-20 16:50:23 +0300
commit75d8b5d5e58445073b0260897a93a78269c3f43f (patch)
tree865a771729e127c1d7ff74a7a5df55dbeb8cb3b6
parent65827f08ae1b27337d63bd10f56a660b25b28991 (diff)
[] Proper redirect to session pool threads
-rw-r--r--ydb/library/yql/utils/threading/CMakeLists.txt1
-rw-r--r--ydb/library/yql/utils/threading/async_queue.cpp18
-rw-r--r--ydb/library/yql/utils/threading/async_queue.h36
3 files changed, 55 insertions, 0 deletions
diff --git a/ydb/library/yql/utils/threading/CMakeLists.txt b/ydb/library/yql/utils/threading/CMakeLists.txt
index 54c02aaac3a..117ec898ba2 100644
--- a/ydb/library/yql/utils/threading/CMakeLists.txt
+++ b/ydb/library/yql/utils/threading/CMakeLists.txt
@@ -14,4 +14,5 @@ target_link_libraries(yql-utils-threading PUBLIC
)
target_sources(yql-utils-threading PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_semaphore.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_queue.cpp
)
diff --git a/ydb/library/yql/utils/threading/async_queue.cpp b/ydb/library/yql/utils/threading/async_queue.cpp
new file mode 100644
index 00000000000..4021636b552
--- /dev/null
+++ b/ydb/library/yql/utils/threading/async_queue.cpp
@@ -0,0 +1,18 @@
+#include "async_queue.h"
+
+namespace NYql {
+
+TAsyncQueue::TAsyncQueue(size_t numThreads, const TString& poolName) {
+ if (1 == numThreads) {
+ MtpQueue_.Reset(new TFakeThreadPool());
+ } else {
+ MtpQueue_.Reset(new TSimpleThreadPool(TThreadPoolParams{poolName}));
+ }
+ MtpQueue_->Start(numThreads);
+}
+
+TAsyncQueue::TPtr TAsyncQueue::Make(size_t numThreads, const TString& poolName) {
+ return new TAsyncQueue(numThreads, poolName);
+}
+
+}
diff --git a/ydb/library/yql/utils/threading/async_queue.h b/ydb/library/yql/utils/threading/async_queue.h
new file mode 100644
index 00000000000..1999efc881f
--- /dev/null
+++ b/ydb/library/yql/utils/threading/async_queue.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/threading/future/async.h>
+
+#include <util/thread/pool.h>
+#include <util/generic/ptr.h>
+#include <util/generic/function.h>
+
+
+namespace NYql {
+
+class TAsyncQueue: public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<TAsyncQueue>;
+
+ static TPtr Make(size_t numThreads, const TString& poolName);
+
+ void Stop() {
+ MtpQueue_->Stop();
+ }
+
+ template <typename TCallable>
+ [[nodiscard]]
+ ::NThreading::TFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>> Async(TCallable&& func) {
+ return ::NThreading::Async(std::move(func), *MtpQueue_);
+ }
+
+private:
+ TAsyncQueue(size_t numThreads, const TString& poolName);
+
+private:
+ THolder<IThreadPool> MtpQueue_;
+};
+
+} // NYql