#include "transaction_pinger.h"

#include "transaction.h"

// NOTE(review): the header names of the twelve angle-bracket includes were not
// visible in the reviewed text. The list below is a best-effort reconstruction
// from the symbols used in this file — confirm against version control.
#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <yt/cpp/mapreduce/common/wait_proxy.h>

#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <yt/cpp/mapreduce/interface/raw_client.h>

#include <yt/cpp/mapreduce/interface/logging/yt_log.h>

#include <yt/yt/core/concurrency/periodic_executor.h>
#include <yt/yt/core/concurrency/thread_pool.h>

#include <library/cpp/yt/threading/spin_lock.h>
#include <library/cpp/yt/assert/assert.h>

#include <util/datetime/base.h>
#include <util/random/random.h>

namespace NYT {

////////////////////////////////////////////////////////////////////////////////

/// Transaction pinger that serves all registered transactions from one shared
/// thread pool, with a separate periodic executor per transaction.
class TSharedTransactionPinger
    : public ITransactionPinger
{
public:
    /// Creates a pool of |poolThreadCount| threads named "tx_pinger_pool";
    /// all pings and asynchronous aborts are issued through |rawClient|.
    TSharedTransactionPinger(int poolThreadCount, IRawClientPtr rawClient)
        : ThreadPool_(NConcurrency::CreateThreadPool(poolThreadCount, "tx_pinger_pool"))
        , Invoker_(ThreadPool_->GetInvoker())
        , RawClient_(std::move(rawClient))
    { }

    /// Shuts the thread pool down.
    ~TSharedTransactionPinger() override
    {
        ThreadPool_->Shutdown();
    }

    /// Child transactions share this very pinger.
    ITransactionPingerPtr GetChildTxPinger() override
    {
        return this;
    }

    /// Creates and starts a periodic executor that pings |pingableTx|, then
    /// records it under the transaction id. Registering an id that is already
    /// present trips YT_VERIFY.
    void RegisterTransaction(const TPingableTransaction& pingableTx) override
    {
        auto transactionId = pingableTx.GetId();

        // The period is the midpoint of [min, max]; the jitter is the half-width
        // of that range relative to the midpoint.
        // NOTE(review): a zero midpoint makes this a division by zero; assumes
        // callers always supply positive intervals — TODO confirm.
        auto [minPingInterval, maxPingInterval] = pingableTx.GetPingInterval();
        auto pingInterval = (minPingInterval + maxPingInterval) / 2;
        double jitter = (maxPingInterval - pingInterval) / pingInterval;

        auto options = NConcurrency::TPeriodicExecutorOptions{pingInterval, pingInterval, jitter};
        auto periodic = std::make_shared<NConcurrency::TPeriodicExecutorPtr>(nullptr);

        // Have to use weak_ptr in order to break reference cycle:
        // this weak_ptr points to periodic, which will contain this lambda.
        // Don't capture pingableTx by reference: an in-flight ping may outlive it
        // and race with its destruction.
        // The callback also captures no |this|: DoPingTransaction is static and
        // everything it needs is captured by value, so a queued ping never
        // refers to the pinger object itself.
        auto pingRoutine = BIND([rawClient = RawClient_, transactionId, periodic = std::weak_ptr{periodic}] {
            auto holder = periodic.lock();
            // NB: RemoveTransaction calls (*periodic)->Stop() fire-and-forget and then
            // drops the last shared_ptr, so if this callback was queued but hadn't
            // started yet, lock() returns null.
            if (!holder) {
                // The executor is being torn down — nothing to ping.
                return;
            }
            DoPingTransaction(rawClient, transactionId, *holder);
        });

        *periodic = New<NConcurrency::TPeriodicExecutor>(Invoker_, pingRoutine, options);
        (*periodic)->Start();

        auto guard = Guard(SpinLock_);
        YT_VERIFY(!Transactions_.contains(transactionId));
        Transactions_[transactionId] = std::move(periodic);
    }

    /// Returns true iff an executor is currently registered for |pingableTx|.
    bool HasTransaction(const TPingableTransaction& pingableTx) override
    {
        auto guard = Guard(SpinLock_);
        return Transactions_.contains(pingableTx.GetId());
    }

    /// Unregisters |pingableTx| and requests its executor to stop without
    /// waiting for the stop to complete. Removing an unknown transaction trips
    /// YT_VERIFY.
    void RemoveTransaction(const TPingableTransaction& pingableTx) override
    {
        std::shared_ptr<NConcurrency::TPeriodicExecutorPtr> periodic;
        {
            // Only the map manipulation happens under the lock; Stop() is
            // issued after the guard is released.
            auto guard = Guard(SpinLock_);
            auto it = Transactions_.find(pingableTx.GetId());
            YT_VERIFY(it != Transactions_.end());
            periodic = std::move(it->second);
            Transactions_.erase(it);
        }
        YT_UNUSED_FUTURE((*periodic)->Stop());
    }

    /// Schedules an abort of |transactionId| on the pinger's invoker and
    /// returns the future of that call.
    TFuture<void> AsyncAbortTransaction(const TTransactionId& transactionId) override
    {
        auto abortRoutine = BIND([rawClient = RawClient_, transactionId = transactionId] {
            TMutationId mutationId;
            rawClient->AbortTransaction(mutationId, transactionId);
        });
        return abortRoutine
            .AsyncVia(Invoker_)
            .Run();
    }

private:
    /// Performs a single ping of |transactionId| via |rawClient|.
    ///
    /// On TErrorResponse: stops |periodic| if the error contains
    /// NoSuchTransaction, schedules an out-of-band run if it contains Timeout,
    /// and otherwise does nothing. Any other std::exception is logged.
    /// Static on purpose: it reads no instance state, which lets the ping
    /// callback avoid capturing |this|.
    static void DoPingTransaction(
        const IRawClientPtr& rawClient,
        const TTransactionId& transactionId,
        NConcurrency::TPeriodicExecutorPtr periodic)
    {
        try {
            rawClient->PingTransaction(transactionId);
        } catch (const TErrorResponse& e) {
            /// NB: No logging here, CheckError() already logged TErrorResponse.
            if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
                YT_UNUSED_FUTURE(periodic->Stop());
            } else if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
                periodic->ScheduleOutOfBand();
            }
        } catch (const std::exception& e) {
            YT_LOG_ERROR("DoPingTransaction has failed (TransactionId: %v, Error: %v)",
                GetGuidAsString(transactionId),
                e.what());
        }
    }

private:
    // Guards Transactions_.
    YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
    // Periodic executor holder of every registered transaction, keyed by id.
    THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_;

    NConcurrency::IThreadPoolPtr ThreadPool_;
    // Invoker of ThreadPool_, obtained once at construction.
    IInvokerPtr Invoker_;
    IRawClientPtr RawClient_;
};

////////////////////////////////////////////////////////////////////////////////

/// Creates the shared pinger with |config->AsyncTxPingerPoolThreads| pool threads.
ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config, IRawClientPtr rawClient)
{
    YT_LOG_DEBUG("Using async transaction pinger");
    return MakeIntrusive<TSharedTransactionPinger>(config->AsyncTxPingerPoolThreads, std::move(rawClient));
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT