blob: 3dd75b9e08ace7716a5af8140955cc4de5b695fe (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
#pragma once
#include <library/cpp/threading/future/future.h>
#include <library/cpp/threading/future/async.h>
#include <util/thread/pool.h>
#include <util/generic/ptr.h>
#include <util/generic/function.h>
#include <util/system/guard.h>
#include <util/system/rwlock.h>
#include <exception>
#include <utility>
namespace NYql {
/**
 * Reference-counted front end to a thread pool: callers submit callables via
 * Async() and get the result back as a future.
 *
 * The pool pointer is protected by a read/write lock. Async() holds the lock
 * for reading while it submits work; Stop() holds it for writing while it
 * shuts the pool down and releases it. Once the pool has been released,
 * Async() no longer schedules anything and returns an error future instead.
 */
class TAsyncQueue: public TThrRefBase {
public:
    using TPtr = TIntrusivePtr<TAsyncQueue>;

    /**
     * Creates a queue. The constructor is private, so this factory is the
     * public way to obtain an instance. Defined out of line.
     *
     * @param numThreads presumably the number of worker threads - see the definition.
     * @param poolName   presumably the name given to the pool's threads - see the definition.
     */
    static TPtr Make(size_t numThreads, const TString& poolName);

    /**
     * Stops the underlying pool and releases it.
     *
     * Does nothing if the pool has already been released, so calling it more
     * than once is harmless.
     *
     * NOTE(review): the write lock is held for the whole MtpQueue_->Stop()
     * call. If Stop() waits for running tasks and one of them calls Async()
     * on this queue, that task would block on the read lock - confirm this
     * cannot happen in practice.
     */
    void Stop() {
        auto guard = TWriteGuard(Lock_);
        if (MtpQueue_) {
            MtpQueue_->Stop();
            MtpQueue_.Destroy();
        }
    }

    /**
     * Schedules @p func on the pool.
     *
     * @param func callable to execute; it is perfectly forwarded, so an
     *             lvalue argument is copied and left intact while an rvalue
     *             argument is moved.
     * @return a future for the callable's result, or an error future holding
     *         a yexception ("Thread pool is already stopped") when the pool
     *         has already been released by Stop().
     */
    template <typename TCallable>
    [[nodiscard]]
    ::NThreading::TFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>> Async(TCallable&& func) {
        using TResult = ::NThreading::TFutureType<::TFunctionResult<TCallable>>;

        {
            // Shared lock: concurrent submissions are fine, but the pool must
            // not be torn down while a task is being enqueued.
            auto guard = TReadGuard(Lock_);
            if (MtpQueue_) {
                // func is a forwarding reference: std::forward keeps the
                // caller's lvalue usable instead of silently moving from it.
                return ::NThreading::Async(std::forward<TCallable>(func), *MtpQueue_);
            }
        }

        // The error future is built after the read lock has been released.
        return ::NThreading::MakeErrorFuture<TResult>(std::make_exception_ptr(yexception() << "Thread pool is already stopped"));
    }

private:
    TAsyncQueue(size_t numThreads, const TString& poolName);

private:
    TRWMutex Lock_;                 // guards MtpQueue_
    THolder<IThreadPool> MtpQueue_; // empty once Stop() has released the pool
};
} // NYql
|