From 09c88b035d29fac5fd49de2fbc3c71e2d2a80754 Mon Sep 17 00:00:00 2001 From: hiddenpath Date: Fri, 13 Dec 2024 15:22:36 +0300 Subject: yt/cpp/mapreduce: move Get, TryGet, Exists, MultisetAttributes to THttpRawClient commit_hash:bd2228f98fa92de408ca850f9bc1608fdf99e7f5 --- yt/cpp/mapreduce/client/client.cpp | 60 +++++++++++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 13 deletions(-) (limited to 'yt/cpp/mapreduce/client/client.cpp') diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index a2d302212f1..24aed7fb77f 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -126,14 +126,22 @@ bool TClientBase::Exists( const TYPath& path, const TExistsOptions& options) { - return NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options); + return RequestWithRetry( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &options] (TMutationId& mutationId) { + return RawClient_->Exists(mutationId, TransactionId_, path, options); + }); } TNode TClientBase::Get( const TYPath& path, const TGetOptions& options) { - return NRawClient::Get(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options); + return RequestWithRetry( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &options] (TMutationId& mutationId) { + return RawClient_->Get(mutationId, TransactionId_, path, options); + }); } void TClientBase::Set( @@ -149,12 +157,17 @@ void TClientBase::Set( } void TClientBase::MultisetAttributes( - const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options) + const TYPath& path, + const TNode::TMapType& value, + const TMultisetAttributesOptions& options) { - NRawClient::MultisetAttributes(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options); + RequestWithRetry( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &value, &options] (TMutationId& mutationId) { + RawClient_->MultisetAttributes(mutationId, TransactionId_, path, value, options); + }); } - TNode::TListType TClientBase::List( const TYPath& path, const TListOptions& options) @@ -290,6 +303,7 @@ IFileReaderPtr TClientBase::CreateBlobTableReader( return new TBlobTableReader( path, key, + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -303,6 +317,7 @@ IFileReaderPtr TClientBase::CreateFileReader( { return new TFileReader( CanonizeYPath(path), + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -315,11 +330,18 @@ IFileWriterPtr TClientBase::CreateFileWriter( const TFileWriterOptions& options) { auto realPath = CanonizeYPath(path); - if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) { + + auto exists = RequestWithRetry( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &realPath] (TMutationId& mutationId) { + return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_); + }); + if (!exists) { NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE, TCreateOptions().IgnoreExisting(true)); } - return new TFileWriter(realPath, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options); + + return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options); } TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter( @@ -343,6 +365,7 @@ TRawTableWriterPtr TClientBase::CreateRawWriter( const TTableWriterOptions& options) { return ::MakeIntrusive( + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -673,7 +696,7 @@ void TClientBase::CompleteOperation(const TOperationId& operationId) void TClientBase::WaitForOperation(const TOperationId& operationId) { - NYT::NDetail::WaitForOperation(ClientRetryPolicy_, Context_, operationId); + NYT::NDetail::WaitForOperation(ClientRetryPolicy_, RawClient_, Context_, operationId); } void TClientBase::AlterTable( @@ -691,6 +714,7 @@ void TClientBase::AlterTable( { return ::MakeIntrusive( CanonizeYPath(path), + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -706,12 +730,20 @@ THolder TClientBase::CreateClientWriter( const TTableWriterOptions& options) { auto realPath = CanonizeYPath(path); - if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) { + + auto exists = RequestWithRetry( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &realPath] (TMutationId& mutationId) { + return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_); + }); + if (!exists) { NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE, TCreateOptions().IgnoreExisting(true)); } + return MakeHolder( realPath, + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -851,15 +883,16 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const //////////////////////////////////////////////////////////////////////////////// TTransaction::TTransaction( - IRawClientPtr rawClient, + const IRawClientPtr& rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, const TStartTransactionOptions& options) - : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy()) + : TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( MakeHolder( + rawClient, parentClient->GetRetryPolicy(), context, parentTransactionId, @@ -871,15 +904,16 @@ TTransaction::TTransaction( } TTransaction::TTransaction( - IRawClientPtr rawClient, + const IRawClientPtr& rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, const TAttachTransactionOptions& options) - : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy()) + : TClientBase(rawClient, context, transactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( new TPingableTransaction( + rawClient, parentClient->GetRetryPolicy(), context, transactionId, -- cgit v1.3