summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/rpc_client/wrap_rpc_error.h
blob: 61aef42d6a741ab1abd6cf0ae50b749aab8d52f1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#pragma once

#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>

#include <yt/yt/core/actions/bind.h>
#include <yt/yt/core/concurrency/scheduler_api.h>

#include <library/cpp/threading/future/core/future.h>


namespace NYT::NDetail {

////////////////////////////////////////////////////////////////////////////////

// Converts a TErrorException into a TErrorResponse.
// Only declared here; the definition lives outside this header.
// The helpers in this header use the returned object as the exception that is
// propagated to callers in place of the original TErrorException.
// The argument is taken by value, so callers may move into it.
TErrorResponse ToErrorResponse(TErrorException ex);

////////////////////////////////////////////////////////////////////////////////

// Returns a future obtained by applying an error-translating continuation to |future|.
//
// The continuation asks the completed future to rethrow its stored exception (if any):
//   - when nothing is thrown, the completed future itself is passed through;
//   - when a TErrorException is thrown, an error future is produced whose exception
//     is the result of ToErrorResponse for that error.
// Exceptions of any other type are not caught by the continuation.
template <typename TResult>
::NThreading::TFuture<TResult> WrapRpcError(::NThreading::TFuture<TResult> future) {
    auto translateError = [](auto&& completed) {
        using TCompleted = decltype(completed);
        try {
            completed.TryRethrow();
            return std::forward<TCompleted>(completed);
        } catch (TErrorException rpcError) {
            // NOTE(review): caught by value, so the handler works on its own copy and
            // moving out of it cannot touch the exception object held by the future.
            // Presumably intentional; confirm before changing to catch-by-reference.
            auto wrapped = std::make_exception_ptr(ToErrorResponse(std::move(rpcError)));
            return ::NThreading::MakeErrorFuture<TResult>(wrapped);
        }
    };
    return future.Apply(std::move(translateError));
}

// Waits for |future| via NConcurrency::WaitFor and returns its value
// (returns nothing when TResult is void).
//
// If waiting for or unwrapping the result throws a TErrorException, that error is
// converted with ToErrorResponse and the converted object is thrown instead.
// Exceptions of any other type are not intercepted here.
template <typename TResult>
TResult WaitAndProcess(TFuture<TResult> future) {
    try {
        if constexpr (std::is_same_v<TResult, void>) {
            NConcurrency::WaitFor(future).ThrowOnError();
        } else {
            // The function result is initialized from ValueOrThrow() before the
            // temporary returned by WaitFor is destroyed.
            return NConcurrency::WaitFor(future).ValueOrThrow();
        }
    } catch (TErrorException& ex) {
        // Catch by reference to avoid copying the exception object. It is safe to
        // move from: this handler always exits by throwing a different exception,
        // so the caught object is never observed again.
        throw ToErrorResponse(std::move(ex));
    }
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NDetail