aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/lock.cpp
blob: 88110f9266ec5c7f7a8468340a653834dc3f1b11 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#include "lock.h"

#include "yt_poller.h"

#include <yt/cpp/mapreduce/http/retry_request.h>

#include <yt/cpp/mapreduce/common/retry_lib.h>

#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>

#include <util/string/builder.h>

namespace NYT {
namespace NDetail {

using namespace NRawClient;

////////////////////////////////////////////////////////////////////////////////

class TLockPollerItem
    : public IYtPollerItem
{
public:
    TLockPollerItem(const TLockId& lockId, ::NThreading::TPromise<void> acquired)
        : LockStateYPath_("#" + GetGuidAsString(lockId) + "/@state")
        , Acquired_(acquired)
    { }

    void PrepareRequest(TRawBatchRequest* batchRequest) override
    {
        LockState_ = batchRequest->Get(TTransactionId(), LockStateYPath_, TGetOptions());
    }

    EStatus OnRequestExecuted() override
    {
        try {
            const auto& state = LockState_.GetValue().AsString();
            if (state == "acquired") {
                Acquired_.SetValue();
                return PollBreak;
            }
        } catch (const TErrorResponse& e) {
            if (!IsRetriable(e)) {
                Acquired_.SetException(std::current_exception());
                return PollBreak;
            }
        } catch (const std::exception& e) {
            if (!IsRetriable(e)) {
                Acquired_.SetException(std::current_exception());
                return PollBreak;
            }
        }
        return PollContinue;
    }

    void OnItemDiscarded() override
    {
        Acquired_.SetException(std::make_exception_ptr(yexception() << "Operation cancelled"));
    }

private:
    const TString LockStateYPath_;
    ::NThreading::TPromise<void> Acquired_;

    ::NThreading::TFuture<TNode> LockState_;
};

////////////////////////////////////////////////////////////////////////////////

// Stores the lock id and the client. For a non-waitable lock the acquired
// future is set to an already completed one right away; for a waitable lock it
// is left unset here.
TLock::TLock(const TLockId& lockId, TClientPtr client, bool waitable)
    : LockId_(lockId)
    , Client_(std::move(client))
{
    if (waitable) {
        return;
    }
    Acquired_ = ::NThreading::MakeFuture();
}

// Returns a reference to the id this lock was constructed with.
const TLockId& TLock::GetId() const
{
    return this->LockId_;
}

// Requests the `@node_id` attribute of the lock object (`#<lock id>/@node_id`)
// through the client and converts the returned string to a guid.
TNodeId TLock::GetLockedNodeId() const
{
    const TString nodeIdPath = ::TStringBuilder() << '#' << GetGuidAsString(LockId_) << "/@node_id";
    auto attributeValue = Client_->Get(nodeIdPath, TGetOptions());
    return GetGuid(attributeValue.AsString());
}

// Returns the future tied to lock acquisition. If no future has been stored
// yet, a new promise is created, a TLockPollerItem watching this lock is handed
// to the client's poller, and the promise's future is stored for later calls.
const ::NThreading::TFuture<void>& TLock::GetAcquiredFuture() const
{
    if (Acquired_) {
        return *Acquired_;
    }

    auto acquiredPromise = ::NThreading::NewPromise<void>();
    auto pollerItem = ::MakeIntrusive<TLockPollerItem>(LockId_, acquiredPromise);
    Client_->GetYtPoller().Watch(std::move(pollerItem));
    Acquired_ = acquiredPromise.GetFuture();
    return *Acquired_;
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NDetail
} // namespace NYT