aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/utils/threading/async_queue.h
blob: 3dd75b9e08ace7716a5af8140955cc4de5b695fe (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#pragma once

#include <library/cpp/threading/future/future.h>
#include <library/cpp/threading/future/async.h>

#include <util/thread/pool.h>
#include <util/generic/ptr.h>
#include <util/generic/function.h>
#include <util/system/guard.h>
#include <util/system/rwlock.h>

#include <exception>

namespace NYql {

class TAsyncQueue: public TThrRefBase {
public:
    using TPtr = TIntrusivePtr<TAsyncQueue>;

    static TPtr Make(size_t numThreads, const TString& poolName);

    void Stop() {
        auto guard = TWriteGuard(Lock_);
        if (MtpQueue_) {
            MtpQueue_->Stop();
            MtpQueue_.Destroy();
        }
    }

    template <typename TCallable>
    [[nodiscard]]
    ::NThreading::TFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>> Async(TCallable&& func) {
        {
            auto guard = TReadGuard(Lock_);
            if (MtpQueue_) {
                return ::NThreading::Async(std::move(func), *MtpQueue_);
            }
        }

        return ::NThreading::MakeErrorFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>>(std::make_exception_ptr(yexception() << "Thread pool is already stopped"));
    }

private:
    TAsyncQueue(size_t numThreads, const TString& poolName);

private:
    TRWMutex Lock_;
    THolder<IThreadPool> MtpQueue_;
};

} // NYql