1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
#include "transaction_pinger.h"
#include "transaction.h"
#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <yt/yt/core/concurrency/periodic_executor.h>
#include <yt/yt/core/concurrency/scheduler_api.h>
#include <yt/yt/core/concurrency/thread_pool.h>
#include <library/cpp/yt/threading/spin_lock.h>
#include <library/cpp/yt/assert/assert.h>
#include <util/datetime/base.h>
#include <util/random/random.h>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
class TSharedTransactionPinger
: public ITransactionPinger
{
public:
TSharedTransactionPinger(int poolThreadCount, IRawClientPtr rawClient)
: ThreadPool_(NConcurrency::CreateThreadPool(poolThreadCount, "tx_pinger_pool"))
, Invoker_(ThreadPool_->GetInvoker())
, RawClient_(std::move(rawClient))
{ }
~TSharedTransactionPinger() override
{
ThreadPool_->Shutdown();
}
ITransactionPingerPtr GetChildTxPinger() override
{
return this;
}
void RegisterTransaction(const TPingableTransaction& pingableTx) override
{
auto [minPingInterval, maxPingInterval] = pingableTx.GetPingInterval();
auto pingInterval = (minPingInterval + maxPingInterval) / 2;
double jitter = (maxPingInterval - pingInterval) / pingInterval;
auto opts = NConcurrency::TPeriodicExecutorOptions{pingInterval, pingInterval, jitter};
auto periodic = std::make_shared<NConcurrency::TPeriodicExecutorPtr>(nullptr);
// Have to use weak_ptr in order to break reference cycle
// This weak_ptr holds pointer to periodic, which will contain this lambda
// Don't capture pingableTx by reference: an in-flight ping may outlive it and race with its destruction
auto pingRoutine = BIND([this, rawClient = RawClient_, transactionId = pingableTx.GetId(), periodic = std::weak_ptr{periodic}] {
auto strong_ptr = periodic.lock();
// NB: RemoveTransaction calls (*periodic)->Stop() fire-and-forget and then drops the last shared_ptr,
// so if this callback was queued but hadn't started yet, lock() returns null.
if (!strong_ptr) {
// The executor is being torn down — nothing to ping.
return;
}
DoPingTransaction(rawClient, transactionId, *strong_ptr);
});
*periodic = New<NConcurrency::TPeriodicExecutor>(ThreadPool_->GetInvoker(), pingRoutine, opts);
(*periodic)->Start();
auto guard = Guard(SpinLock_);
YT_VERIFY(!Transactions_.contains(pingableTx.GetId()));
Transactions_[pingableTx.GetId()] = std::move(periodic);
}
bool HasTransaction(const TPingableTransaction& pingableTx) override
{
auto guard = Guard(SpinLock_);
return Transactions_.contains(pingableTx.GetId());
}
void RemoveTransaction(const TPingableTransaction& pingableTx) override
{
std::shared_ptr<NConcurrency::TPeriodicExecutorPtr> periodic;
{
auto guard = Guard(SpinLock_);
auto it = Transactions_.find(pingableTx.GetId());
YT_VERIFY(it != Transactions_.end());
periodic = std::move(it->second);
Transactions_.erase(it);
}
YT_UNUSED_FUTURE((*periodic)->Stop());
}
TFuture<void> AsyncAbortTransaction(const TTransactionId& transactionId) override
{
auto result = BIND([rawClient = RawClient_, transactionId = transactionId] {
TMutationId mutationId;
rawClient->AbortTransaction(mutationId, transactionId);
})
.AsyncVia(ThreadPool_->GetInvoker())
.Run();
return result;
}
private:
void DoPingTransaction(const IRawClientPtr& rawClient, const TTransactionId& transactionId, NConcurrency::TPeriodicExecutorPtr periodic)
{
try {
rawClient->PingTransaction(transactionId);
} catch (const TErrorResponse& e) {
/// NB: No logging here, CheckError() already logged TErrorResponse.
if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
YT_UNUSED_FUTURE(periodic->Stop());
} else if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
periodic->ScheduleOutOfBand();
}
} catch (const std::exception& e) {
YT_LOG_ERROR("DoPingTransaction has failed (TransactionId: %v, Error: %v)",
GetGuidAsString(transactionId),
e.what());
}
}
private:
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_;
NConcurrency::IThreadPoolPtr ThreadPool_;
IInvokerPtr Invoker_;
IRawClientPtr RawClient_;
};
////////////////////////////////////////////////////////////////////////////////
//! Creates the shared transaction pinger.
/*!
 *  The pool size is taken from config->AsyncTxPingerPoolThreads;
 *  #rawClient is handed over to the pinger.
 */
ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config, IRawClientPtr rawClient)
{
    YT_LOG_DEBUG("Using async transaction pinger");

    const auto poolThreadCount = config->AsyncTxPingerPoolThreads;
    return MakeIntrusive<TSharedTransactionPinger>(poolThreadCount, std::move(rawClient));
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
|