diff options
| author | ermolovd <[email protected]> | 2026-04-28 10:36:20 +0300 |
|---|---|---|
| committer | ermolovd <[email protected]> | 2026-04-28 11:34:34 +0300 |
| commit | 693274b40b1a9ebdf2da02f2e87fbc8502105738 (patch) | |
| tree | 229c575c802e7ae1f9290502efce9b5879fbfea3 /yt/cpp | |
| parent | 10edf8d9197e9c45458d73cb57a601e0827d159c (diff) | |
YT-27827: TPingerTransaction use BlockingGet instead of WaitFor in destructor
commit_hash:90bbe36635e0d48c81c153567dcedf28f103efbe
Diffstat (limited to 'yt/cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 2 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/transaction.cpp | 12 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/transaction.h | 1 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/transaction_pinger.cpp | 37 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/transaction_pinger.h | 8 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 12 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/http_client/raw_client.cpp | 127 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/http_client/raw_client.h | 7 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/common.cpp | 19 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/common.h | 2 |
10 files changed, 139 insertions, 88 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 06e27671736..aeed59a7d65 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -1694,7 +1694,7 @@ ITransactionPingerPtr TClient::GetTransactionPinger() { auto g = Guard(Lock_); if (!TransactionPinger_) { - TransactionPinger_ = CreateTransactionPinger(Context_.Config); + TransactionPinger_ = CreateTransactionPinger(Context_.Config, RawClient_); } return TransactionPinger_; } diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp index 289c89340e6..c54d01f8136 100644 --- a/yt/cpp/mapreduce/client/transaction.cpp +++ b/yt/cpp/mapreduce/client/transaction.cpp @@ -10,6 +10,8 @@ #include <yt/cpp/mapreduce/interface/error_codes.h> #include <yt/cpp/mapreduce/interface/raw_client.h> +#include <yt/yt/core/actions/future.h> + #include <util/datetime/base.h> #include <util/generic/scope.h> @@ -166,11 +168,11 @@ void TPingableTransaction::Stop(EStopAction action) }); break; case EStopAction::Abort: - NDetail::RequestWithRetry<void>( - ClientRetryPolicy_->CreatePolicyForGenericRequest(), - [this] (TMutationId& mutationId) { - RawClient_->AbortTransaction(mutationId, TransactionId_); - }); + // Aborting transaction can be called from destructor while unwinding exception stack. + // We can't call WaitFor in such conditions (and internally we do it). + // So we offload aborting to separate thread. + Pinger_->AsyncAbortTransaction(TransactionId_) + .BlockingGet(); break; case EStopAction::Detach: // Do nothing. diff --git a/yt/cpp/mapreduce/client/transaction.h b/yt/cpp/mapreduce/client/transaction.h index d1a5109c3f0..8f939f9d4c4 100644 --- a/yt/cpp/mapreduce/client/transaction.h +++ b/yt/cpp/mapreduce/client/transaction.h @@ -49,7 +49,6 @@ public: void Abort(); void Detach(); - private: enum class EStopAction { diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp index d335a997e42..7a659b081ec 100644 --- a/yt/cpp/mapreduce/client/transaction_pinger.cpp +++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp @@ -4,6 +4,7 @@ #include <yt/cpp/mapreduce/interface/error_codes.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/retry_request.h> @@ -26,14 +27,15 @@ class TSharedTransactionPinger : public ITransactionPinger { public: - explicit TSharedTransactionPinger(int poolThreadCount) - : PingerPool_(NConcurrency::CreateThreadPool( - poolThreadCount, "tx_pinger_pool")) + TSharedTransactionPinger(int poolThreadCount, IRawClientPtr rawClient) + : ThreadPool_(NConcurrency::CreateThreadPool(poolThreadCount, "tx_pinger_pool")) + , Invoker_(ThreadPool_->GetInvoker()) + , RawClient_(std::move(rawClient)) { } ~TSharedTransactionPinger() override { - PingerPool_->Shutdown(); + ThreadPool_->Shutdown(); } ITransactionPingerPtr GetChildTxPinger() override @@ -58,7 +60,7 @@ public: YT_VERIFY(strong_ptr); DoPingTransaction(pingableTx, *strong_ptr); }); - *periodic = New<NConcurrency::TPeriodicExecutor>(PingerPool_->GetInvoker(), pingRoutine, opts); + *periodic = New<NConcurrency::TPeriodicExecutor>(ThreadPool_->GetInvoker(), pingRoutine, opts); (*periodic)->Start(); auto guard = Guard(SpinLock_); @@ -86,12 +88,23 @@ public: periodic = std::move(it->second); Transactions_.erase(it); } - NConcurrency::WaitUntilSet((*periodic)->Stop()); + YT_UNUSED_FUTURE((*periodic)->Stop()); + } + + TFuture<void> AsyncAbortTransaction(const TTransactionId& transactionId) override + { + auto result = BIND([rawClient = RawClient_, transactionId = transactionId] { + TMutationId mutationId; + rawClient->AbortTransaction(mutationId, transactionId); + }) + .AsyncVia(ThreadPool_->GetInvoker()) + .Run(); + + return result; } private: - void DoPingTransaction(const TPingableTransaction& pingableTx, - NConcurrency::TPeriodicExecutorPtr periodic) + void DoPingTransaction(const TPingableTransaction& pingableTx, NConcurrency::TPeriodicExecutorPtr periodic) { try { pingableTx.Ping(); @@ -114,16 +127,18 @@ private: YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_; - NConcurrency::IThreadPoolPtr PingerPool_; + NConcurrency::IThreadPoolPtr ThreadPool_; + IInvokerPtr Invoker_; + IRawClientPtr RawClient_; }; //////////////////////////////////////////////////////////////////////////////// -ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config) +ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config, IRawClientPtr rawClient) { YT_LOG_DEBUG("Using async transaction pinger"); - return MakeIntrusive<TSharedTransactionPinger>(config->AsyncTxPingerPoolThreads); + return MakeIntrusive<TSharedTransactionPinger>(config->AsyncTxPingerPoolThreads, std::move(rawClient)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/transaction_pinger.h b/yt/cpp/mapreduce/client/transaction_pinger.h index 98e8b5cb2f8..2fa8ec5c9bc 100644 --- a/yt/cpp/mapreduce/client/transaction_pinger.h +++ b/yt/cpp/mapreduce/client/transaction_pinger.h @@ -13,6 +13,10 @@ namespace NYT { class TPingableTransaction; +// Don't want to include public.h to avoid polluting header with TIntrusivePtr +template <typename> +class TFuture; + //////////////////////////////////////////////////////////////////////////////// // Each registered transaction must be removed from pinger @@ -30,9 +34,11 @@ public: virtual bool HasTransaction(const TPingableTransaction& pingableTx) = 0; virtual void RemoveTransaction(const TPingableTransaction& pingableTx) = 0; + + virtual TFuture<void> AsyncAbortTransaction(const TTransactionId& transactionId) = 0; }; -ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config); +ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config, IRawClientPtr rawClient); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index 44c25079d62..8e8febf44b3 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -210,18 +210,8 @@ void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite TMutationId THttpHeader::AddMutationId() { - TGUID guid; - - // Some users use `fork()' with yt wrapper - // (actually they use python + multiprocessing) - // and CreateGuid is not resistant to `fork()', so spice it a little bit. - // - // Check IGNIETFERRO-610 - CreateGuid(&guid); - guid.dw[2] = GetPID() ^ MicroSeconds(); - + TMutationId guid = NDetail::GenerateMutationId(); AddParameter("mutation_id", GetGuidAsString(guid), true); - return guid; } diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp index bf63a7ae3bb..8ef2a349a3e 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.cpp +++ b/yt/cpp/mapreduce/http_client/raw_client.cpp @@ -65,6 +65,17 @@ void CheckError(const TString& requestId, NHttp::IResponsePtr response) } } +void SetMutationId(TNode& params, TMutationId& mutationId) +{ + if (mutationId.IsEmpty()) { + params["retry"] = false; + mutationId = GenerateMutationId(); + } else { + params["retry"] = true; + } + params["mutation_id"] = GetGuidAsString(mutationId); +} + } // anonymous namespace //////////////////////////////////////////////////////////////////////////////// @@ -308,68 +319,20 @@ TTransactionId THttpRawClient::StartTransaction( void THttpRawClient::PingTransaction(const TTransactionId& transactionId) { - auto traceContext = Context_.Config->EnableClientTracing - ? NTracing::CreateTraceContextFromCurrent("ping_tx") - : nullptr; - NTracing::TCurrentTraceContextGuard traceContextGuard(traceContext); - - std::call_once(PingClientInitOnceFlag_, [this] () { - InitPingClient(); - }); - - auto url = TString::Join(Context_.UseTLS ? "https://" : "http://", Context_.ServerName, "/api/", Context_.Config->ApiVersion, "/ping_tx"); - auto headers = New<NHttp::THeaders>(); - auto requestId = CreateGuidAsString(); - - headers->Add("Host", url); - headers->Add("User-Agent", TProcessState::Get()->ClientVersion); - - if (const auto& serviceTicketAuth = Context_.ServiceTicketAuth) { - const auto serviceTicket = serviceTicketAuth->Ptr->IssueServiceTicket(); - headers->Add("X-Ya-Service-Ticket", serviceTicket); - } else if (const auto& token = Context_.Token; !token.empty()) { - headers->Add("Authorization", "OAuth " + token); - } - - headers->Add("Transfer-Encoding", "chunked"); - headers->Add("X-YT-Correlation-Id", requestId); - headers->Add("X-YT-Header-Format", "<format=text>yson"); - headers->Add("Content-Encoding", "identity"); - headers->Add("Accept-Encoding", "identity"); - - if (traceContext) { - auto traceparent = FormatTraceParentHeader(traceContext->GetTraceId(), traceContext->GetSpanId()); - headers->Add("traceparent", traceparent); - } - TNode node; node["transaction_id"] = GetGuidAsString(transactionId); auto strParams = NodeToYsonString(node); - YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; Method POST %v; X-YT-Parameters (sent in body): %v)", - requestId, - Context_.ServerName, - url, - strParams); - - auto response = NConcurrency::WaitFor(PingHttpClient_->Post(url, TSharedRef::FromString(strParams), headers)) - .ValueOrThrow(); - CheckError(requestId, response); - - YT_LOG_DEBUG("RSP %v - received response %v bytes. (%v)", - requestId, - response->ReadAll().size(), - strParams); + PostAsync("ping_tx", node); } void THttpRawClient::AbortTransaction( TMutationId& mutationId, const TTransactionId& transactionId) { - THttpHeader header("POST", "abort_tx"); - header.AddMutationId(); - header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId)); - RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); + TNode params = NRawClient::SerializeParamsForAbortTransaction(transactionId); + SetMutationId(params, mutationId); + PostAsync("abort_tx", params); } void THttpRawClient::CommitTransaction( @@ -1257,7 +1220,7 @@ IRawClientPtr THttpRawClient::Clone(const TClientContext& context) return ::MakeIntrusive<THttpRawClient>(context); } -void THttpRawClient::InitPingClient() { +void THttpRawClient::InitAsyncClient() { auto httpPoller = NConcurrency::CreateThreadPoolPoller( Context_.Config->AsyncHttpClientThreads, "tx_http_client_poller"); @@ -1265,14 +1228,68 @@ void THttpRawClient::InitPingClient() { if (Context_.UseTLS) { auto httpsClientConfig = NYT::New<NHttps::TClientConfig>(); httpsClientConfig->MaxIdleConnections = 16; - PingHttpClient_ = NHttps::CreateClient(std::move(httpsClientConfig), std::move(httpPoller)); + AsyncHttpClient_ = NHttps::CreateClient(std::move(httpsClientConfig), std::move(httpPoller)); } else { auto httpClientConfig = NYT::New<NHttp::TClientConfig>(); httpClientConfig->MaxIdleConnections = 16; - PingHttpClient_ = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller)); + AsyncHttpClient_ = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller)); } } +void THttpRawClient::PostAsync(const TString& command, TNode params) +{ + auto traceContext = Context_.Config->EnableClientTracing + ? NTracing::CreateTraceContextFromCurrent("ping_tx") + : nullptr; + NTracing::TCurrentTraceContextGuard traceContextGuard(traceContext); + + std::call_once(AsyncHttpClientInitOnceFlag_, [this] () { + InitAsyncClient(); + }); + + auto url = TString::Join(Context_.UseTLS ? "https://" : "http://", Context_.ServerName, "/api/", Context_.Config->ApiVersion, "/", command); + auto headers = New<NHttp::THeaders>(); + auto requestId = CreateGuidAsString(); + + headers->Add("Host", url); + headers->Add("User-Agent", TProcessState::Get()->ClientVersion); + + if (const auto& serviceTicketAuth = Context_.ServiceTicketAuth) { + const auto serviceTicket = serviceTicketAuth->Ptr->IssueServiceTicket(); + headers->Add("X-Ya-Service-Ticket", serviceTicket); + } else if (const auto& token = Context_.Token; !token.empty()) { + headers->Add("Authorization", "OAuth " + token); + } + + headers->Add("Transfer-Encoding", "chunked"); + headers->Add("X-YT-Correlation-Id", requestId); + headers->Add("X-YT-Header-Format", "<format=text>yson"); + headers->Add("Content-Encoding", "identity"); + headers->Add("Accept-Encoding", "identity"); + + if (traceContext) { + auto traceparent = FormatTraceParentHeader(traceContext->GetTraceId(), traceContext->GetSpanId()); + headers->Add("traceparent", traceparent); + } + + auto strParams = NodeToYsonString(params); + + YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; Method POST %v; X-YT-Parameters (sent in body): %v)", + requestId, + Context_.ServerName, + url, + strParams); + + auto response = NConcurrency::WaitFor(AsyncHttpClient_->Post(url, TSharedRef::FromString(strParams), headers)) + .ValueOrThrow(); + CheckError(requestId, response); + + YT_LOG_DEBUG("RSP %v - received response %v bytes. (%v)", + requestId, + response->ReadAll().size(), + strParams); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h index d2a503605ff..363c50b9e21 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.h +++ b/yt/cpp/mapreduce/http_client/raw_client.h @@ -408,12 +408,13 @@ public: IRawClientPtr Clone(const TClientContext& context) override; private: - void InitPingClient(); + void InitAsyncClient(); + void PostAsync(const TString& command, TNode params); private: const TClientContext Context_; - NHttp::IClientPtr PingHttpClient_; - std::once_flag PingClientInitOnceFlag_; + NHttp::IClientPtr AsyncHttpClient_; + std::once_flag AsyncHttpClientInitOnceFlag_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/interface/common.cpp b/yt/cpp/mapreduce/interface/common.cpp index fdb3e66b191..7c54fd65d09 100644 --- a/yt/cpp/mapreduce/interface/common.cpp +++ b/yt/cpp/mapreduce/interface/common.cpp @@ -13,6 +13,10 @@ #include <util/generic/xrange.h> +#include <util/datetime/base.h> + +#include <util/system/getpid.h> + namespace NYT { using ::google::protobuf::Descriptor; @@ -762,6 +766,21 @@ TString ToString(EValueType type) ythrow yexception() << "Invalid value type " << static_cast<int>(type); } +TMutationId GenerateMutationId() +{ + TGUID guid; + + // Some users use `fork()' with yt wrapper + // (actually they use python + multiprocessing) + // and CreateGuid is not resistant to `fork()', so spice it a little bit. + // + // Check IGNIETFERRO-610 + CreateGuid(&guid); + guid.dw[2] = GetPID() ^ MicroSeconds(); + + return guid; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NDetail diff --git a/yt/cpp/mapreduce/interface/common.h b/yt/cpp/mapreduce/interface/common.h index 7f09fe4c004..d0339f7baf8 100644 --- a/yt/cpp/mapreduce/interface/common.h +++ b/yt/cpp/mapreduce/interface/common.h @@ -1402,6 +1402,8 @@ namespace NDetail { // TODO: we should use default GENERATE_ENUM_SERIALIZATION TString ToString(EValueType type); +TMutationId GenerateMutationId(); + } // namespace NDetail /// @endcond |
