1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
#include "transaction_pinger.h"
#include "transaction.h"
#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <yt/yt/core/concurrency/periodic_executor.h>
#include <yt/yt/core/concurrency/scheduler_api.h>
#include <yt/yt/core/concurrency/thread_pool.h>
#include <library/cpp/yt/threading/spin_lock.h>
#include <library/cpp/yt/assert/assert.h>
#include <util/datetime/base.h>
#include <util/random/random.h>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
class TSharedTransactionPinger
: public ITransactionPinger
{
public:
TSharedTransactionPinger(int poolThreadCount, IRawClientPtr rawClient)
: ThreadPool_(NConcurrency::CreateThreadPool(poolThreadCount, "tx_pinger_pool"))
, Invoker_(ThreadPool_->GetInvoker())
, RawClient_(std::move(rawClient))
{ }
~TSharedTransactionPinger() override
{
ThreadPool_->Shutdown();
}
ITransactionPingerPtr GetChildTxPinger() override
{
return this;
}
void RegisterTransaction(const TPingableTransaction& pingableTx) override
{
auto [minPingInterval, maxPingInterval] = pingableTx.GetPingInterval();
auto pingInterval = (minPingInterval + maxPingInterval) / 2;
double jitter = (maxPingInterval - pingInterval) / pingInterval;
auto opts = NConcurrency::TPeriodicExecutorOptions{pingInterval, pingInterval, jitter};
auto periodic = std::make_shared<NConcurrency::TPeriodicExecutorPtr>(nullptr);
// Have to use weak_ptr in order to break reference cycle
// This weak_ptr holds pointer to periodic, which will contain this lambda
// Also we consider that lifetime of this lambda is no longer than lifetime of pingableTx
// because every pingableTx have to call RemoveTransaction before it is destroyed
auto pingRoutine = BIND([this, &pingableTx, periodic = std::weak_ptr{periodic}] {
auto strong_ptr = periodic.lock();
YT_VERIFY(strong_ptr);
DoPingTransaction(pingableTx, *strong_ptr);
});
*periodic = New<NConcurrency::TPeriodicExecutor>(ThreadPool_->GetInvoker(), pingRoutine, opts);
(*periodic)->Start();
auto guard = Guard(SpinLock_);
YT_VERIFY(!Transactions_.contains(pingableTx.GetId()));
Transactions_[pingableTx.GetId()] = std::move(periodic);
}
bool HasTransaction(const TPingableTransaction& pingableTx) override
{
auto guard = Guard(SpinLock_);
return Transactions_.contains(pingableTx.GetId());
}
void RemoveTransaction(const TPingableTransaction& pingableTx) override
{
std::shared_ptr<NConcurrency::TPeriodicExecutorPtr> periodic;
{
auto guard = Guard(SpinLock_);
auto it = Transactions_.find(pingableTx.GetId());
YT_VERIFY(it != Transactions_.end());
periodic = std::move(it->second);
Transactions_.erase(it);
}
YT_UNUSED_FUTURE((*periodic)->Stop());
}
TFuture<void> AsyncAbortTransaction(const TTransactionId& transactionId) override
{
auto result = BIND([rawClient = RawClient_, transactionId = transactionId] {
TMutationId mutationId;
rawClient->AbortTransaction(mutationId, transactionId);
})
.AsyncVia(ThreadPool_->GetInvoker())
.Run();
return result;
}
private:
void DoPingTransaction(const TPingableTransaction& pingableTx, NConcurrency::TPeriodicExecutorPtr periodic)
{
try {
pingableTx.Ping();
} catch (const TErrorResponse& e) {
/// NB: No logging here, CheckError() already logged TErrorResponse.
if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
YT_UNUSED_FUTURE(periodic->Stop());
} else if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
periodic->ScheduleOutOfBand();
}
} catch (const std::exception& e) {
YT_LOG_ERROR("DoPingTransaction has failed (TransactionId: %v, Error: %v)",
GetGuidAsString(pingableTx.GetId()),
e.what());
}
}
private:
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_;
NConcurrency::IThreadPoolPtr ThreadPool_;
IInvokerPtr Invoker_;
IRawClientPtr RawClient_;
};
////////////////////////////////////////////////////////////////////////////////
/// Builds the shared (thread-pool based) transaction pinger.
///
/// The pool size is read from |config->AsyncTxPingerPoolThreads|;
/// |rawClient| is handed over to the pinger.
ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config, IRawClientPtr rawClient)
{
    YT_LOG_DEBUG("Using async transaction pinger");

    const auto poolThreadCount = config->AsyncTxPingerPoolThreads;
    return MakeIntrusive<TSharedTransactionPinger>(
        poolThreadCount,
        std::move(rawClient));
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
|