1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
#include "abortable_registry.h"
#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <yt/cpp/mapreduce/interface/common.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <util/generic/singleton.h>
namespace NYT {
namespace NDetail {
using namespace NRawClient;
////////////////////////////////////////////////////////////////////////////////
// Stores a copy of the client context and of the transaction id so that the
// transaction can be aborted later through Abort().
TTransactionAbortable::TTransactionAbortable(
    const TClientContext& context,
    const TTransactionId& transactionId)
    : Context_(context)
    , TransactionId_(transactionId)
{ }
// Aborts the stored transaction using the stored client context.
// NOTE(review): the first argument is passed as nullptr; presumably this is
// the retry-policy slot (compare TOperationAbortable::Abort) -- confirm.
void TTransactionAbortable::Abort()
{
    AbortTransaction(
        nullptr,
        Context_,
        TransactionId_);
}
// Returns the kind label of this abortable; the registry puts it into the
// error message it logs when Abort() throws.
TString TTransactionAbortable::GetType() const
{
    return "transaction";
}
////////////////////////////////////////////////////////////////////////////////
// Takes ownership of the retry policy and the client context (both are moved
// into the members) and copies the id of the operation to abort later.
TOperationAbortable::TOperationAbortable(
    IClientRetryPolicyPtr clientRetryPolicy,
    TClientContext context,
    const TOperationId& operationId)
    : ClientRetryPolicy_(std::move(clientRetryPolicy))
    , Context_(std::move(context))
    , OperationId_(operationId)
{ }
// Aborts the stored operation. A fresh generic-request retry policy is
// obtained from the client retry policy for every call.
void TOperationAbortable::Abort()
{
    AbortOperation(
        ClientRetryPolicy_->CreatePolicyForGenericRequest(),
        Context_,
        OperationId_);
}
// Returns the kind label of this abortable; the registry puts it into the
// error message it logs when Abort() throws.
TString TOperationAbortable::GetType() const
{
    return "operation";
}
////////////////////////////////////////////////////////////////////////////////
// Aborts every registered abortable and then marks the registry as stopped.
//
// The registry lock is held for the whole call. A std::exception thrown by an
// individual Abort() is logged and does not prevent the remaining entries
// from being aborted. Once Running_ is cleared, Add() and Remove() sleep for
// TDuration::Max() instead of returning promptly.
void TAbortableRegistry::AbortAllAndBlockForever()
{
    auto guard = Guard(Lock_);

    for (const auto& entry : ActiveAbortables_) {
        const auto& id = entry.first;
        const auto& abortable = entry.second;
        try {
            abortable->Abort();
        } catch (const std::exception& ex) {
            // Caught by const reference: the handler only reads the message.
            YT_LOG_ERROR("Exception while aborting %v %v: %v",
                abortable->GetType(),
                id,
                ex.what());
        }
    }

    Running_ = false;
}
// Registers `abortable` under `id`, replacing any entry already stored for
// that id.
//
// If the registry has been stopped (Running_ is false), the call sleeps for
// TDuration::Max() while still holding the registry lock.
void TAbortableRegistry::Add(const TGUID& id, IAbortablePtr abortable)
{
    auto guard = Guard(Lock_);

    if (!Running_) {
        Sleep(TDuration::Max());
    }

    // `abortable` is a by-value sink parameter: move it into the map rather
    // than copying the pointer a second time.
    ActiveAbortables_[id] = std::move(abortable);
}
void TAbortableRegistry::Remove(const TGUID& id)
{
auto guard = Guard(Lock_);
if (!Running_) {
Sleep(TDuration::Max());
}
ActiveAbortables_.erase(id);
}
////////////////////////////////////////////////////////////////////////////////
namespace {
// Default-constructible wrapper that creates one TAbortableRegistry on
// construction and hands out intrusive pointers to it.
class TRegistryHolder
{
public:
    TRegistryHolder() = default;

    // Returns a new intrusive pointer to the held registry.
    ::TIntrusivePtr<TAbortableRegistry> Get()
    {
        return Registry_;
    }

private:
    ::TIntrusivePtr<TAbortableRegistry> Registry_ = ::MakeIntrusive<TAbortableRegistry>();
};
} // namespace
// Returns the registry owned by the TRegistryHolder singleton.
::TIntrusivePtr<TAbortableRegistry> TAbortableRegistry::Get()
{
    auto* holder = Singleton<TRegistryHolder>();
    return holder->Get();
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NDetail
} // namespace NYT
|