summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/transaction_pinger.cpp
blob: d335a997e42993f15d0ffc4bf5208da7137752c5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include "transaction_pinger.h"

#include "transaction.h"

#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>

#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>

#include <yt/yt/core/concurrency/periodic_executor.h>
#include <yt/yt/core/concurrency/scheduler_api.h>
#include <yt/yt/core/concurrency/thread_pool.h>

#include <library/cpp/yt/threading/spin_lock.h>
#include <library/cpp/yt/assert/assert.h>

#include <util/datetime/base.h>
#include <util/random/random.h>

namespace NYT {

////////////////////////////////////////////////////////////////////////////////

class TSharedTransactionPinger
    : public ITransactionPinger
{
public:
    explicit TSharedTransactionPinger(int poolThreadCount)
        : PingerPool_(NConcurrency::CreateThreadPool(
            poolThreadCount, "tx_pinger_pool"))
    { }

    ~TSharedTransactionPinger() override
    {
        PingerPool_->Shutdown();
    }

    ITransactionPingerPtr GetChildTxPinger() override
    {
        return this;
    }

    void RegisterTransaction(const TPingableTransaction& pingableTx) override
    {
        auto [minPingInterval, maxPingInterval] = pingableTx.GetPingInterval();
        auto pingInterval = (minPingInterval + maxPingInterval) / 2;
        double jitter = (maxPingInterval - pingInterval) / pingInterval;

        auto opts = NConcurrency::TPeriodicExecutorOptions{pingInterval, pingInterval, jitter};
        auto periodic = std::make_shared<NConcurrency::TPeriodicExecutorPtr>(nullptr);
        // Have to use weak_ptr in order to break reference cycle
        // This weak_ptr holds pointer to periodic, which will contain this lambda
        // Also we consider that lifetime of this lambda is no longer than lifetime of pingableTx
        // because every pingableTx have to call RemoveTransaction before it is destroyed
        auto pingRoutine = BIND([this, &pingableTx, periodic = std::weak_ptr{periodic}] {
            auto strong_ptr = periodic.lock();
            YT_VERIFY(strong_ptr);
            DoPingTransaction(pingableTx, *strong_ptr);
        });
        *periodic = New<NConcurrency::TPeriodicExecutor>(PingerPool_->GetInvoker(), pingRoutine, opts);
        (*periodic)->Start();

        auto guard = Guard(SpinLock_);
        YT_VERIFY(!Transactions_.contains(pingableTx.GetId()));
        Transactions_[pingableTx.GetId()] = std::move(periodic);
    }

    bool HasTransaction(const TPingableTransaction& pingableTx) override
    {
        auto guard = Guard(SpinLock_);
        return Transactions_.contains(pingableTx.GetId());
    }


    void RemoveTransaction(const TPingableTransaction& pingableTx) override
    {
        std::shared_ptr<NConcurrency::TPeriodicExecutorPtr> periodic;
        {
            auto guard = Guard(SpinLock_);

            auto it = Transactions_.find(pingableTx.GetId());

            YT_VERIFY(it != Transactions_.end());

            periodic = std::move(it->second);
            Transactions_.erase(it);
        }
        NConcurrency::WaitUntilSet((*periodic)->Stop());
    }

private:
    void DoPingTransaction(const TPingableTransaction& pingableTx,
                           NConcurrency::TPeriodicExecutorPtr periodic)
    {
        try {
            pingableTx.Ping();
        } catch (const TErrorResponse& e) {
            /// NB: No logging here, CheckError() already logged TErrorResponse.
            if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
                YT_UNUSED_FUTURE(periodic->Stop());
            } else if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
                periodic->ScheduleOutOfBand();
            }
        } catch (const std::exception& e) {
            YT_LOG_ERROR("DoPingTransaction has failed (TransactionId: %v, Error: %v)",
                GetGuidAsString(pingableTx.GetId()),
                e.what());
        }
    }


private:
    YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
    THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_;

    NConcurrency::IThreadPoolPtr PingerPool_;
};

////////////////////////////////////////////////////////////////////////////////

/// Builds a TSharedTransactionPinger whose pool size is taken from
/// |config->AsyncTxPingerPoolThreads|.
ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config)
{
    YT_LOG_DEBUG("Using async transaction pinger");

    const auto& poolThreadCount = config->AsyncTxPingerPoolThreads;
    ITransactionPingerPtr pinger = MakeIntrusive<TSharedTransactionPinger>(poolThreadCount);
    return pinger;
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT