aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/abortable_registry.cpp
blob: 283d39e04950c9d72ac9104bc9275a49dc11228f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include "abortable_registry.h"

#include <yt/cpp/mapreduce/common/retry_lib.h>

#include <yt/cpp/mapreduce/interface/common.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>

#include <util/generic/singleton.h>

namespace NYT {
namespace NDetail {

using namespace NRawClient;

////////////////////////////////////////////////////////////////////////////////

// Builds an abortable bound to a single transaction: Context_ and TransactionId_
// are initialized from the arguments and later handed to AbortTransaction() by Abort().
TTransactionAbortable::TTransactionAbortable(
    const TClientContext& context,
    const TTransactionId& transactionId)
    : Context_(context)
    , TransactionId_(transactionId)
{ }

// Aborts the transaction this object was constructed for by delegating to
// AbortTransaction() with the stored context and transaction id.
// NOTE(review): the leading nullptr is presumably "no retry policy" — confirm
// against the declaration of AbortTransaction.
void TTransactionAbortable::Abort()
{
    AbortTransaction(
        nullptr,
        Context_,
        TransactionId_);
}

// Returns the kind label of this abortable; AbortAllAndBlockForever() puts it
// into the error message when Abort() throws.
TString TTransactionAbortable::GetType() const
{
    return TString("transaction");
}

////////////////////////////////////////////////////////////////////////////////

// Builds an abortable bound to a single operation.
// The retry policy and the context are taken by value and moved into the
// members; the operation id is copied.
TOperationAbortable::TOperationAbortable(
    IClientRetryPolicyPtr clientRetryPolicy,
    TClientContext context,
    const TOperationId& operationId)
    : ClientRetryPolicy_(std::move(clientRetryPolicy))
    , Context_(std::move(context))
    , OperationId_(operationId)
{ }


// Aborts the operation this object was constructed for.
// A fresh generic-request retry policy is obtained from ClientRetryPolicy_ on
// every call and passed to AbortOperation() with the stored context and id.
void TOperationAbortable::Abort()
{
    auto retryPolicy = ClientRetryPolicy_->CreatePolicyForGenericRequest();
    AbortOperation(retryPolicy, Context_, OperationId_);
}

// Returns the kind label of this abortable; AbortAllAndBlockForever() puts it
// into the error message when Abort() throws.
TString TOperationAbortable::GetType() const
{
    return TString("operation");
}

////////////////////////////////////////////////////////////////////////////////

// Calls Abort() on every registered abortable and then marks the registry as
// stopped.
//
// The whole pass runs under Lock_. Aborting is best-effort: a std::exception
// thrown by one abortable is logged (with the abortable's type and id) and the
// loop moves on to the next entry. Exceptions not derived from std::exception
// are not handled here and propagate to the caller.
//
// Once Running_ is false, any subsequent Add()/Remove() call goes to
// Sleep(TDuration::Max()) while still holding Lock_, which is what the
// "block forever" part of the name refers to.
void TAbortableRegistry::AbortAllAndBlockForever()
{
    auto guard = Guard(Lock_);

    for (const auto& entry : ActiveAbortables_) {
        const auto& id = entry.first;
        const auto& abortable = entry.second;
        try {
            abortable->Abort();
        } catch (const std::exception& ex) {
            // The exception is only inspected, never modified: catch by const reference.
            YT_LOG_ERROR("Exception while aborting %v %v: %v",
                abortable->GetType(),
                id,
                ex.what());
        }
    }

    Running_ = false;
}

// Registers |abortable| under |id|, replacing any entry already stored for
// that id.
//
// Runs under Lock_. If the registry has already been stopped (Running_ is
// false, see AbortAllAndBlockForever()), the caller is parked in
// Sleep(TDuration::Max()) with the lock still held, so other callers of
// Add()/Remove() block on Lock_ behind it.
void TAbortableRegistry::Add(const TGUID& id, IAbortablePtr abortable)
{
    auto guard = Guard(Lock_);

    if (!Running_) {
        Sleep(TDuration::Max());
    }

    // |abortable| is a by-value sink parameter: move it into the map instead of
    // copying it a second time.
    ActiveAbortables_[id] = std::move(abortable);
}

void TAbortableRegistry::Remove(const TGUID& id)
{
    auto guard = Guard(Lock_);

    if (!Running_) {
        Sleep(TDuration::Max());
    }

    ActiveAbortables_.erase(id);
}

////////////////////////////////////////////////////////////////////////////////

namespace {

// Default-constructible wrapper owning the registry instance, so that it can be
// managed through Singleton<TRegistryHolder>().
class TRegistryHolder
{
public:
    TRegistryHolder() = default;

    // Returns a new intrusive reference to the held registry.
    ::TIntrusivePtr<TAbortableRegistry> Get()
    {
        return Registry_;
    }

private:
    // Created together with the holder.
    ::TIntrusivePtr<TAbortableRegistry> Registry_ = ::MakeIntrusive<TAbortableRegistry>();
};

} // namespace

// Returns the registry held by the TRegistryHolder singleton.
::TIntrusivePtr<TAbortableRegistry> TAbortableRegistry::Get()
{
    auto* holder = Singleton<TRegistryHolder>();
    return holder->Get();
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NDetail
} // namespace NYT