diff options
author | Vitaly Isaev <vitalyisaev@ydb.tech> | 2024-12-12 15:39:00 +0000 |
---|---|---|
committer | Vitaly Isaev <vitalyisaev@ydb.tech> | 2024-12-12 15:39:00 +0000 |
commit | 827b115675004838023427572a7c69f40a86a80a (patch) | |
tree | e99c953fe494b9de8d8597a15859d77c81f118c7 /yt/cpp/mapreduce/client/client.cpp | |
parent | 42701242eaf5be980cb935631586d0e90b82641c (diff) | |
parent | fab222fd8176d00eee5ddafc6bce8cb95a6e3ab0 (diff) | |
download | ydb-827b115675004838023427572a7c69f40a86a80a.tar.gz |
Merge branch 'rightlib' into rightlib_20241212
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 37 |
1 files changed, 29 insertions, 8 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 43e3864ae2..a2d302212f 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -42,6 +42,7 @@ #include <yt/cpp/mapreduce/library/table_schema/protobuf.h> +#include <yt/cpp/mapreduce/raw_client/raw_client.h> #include <yt/cpp/mapreduce/raw_client/raw_requests.h> #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h> @@ -90,10 +91,12 @@ void ApplyProxyUrlAliasingRules(TString& url) //////////////////////////////////////////////////////////////////////////////// TClientBase::TClientBase( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& transactionId, IClientRetryPolicyPtr retryPolicy) - : Context_(context) + : RawClient_(std::move(rawClient)) + , Context_(context) , TransactionId_(transactionId) , ClientRetryPolicy_(std::move(retryPolicy)) { } @@ -101,7 +104,7 @@ TClientBase::TClientBase( ITransactionPtr TClientBase::StartTransaction( const TStartTransactionOptions& options) { - return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options); + return MakeIntrusive<TTransaction>(RawClient_, GetParentClientImpl(), Context_, TransactionId_, options); } TNodeId TClientBase::Create( @@ -138,7 +141,11 @@ void TClientBase::Set( const TNode& value, const TSetOptions& options) { - NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &value, &options] (TMutationId& mutationId) { + RawClient_->Set(mutationId, TransactionId_, path, value, options); + }); } void TClientBase::MultisetAttributes( @@ -826,6 +833,11 @@ IClientPtr TClientBase::GetParentClient() return GetParentClientImpl(); } +IRawClientPtr TClientBase::GetRawClient() const +{ + return RawClient_; +} + const TClientContext& TClientBase::GetContext() const { return Context_; @@ -839,11 +851,12 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const //////////////////////////////////////////////////////////////////////////////// TTransaction::TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, const TStartTransactionOptions& options) - : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy()) + : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( MakeHolder<TPingableTransaction>( @@ -858,11 +871,12 @@ TTransaction::TTransaction( } TTransaction::TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, const TAttachTransactionOptions& options) - : TClientBase(context, transactionId, parentClient->GetRetryPolicy()) + : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( new TPingableTransaction( @@ -928,10 +942,11 @@ TClientPtr TTransaction::GetParentClientImpl() //////////////////////////////////////////////////////////////////////////////// TClient::TClient( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& globalId, IClientRetryPolicyPtr retryPolicy) - : TClientBase(context, globalId, retryPolicy) + : TClientBase(std::move(rawClient), context, globalId, retryPolicy) , TransactionPinger_(nullptr) { } @@ -943,7 +958,7 @@ ITransactionPtr TClient::AttachTransaction( { CheckShutdown(); - return MakeIntrusive<TTransaction>(this, Context_, transactionId, options); + return MakeIntrusive<TTransaction>(RawClient_, this, Context_, transactionId, options); } void TClient::MountTable( @@ -1435,9 +1450,15 @@ TClientPtr CreateClientImpl( retryConfigProvider = CreateDefaultRetryConfigProvider(); } + auto rawClient = MakeIntrusive<THttpRawClient>(context); + EnsureInitialized(); - return new TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config)); + return new TClient( + std::move(rawClient), + context, + globalTxId, + CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config)); } //////////////////////////////////////////////////////////////////////////////// |