summaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
authorermolovd <[email protected]>2026-04-28 10:36:20 +0300
committerermolovd <[email protected]>2026-04-28 11:34:34 +0300
commit693274b40b1a9ebdf2da02f2e87fbc8502105738 (patch)
tree229c575c802e7ae1f9290502efce9b5879fbfea3 /yt/cpp
parent10edf8d9197e9c45458d73cb57a601e0827d159c (diff)
YT-27827: TPingerTransaction use BlockingGet instead of WaitFor in destructor
commit_hash:90bbe36635e0d48c81c153567dcedf28f103efbe
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp2
-rw-r--r--yt/cpp/mapreduce/client/transaction.cpp12
-rw-r--r--yt/cpp/mapreduce/client/transaction.h1
-rw-r--r--yt/cpp/mapreduce/client/transaction_pinger.cpp37
-rw-r--r--yt/cpp/mapreduce/client/transaction_pinger.h8
-rw-r--r--yt/cpp/mapreduce/http/http.cpp12
-rw-r--r--yt/cpp/mapreduce/http_client/raw_client.cpp127
-rw-r--r--yt/cpp/mapreduce/http_client/raw_client.h7
-rw-r--r--yt/cpp/mapreduce/interface/common.cpp19
-rw-r--r--yt/cpp/mapreduce/interface/common.h2
10 files changed, 139 insertions, 88 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 06e27671736..aeed59a7d65 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -1694,7 +1694,7 @@ ITransactionPingerPtr TClient::GetTransactionPinger()
{
auto g = Guard(Lock_);
if (!TransactionPinger_) {
- TransactionPinger_ = CreateTransactionPinger(Context_.Config);
+ TransactionPinger_ = CreateTransactionPinger(Context_.Config, RawClient_);
}
return TransactionPinger_;
}
diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp
index 289c89340e6..c54d01f8136 100644
--- a/yt/cpp/mapreduce/client/transaction.cpp
+++ b/yt/cpp/mapreduce/client/transaction.cpp
@@ -10,6 +10,8 @@
#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <yt/cpp/mapreduce/interface/raw_client.h>
+#include <yt/yt/core/actions/future.h>
+
#include <util/datetime/base.h>
#include <util/generic/scope.h>
@@ -166,11 +168,11 @@ void TPingableTransaction::Stop(EStopAction action)
});
break;
case EStopAction::Abort:
- NDetail::RequestWithRetry<void>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this] (TMutationId& mutationId) {
- RawClient_->AbortTransaction(mutationId, TransactionId_);
- });
+ // Aborting transaction can be called from destructor while unwinding exception stack.
+ // We can't call WaitFor in such conditions (and internally we do it).
+ // So we offload aborting to separate thread.
+ Pinger_->AsyncAbortTransaction(TransactionId_)
+ .BlockingGet();
break;
case EStopAction::Detach:
// Do nothing.
diff --git a/yt/cpp/mapreduce/client/transaction.h b/yt/cpp/mapreduce/client/transaction.h
index d1a5109c3f0..8f939f9d4c4 100644
--- a/yt/cpp/mapreduce/client/transaction.h
+++ b/yt/cpp/mapreduce/client/transaction.h
@@ -49,7 +49,6 @@ public:
void Abort();
void Detach();
-
private:
enum class EStopAction
{
diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp
index d335a997e42..7a659b081ec 100644
--- a/yt/cpp/mapreduce/client/transaction_pinger.cpp
+++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp
@@ -4,6 +4,7 @@
#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
@@ -26,14 +27,15 @@ class TSharedTransactionPinger
: public ITransactionPinger
{
public:
- explicit TSharedTransactionPinger(int poolThreadCount)
- : PingerPool_(NConcurrency::CreateThreadPool(
- poolThreadCount, "tx_pinger_pool"))
+ TSharedTransactionPinger(int poolThreadCount, IRawClientPtr rawClient)
+ : ThreadPool_(NConcurrency::CreateThreadPool(poolThreadCount, "tx_pinger_pool"))
+ , Invoker_(ThreadPool_->GetInvoker())
+ , RawClient_(std::move(rawClient))
{ }
~TSharedTransactionPinger() override
{
- PingerPool_->Shutdown();
+ ThreadPool_->Shutdown();
}
ITransactionPingerPtr GetChildTxPinger() override
@@ -58,7 +60,7 @@ public:
YT_VERIFY(strong_ptr);
DoPingTransaction(pingableTx, *strong_ptr);
});
- *periodic = New<NConcurrency::TPeriodicExecutor>(PingerPool_->GetInvoker(), pingRoutine, opts);
+ *periodic = New<NConcurrency::TPeriodicExecutor>(ThreadPool_->GetInvoker(), pingRoutine, opts);
(*periodic)->Start();
auto guard = Guard(SpinLock_);
@@ -86,12 +88,23 @@ public:
periodic = std::move(it->second);
Transactions_.erase(it);
}
- NConcurrency::WaitUntilSet((*periodic)->Stop());
+ YT_UNUSED_FUTURE((*periodic)->Stop());
+ }
+
+ TFuture<void> AsyncAbortTransaction(const TTransactionId& transactionId) override
+ {
+ auto result = BIND([rawClient = RawClient_, transactionId = transactionId] {
+ TMutationId mutationId;
+ rawClient->AbortTransaction(mutationId, transactionId);
+ })
+ .AsyncVia(ThreadPool_->GetInvoker())
+ .Run();
+
+ return result;
}
private:
- void DoPingTransaction(const TPingableTransaction& pingableTx,
- NConcurrency::TPeriodicExecutorPtr periodic)
+ void DoPingTransaction(const TPingableTransaction& pingableTx, NConcurrency::TPeriodicExecutorPtr periodic)
{
try {
pingableTx.Ping();
@@ -114,16 +127,18 @@ private:
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_;
- NConcurrency::IThreadPoolPtr PingerPool_;
+ NConcurrency::IThreadPoolPtr ThreadPool_;
+ IInvokerPtr Invoker_;
+ IRawClientPtr RawClient_;
};
////////////////////////////////////////////////////////////////////////////////
-ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config)
+ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config, IRawClientPtr rawClient)
{
YT_LOG_DEBUG("Using async transaction pinger");
- return MakeIntrusive<TSharedTransactionPinger>(config->AsyncTxPingerPoolThreads);
+ return MakeIntrusive<TSharedTransactionPinger>(config->AsyncTxPingerPoolThreads, std::move(rawClient));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/transaction_pinger.h b/yt/cpp/mapreduce/client/transaction_pinger.h
index 98e8b5cb2f8..2fa8ec5c9bc 100644
--- a/yt/cpp/mapreduce/client/transaction_pinger.h
+++ b/yt/cpp/mapreduce/client/transaction_pinger.h
@@ -13,6 +13,10 @@ namespace NYT {
class TPingableTransaction;
+// Don't want to include public.h to avoid polluting header with TIntrusivePtr
+template <typename>
+class TFuture;
+
////////////////////////////////////////////////////////////////////////////////
// Each registered transaction must be removed from pinger
@@ -30,9 +34,11 @@ public:
virtual bool HasTransaction(const TPingableTransaction& pingableTx) = 0;
virtual void RemoveTransaction(const TPingableTransaction& pingableTx) = 0;
+
+ virtual TFuture<void> AsyncAbortTransaction(const TTransactionId& transactionId) = 0;
};
-ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config);
+ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config, IRawClientPtr rawClient);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index 44c25079d62..8e8febf44b3 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -210,18 +210,8 @@ void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite
TMutationId THttpHeader::AddMutationId()
{
- TGUID guid;
-
- // Some users use `fork()' with yt wrapper
- // (actually they use python + multiprocessing)
- // and CreateGuid is not resistant to `fork()', so spice it a little bit.
- //
- // Check IGNIETFERRO-610
- CreateGuid(&guid);
- guid.dw[2] = GetPID() ^ MicroSeconds();
-
+ TMutationId guid = NDetail::GenerateMutationId();
AddParameter("mutation_id", GetGuidAsString(guid), true);
-
return guid;
}
diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp
index bf63a7ae3bb..8ef2a349a3e 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/http_client/raw_client.cpp
@@ -65,6 +65,17 @@ void CheckError(const TString& requestId, NHttp::IResponsePtr response)
}
}
+void SetMutationId(TNode& params, TMutationId& mutationId)
+{
+ if (mutationId.IsEmpty()) {
+ params["retry"] = false;
+ mutationId = GenerateMutationId();
+ } else {
+ params["retry"] = true;
+ }
+ params["mutation_id"] = GetGuidAsString(mutationId);
+}
+
} // anonymous namespace
////////////////////////////////////////////////////////////////////////////////
@@ -308,68 +319,20 @@ TTransactionId THttpRawClient::StartTransaction(
void THttpRawClient::PingTransaction(const TTransactionId& transactionId)
{
- auto traceContext = Context_.Config->EnableClientTracing
- ? NTracing::CreateTraceContextFromCurrent("ping_tx")
- : nullptr;
- NTracing::TCurrentTraceContextGuard traceContextGuard(traceContext);
-
- std::call_once(PingClientInitOnceFlag_, [this] () {
- InitPingClient();
- });
-
- auto url = TString::Join(Context_.UseTLS ? "https://" : "http://", Context_.ServerName, "/api/", Context_.Config->ApiVersion, "/ping_tx");
- auto headers = New<NHttp::THeaders>();
- auto requestId = CreateGuidAsString();
-
- headers->Add("Host", url);
- headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
-
- if (const auto& serviceTicketAuth = Context_.ServiceTicketAuth) {
- const auto serviceTicket = serviceTicketAuth->Ptr->IssueServiceTicket();
- headers->Add("X-Ya-Service-Ticket", serviceTicket);
- } else if (const auto& token = Context_.Token; !token.empty()) {
- headers->Add("Authorization", "OAuth " + token);
- }
-
- headers->Add("Transfer-Encoding", "chunked");
- headers->Add("X-YT-Correlation-Id", requestId);
- headers->Add("X-YT-Header-Format", "<format=text>yson");
- headers->Add("Content-Encoding", "identity");
- headers->Add("Accept-Encoding", "identity");
-
- if (traceContext) {
- auto traceparent = FormatTraceParentHeader(traceContext->GetTraceId(), traceContext->GetSpanId());
- headers->Add("traceparent", traceparent);
- }
-
TNode node;
node["transaction_id"] = GetGuidAsString(transactionId);
auto strParams = NodeToYsonString(node);
- YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; Method POST %v; X-YT-Parameters (sent in body): %v)",
- requestId,
- Context_.ServerName,
- url,
- strParams);
-
- auto response = NConcurrency::WaitFor(PingHttpClient_->Post(url, TSharedRef::FromString(strParams), headers))
- .ValueOrThrow();
- CheckError(requestId, response);
-
- YT_LOG_DEBUG("RSP %v - received response %v bytes. (%v)",
- requestId,
- response->ReadAll().size(),
- strParams);
+ PostAsync("ping_tx", node);
}
void THttpRawClient::AbortTransaction(
TMutationId& mutationId,
const TTransactionId& transactionId)
{
- THttpHeader header("POST", "abort_tx");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+ TNode params = NRawClient::SerializeParamsForAbortTransaction(transactionId);
+ SetMutationId(params, mutationId);
+ PostAsync("abort_tx", params);
}
void THttpRawClient::CommitTransaction(
@@ -1257,7 +1220,7 @@ IRawClientPtr THttpRawClient::Clone(const TClientContext& context)
return ::MakeIntrusive<THttpRawClient>(context);
}
-void THttpRawClient::InitPingClient() {
+void THttpRawClient::InitAsyncClient() {
auto httpPoller = NConcurrency::CreateThreadPoolPoller(
Context_.Config->AsyncHttpClientThreads,
"tx_http_client_poller");
@@ -1265,14 +1228,68 @@ void THttpRawClient::InitPingClient() {
if (Context_.UseTLS) {
auto httpsClientConfig = NYT::New<NHttps::TClientConfig>();
httpsClientConfig->MaxIdleConnections = 16;
- PingHttpClient_ = NHttps::CreateClient(std::move(httpsClientConfig), std::move(httpPoller));
+ AsyncHttpClient_ = NHttps::CreateClient(std::move(httpsClientConfig), std::move(httpPoller));
} else {
auto httpClientConfig = NYT::New<NHttp::TClientConfig>();
httpClientConfig->MaxIdleConnections = 16;
- PingHttpClient_ = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller));
+ AsyncHttpClient_ = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller));
}
}
+void THttpRawClient::PostAsync(const TString& command, TNode params)
+{
+ auto traceContext = Context_.Config->EnableClientTracing
+ ? NTracing::CreateTraceContextFromCurrent("ping_tx")
+ : nullptr;
+ NTracing::TCurrentTraceContextGuard traceContextGuard(traceContext);
+
+ std::call_once(AsyncHttpClientInitOnceFlag_, [this] () {
+ InitAsyncClient();
+ });
+
+ auto url = TString::Join(Context_.UseTLS ? "https://" : "http://", Context_.ServerName, "/api/", Context_.Config->ApiVersion, "/", command);
+ auto headers = New<NHttp::THeaders>();
+ auto requestId = CreateGuidAsString();
+
+ headers->Add("Host", url);
+ headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
+
+ if (const auto& serviceTicketAuth = Context_.ServiceTicketAuth) {
+ const auto serviceTicket = serviceTicketAuth->Ptr->IssueServiceTicket();
+ headers->Add("X-Ya-Service-Ticket", serviceTicket);
+ } else if (const auto& token = Context_.Token; !token.empty()) {
+ headers->Add("Authorization", "OAuth " + token);
+ }
+
+ headers->Add("Transfer-Encoding", "chunked");
+ headers->Add("X-YT-Correlation-Id", requestId);
+ headers->Add("X-YT-Header-Format", "<format=text>yson");
+ headers->Add("Content-Encoding", "identity");
+ headers->Add("Accept-Encoding", "identity");
+
+ if (traceContext) {
+ auto traceparent = FormatTraceParentHeader(traceContext->GetTraceId(), traceContext->GetSpanId());
+ headers->Add("traceparent", traceparent);
+ }
+
+ auto strParams = NodeToYsonString(params);
+
+ YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; Method POST %v; X-YT-Parameters (sent in body): %v)",
+ requestId,
+ Context_.ServerName,
+ url,
+ strParams);
+
+ auto response = NConcurrency::WaitFor(AsyncHttpClient_->Post(url, TSharedRef::FromString(strParams), headers))
+ .ValueOrThrow();
+ CheckError(requestId, response);
+
+ YT_LOG_DEBUG("RSP %v - received response %v bytes. (%v)",
+ requestId,
+ response->ReadAll().size(),
+ strParams);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h
index d2a503605ff..363c50b9e21 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.h
+++ b/yt/cpp/mapreduce/http_client/raw_client.h
@@ -408,12 +408,13 @@ public:
IRawClientPtr Clone(const TClientContext& context) override;
private:
- void InitPingClient();
+ void InitAsyncClient();
+ void PostAsync(const TString& command, TNode params);
private:
const TClientContext Context_;
- NHttp::IClientPtr PingHttpClient_;
- std::once_flag PingClientInitOnceFlag_;
+ NHttp::IClientPtr AsyncHttpClient_;
+ std::once_flag AsyncHttpClientInitOnceFlag_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/interface/common.cpp b/yt/cpp/mapreduce/interface/common.cpp
index fdb3e66b191..7c54fd65d09 100644
--- a/yt/cpp/mapreduce/interface/common.cpp
+++ b/yt/cpp/mapreduce/interface/common.cpp
@@ -13,6 +13,10 @@
#include <util/generic/xrange.h>
+#include <util/datetime/base.h>
+
+#include <util/system/getpid.h>
+
namespace NYT {
using ::google::protobuf::Descriptor;
@@ -762,6 +766,21 @@ TString ToString(EValueType type)
ythrow yexception() << "Invalid value type " << static_cast<int>(type);
}
+TMutationId GenerateMutationId()
+{
+ TGUID guid;
+
+ // Some users use `fork()' with yt wrapper
+ // (actually they use python + multiprocessing)
+ // and CreateGuid is not resistant to `fork()', so spice it a little bit.
+ //
+ // Check IGNIETFERRO-610
+ CreateGuid(&guid);
+ guid.dw[2] = GetPID() ^ MicroSeconds();
+
+ return guid;
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NDetail
diff --git a/yt/cpp/mapreduce/interface/common.h b/yt/cpp/mapreduce/interface/common.h
index 7f09fe4c004..d0339f7baf8 100644
--- a/yt/cpp/mapreduce/interface/common.h
+++ b/yt/cpp/mapreduce/interface/common.h
@@ -1402,6 +1402,8 @@ namespace NDetail {
// TODO: we should use default GENERATE_ENUM_SERIALIZATION
TString ToString(EValueType type);
+TMutationId GenerateMutationId();
+
} // namespace NDetail
/// @endcond