diff options
author | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-12-13 10:22:13 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-13 10:22:13 +0000 |
commit | e73e490feb4e1f63d097697324aa48b643a62317 (patch) | |
tree | f63fe3d15819a5148ade51609c5211251d93e425 /yt/cpp/mapreduce/client | |
parent | 19346460a8060a0ed4731edb192745642ff34b3d (diff) | |
parent | 4dde77404d1eae4a633d1cc3807142409a9938eb (diff) | |
download | ydb-e73e490feb4e1f63d097697324aa48b643a62317.tar.gz |
Merge pull request #12582 from vitalyisaev2/rightlib_20241212
Merge rightlib 20241212
Diffstat (limited to 'yt/cpp/mapreduce/client')
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 37 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client.h | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.h | 1 |
4 files changed, 43 insertions, 9 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 43e3864ae2..a2d302212f 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -42,6 +42,7 @@ #include <yt/cpp/mapreduce/library/table_schema/protobuf.h> +#include <yt/cpp/mapreduce/raw_client/raw_client.h> #include <yt/cpp/mapreduce/raw_client/raw_requests.h> #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h> @@ -90,10 +91,12 @@ void ApplyProxyUrlAliasingRules(TString& url) //////////////////////////////////////////////////////////////////////////////// TClientBase::TClientBase( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& transactionId, IClientRetryPolicyPtr retryPolicy) - : Context_(context) + : RawClient_(std::move(rawClient)) + , Context_(context) , TransactionId_(transactionId) , ClientRetryPolicy_(std::move(retryPolicy)) { } @@ -101,7 +104,7 @@ TClientBase::TClientBase( ITransactionPtr TClientBase::StartTransaction( const TStartTransactionOptions& options) { - return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options); + return MakeIntrusive<TTransaction>(RawClient_, GetParentClientImpl(), Context_, TransactionId_, options); } TNodeId TClientBase::Create( @@ -138,7 +141,11 @@ void TClientBase::Set( const TNode& value, const TSetOptions& options) { - NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &value, &options] (TMutationId& mutationId) { + RawClient_->Set(mutationId, TransactionId_, path, value, options); + }); } void TClientBase::MultisetAttributes( @@ -826,6 +833,11 @@ IClientPtr TClientBase::GetParentClient() return GetParentClientImpl(); } +IRawClientPtr TClientBase::GetRawClient() const +{ + return RawClient_; +} + const TClientContext& TClientBase::GetContext() const { return Context_; @@ -839,11 +851,12 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const //////////////////////////////////////////////////////////////////////////////// TTransaction::TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, const TStartTransactionOptions& options) - : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy()) + : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( MakeHolder<TPingableTransaction>( @@ -858,11 +871,12 @@ TTransaction::TTransaction( } TTransaction::TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, const TAttachTransactionOptions& options) - : TClientBase(context, transactionId, parentClient->GetRetryPolicy()) + : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( new TPingableTransaction( @@ -928,10 +942,11 @@ TClientPtr TTransaction::GetParentClientImpl() //////////////////////////////////////////////////////////////////////////////// TClient::TClient( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& globalId, IClientRetryPolicyPtr retryPolicy) - : TClientBase(context, globalId, retryPolicy) + : TClientBase(std::move(rawClient), context, globalId, retryPolicy) , TransactionPinger_(nullptr) { } @@ -943,7 +958,7 @@ ITransactionPtr TClient::AttachTransaction( { CheckShutdown(); - return MakeIntrusive<TTransaction>(this, Context_, transactionId, options); + return MakeIntrusive<TTransaction>(RawClient_, this, Context_, transactionId, options); } void TClient::MountTable( @@ -1435,9 +1450,15 @@ TClientPtr CreateClientImpl( retryConfigProvider = CreateDefaultRetryConfigProvider(); } + auto rawClient = MakeIntrusive<THttpRawClient>(context); + EnsureInitialized(); - return new TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config)); + return new TClient( + std::move(rawClient), + context, + globalTxId, + CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h index 32c458316d..2a0398d056 100644 --- a/yt/cpp/mapreduce/client/client.h +++ b/yt/cpp/mapreduce/client/client.h @@ -5,6 +5,7 @@ #include "transaction_pinger.h" #include <yt/cpp/mapreduce/interface/client.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/http/requests.h> @@ -29,6 +30,7 @@ class TClientBase { public: TClientBase( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& transactionId, IClientRetryPolicyPtr retryPolicy); @@ -222,6 +224,8 @@ public: IClientPtr GetParentClient() override; + IRawClientPtr GetRawClient() const; + const TClientContext& GetContext() const; const IClientRetryPolicyPtr& GetRetryPolicy() const; @@ -232,6 +236,7 @@ protected: virtual TClientPtr GetParentClientImpl() = 0; protected: + const IRawClientPtr RawClient_; const TClientContext Context_; TTransactionId TransactionId_; IClientRetryPolicyPtr ClientRetryPolicy_; @@ -287,6 +292,7 @@ public: // // Start a new transaction. TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, @@ -295,6 +301,7 @@ public: // // Attach an existing transaction. TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, @@ -325,6 +332,7 @@ protected: TClientPtr GetParentClientImpl() override; private: + const IRawClientPtr RawClient_; ITransactionPingerPtr TransactionPinger_; THolder<TPingableTransaction> PingableTx_; TClientPtr ParentClient_; @@ -338,6 +346,7 @@ class TClient { public: TClient( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& globalId, IClientRetryPolicyPtr retryPolicy); diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index 07d00e88d6..a6d424c5a1 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -17,6 +17,7 @@ #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h> #include <yt/cpp/mapreduce/interface/error_codes.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> @@ -395,7 +396,8 @@ TJobPreparer::TJobPreparer( size_t outputTableCount, const TVector<TSmallJobFile>& smallFileList, const TOperationOptions& options) - : OperationPreparer_(operationPreparer) + : RawClient_(operationPreparer.GetClient()->GetRawClient()) + , OperationPreparer_(operationPreparer) , Spec_(spec) , Options_(options) , Layers_(spec.Layers_) @@ -631,6 +633,7 @@ TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& it CreateFileInCypress(cypressPath); auto uploadTx = MakeIntrusive<TTransaction>( + OperationPreparer_.GetClient()->GetRawClient(), OperationPreparer_.GetClient(), OperationPreparer_.GetContext(), TTransactionId(), diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h index 54c978c0fb..41594eb52e 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.h +++ b/yt/cpp/mapreduce/client/operation_preparer.h @@ -83,6 +83,7 @@ public: bool ShouldRedirectStdoutToStderr() const; private: + const IRawClientPtr RawClient_; TOperationPreparer& OperationPreparer_; TUserJobSpec Spec_; TOperationOptions Options_; |