diff options
Diffstat (limited to 'yt/cpp')
32 files changed, 376 insertions, 185 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index a2d302212f1..24aed7fb77f 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -126,14 +126,22 @@ bool TClientBase::Exists( const TYPath& path, const TExistsOptions& options) { - return NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options); + return RequestWithRetry<bool>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &options] (TMutationId& mutationId) { + return RawClient_->Exists(mutationId, TransactionId_, path, options); + }); } TNode TClientBase::Get( const TYPath& path, const TGetOptions& options) { - return NRawClient::Get(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options); + return RequestWithRetry<TNode>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &options] (TMutationId& mutationId) { + return RawClient_->Get(mutationId, TransactionId_, path, options); + }); } void TClientBase::Set( @@ -149,12 +157,17 @@ void TClientBase::Set( } void TClientBase::MultisetAttributes( - const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options) + const TYPath& path, + const TNode::TMapType& value, + const TMultisetAttributesOptions& options) { - NRawClient::MultisetAttributes(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &value, &options] (TMutationId& mutationId) { + RawClient_->MultisetAttributes(mutationId, TransactionId_, path, value, options); + }); } - TNode::TListType TClientBase::List( const TYPath& path, const TListOptions& options) @@ -290,6 +303,7 @@ IFileReaderPtr TClientBase::CreateBlobTableReader( return new TBlobTableReader( path, key, + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -303,6 +317,7 @@ IFileReaderPtr TClientBase::CreateFileReader( { return new TFileReader( CanonizeYPath(path), + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -315,11 +330,18 @@ IFileWriterPtr TClientBase::CreateFileWriter( const TFileWriterOptions& options) { auto realPath = CanonizeYPath(path); - if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) { + + auto exists = RequestWithRetry<bool>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &realPath] (TMutationId& mutationId) { + return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_); + }); + if (!exists) { NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE, TCreateOptions().IgnoreExisting(true)); } - return new TFileWriter(realPath, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options); + + return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options); } TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter( @@ -343,6 +365,7 @@ TRawTableWriterPtr TClientBase::CreateRawWriter( const TTableWriterOptions& options) { return ::MakeIntrusive<TRetryfulWriter>( + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -673,7 +696,7 @@ void TClientBase::CompleteOperation(const TOperationId& operationId) void TClientBase::WaitForOperation(const TOperationId& operationId) { - NYT::NDetail::WaitForOperation(ClientRetryPolicy_, Context_, operationId); + NYT::NDetail::WaitForOperation(ClientRetryPolicy_, RawClient_, Context_, operationId); } void TClientBase::AlterTable( @@ -691,6 +714,7 @@ void TClientBase::AlterTable( { return ::MakeIntrusive<TClientReader>( CanonizeYPath(path), + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -706,12 +730,20 @@ THolder<TClientWriter> TClientBase::CreateClientWriter( const TTableWriterOptions& options) { auto realPath = CanonizeYPath(path); - if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) { + + auto exists = RequestWithRetry<bool>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &realPath] (TMutationId& mutationId) { + return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_); + }); + if (!exists) { NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE, TCreateOptions().IgnoreExisting(true)); } + return MakeHolder<TClientWriter>( realPath, + RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, @@ -851,15 +883,16 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const //////////////////////////////////////////////////////////////////////////////// TTransaction::TTransaction( - IRawClientPtr rawClient, + const IRawClientPtr& rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, const TStartTransactionOptions& options) - : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy()) + : TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( MakeHolder<TPingableTransaction>( + rawClient, parentClient->GetRetryPolicy(), context, parentTransactionId, @@ -871,15 +904,16 @@ TTransaction::TTransaction( } TTransaction::TTransaction( - IRawClientPtr rawClient, + const IRawClientPtr& rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, const TAttachTransactionOptions& options) - : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy()) + : TClientBase(rawClient, context, transactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( new TPingableTransaction( + rawClient, parentClient->GetRetryPolicy(), context, transactionId, diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h index 8a29997303b..a3ba768975c 100644 --- a/yt/cpp/mapreduce/client/client.h +++ b/yt/cpp/mapreduce/client/client.h @@ -292,7 +292,7 @@ public: // // Start a new transaction. TTransaction( - IRawClientPtr rawClient, + const IRawClientPtr& rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, @@ -301,7 +301,7 @@ public: // // Attach an existing transaction. TTransaction( - IRawClientPtr rawClient, + const IRawClientPtr& rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 0efa06ed3af..07f1ef0ad6b 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -8,6 +8,10 @@ #include <yt/cpp/mapreduce/common/retry_lib.h> #include <yt/cpp/mapreduce/common/wait_proxy.h> +#include <yt/cpp/mapreduce/http/helpers.h> +#include <yt/cpp/mapreduce/http/requests.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/tvm.h> @@ -16,10 +20,7 @@ #include <yt/cpp/mapreduce/io/helpers.h> #include <yt/cpp/mapreduce/io/yamr_table_reader.h> -#include <yt/cpp/mapreduce/http/helpers.h> -#include <yt/cpp/mapreduce/http/requests.h> -#include <yt/cpp/mapreduce/http/retry_request.h> - +#include <yt/cpp/mapreduce/raw_client/raw_client.h> #include <yt/cpp/mapreduce/raw_client/raw_requests.h> #include <library/cpp/yson/node/serialize.h> @@ -38,6 +39,7 @@ using ::ToString; TClientReader::TClientReader( const TRichYPath& path, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, @@ -46,6 +48,7 @@ TClientReader::TClientReader( const TTableReaderOptions& options, bool useFormatFromTableAttributes) : Path_(path) + , RawClient_(rawClient) , ClientRetryPolicy_(std::move(clientRetryPolicy)) , Context_(context) , ParentTransactionId_(transactionId) @@ -56,12 +59,14 @@ TClientReader::TClientReader( if (options.CreateTransaction_) { Y_ABORT_UNLESS(transactionPinger, "Internal error: transactionPinger is null"); ReadTransaction_ = MakeHolder<TPingableTransaction>( + RawClient_, ClientRetryPolicy_, Context_, transactionId, transactionPinger->GetChildTxPinger(), TStartTransactionOptions()); Path_.Path(Snapshot( + RawClient_, ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), @@ -70,7 +75,7 @@ TClientReader::TClientReader( if (useFormatFromTableAttributes) { auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_; - auto newFormat = GetTableFormat(ClientRetryPolicy_, Context_, transactionId2, Path_); + auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, transactionId2, Path_); if (newFormat) { Format_->Config = *newFormat; } diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h index 782edb77b7a..61bc6983405 100644 --- a/yt/cpp/mapreduce/client/client_reader.h +++ b/yt/cpp/mapreduce/client/client_reader.h @@ -21,6 +21,7 @@ class TClientReader public: TClientReader( const TRichYPath& path, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, @@ -43,8 +44,11 @@ protected: private: TRichYPath Path_; + + const IRawClientPtr RawClient_; const IClientRetryPolicyPtr ClientRetryPolicy_; const TClientContext Context_; + TTransactionId ParentTransactionId_; TMaybe<TFormat> Format_; TTableReaderOptions Options_; diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp index ee14ee0eb2c..84e0802ee23 100644 --- a/yt/cpp/mapreduce/client/client_writer.cpp +++ b/yt/cpp/mapreduce/client/client_writer.cpp @@ -1,19 +1,23 @@ #include "client_writer.h" #include "retryful_writer.h" -#include "retryless_writer.h" #include "retryful_writer_v2.h" +#include "retryless_writer.h" -#include <yt/cpp/mapreduce/interface/io.h> #include <yt/cpp/mapreduce/common/fwd.h> #include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/interface/io.h> + +#include <yt/cpp/mapreduce/raw_client/raw_client.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// TClientWriter::TClientWriter( const TRichYPath& path, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, @@ -38,6 +42,7 @@ TClientWriter::TClientWriter( auto serializedWriterOptions = FormIORequestParameters(options); RawWriter_ = MakeIntrusive<NPrivate::TRetryfulWriterV2>( + rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, @@ -50,6 +55,7 @@ TClientWriter::TClientWriter( options.CreateTransaction_); } else { RawWriter_.Reset(new TRetryfulWriter( + rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, diff --git a/yt/cpp/mapreduce/client/client_writer.h b/yt/cpp/mapreduce/client/client_writer.h index 19a49778172..b3aec26a59c 100644 --- a/yt/cpp/mapreduce/client/client_writer.h +++ b/yt/cpp/mapreduce/client/client_writer.h @@ -17,6 +17,7 @@ class TClientWriter public: TClientWriter( const TRichYPath& path, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp index 57131e5330b..73d5de21f11 100644 --- a/yt/cpp/mapreduce/client/file_reader.cpp +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -19,6 +19,7 @@ #include <yt/cpp/mapreduce/http/http_client.h> #include <yt/cpp/mapreduce/http/retry_request.h> +#include <yt/cpp/mapreduce/raw_client/raw_client.h> #include <yt/cpp/mapreduce/raw_client/raw_requests.h> namespace NYT { @@ -39,13 +40,16 @@ static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) { //////////////////////////////////////////////////////////////////////////////// TStreamReaderBase::TStreamReaderBase( + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& transactionId) - : Context_(context) + : RawClient_(rawClient) + , Context_(context) , ClientRetryPolicy_(std::move(clientRetryPolicy)) , ReadTransaction_(MakeHolder<TPingableTransaction>( + RawClient_, ClientRetryPolicy_, context, transactionId, @@ -57,7 +61,7 @@ TStreamReaderBase::~TStreamReaderBase() = default; TYPath TStreamReaderBase::Snapshot(const TYPath& path) { - return NYT::Snapshot(ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path); + return NYT::Snapshot(RawClient_, ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path); } TString TStreamReaderBase::GetActiveRequestId() const @@ -119,12 +123,13 @@ size_t TStreamReaderBase::DoRead(void* buf, size_t len) TFileReader::TFileReader( const TRichYPath& path, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& transactionId, const TFileReaderOptions& options) - : TStreamReaderBase(std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId) + : TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId) , FileReaderOptions_(options) , Path_(path) , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0)) @@ -183,12 +188,13 @@ NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context TBlobTableReader::TBlobTableReader( const TYPath& path, const TKey& key, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr retryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& transactionId, const TBlobTableReaderOptions& options) - : TStreamReaderBase(std::move(retryPolicy), std::move(transactionPinger), context, transactionId) + : TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId) , Key_(key) , Options_(options) { diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h index d850008a312..48248696d34 100644 --- a/yt/cpp/mapreduce/client/file_reader.h +++ b/yt/cpp/mapreduce/client/file_reader.h @@ -22,6 +22,7 @@ class TStreamReaderBase { public: TStreamReaderBase( + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, @@ -33,6 +34,7 @@ protected: TYPath Snapshot(const TYPath& path); protected: + const IRawClientPtr RawClient_; const TClientContext Context_; private: @@ -60,6 +62,7 @@ class TFileReader public: TFileReader( const TRichYPath& path, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, @@ -86,6 +89,7 @@ public: TBlobTableReader( const TYPath& path, const TKey& key, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, diff --git a/yt/cpp/mapreduce/client/file_writer.cpp b/yt/cpp/mapreduce/client/file_writer.cpp index d273731e1d6..fcbb02bec92 100644 --- a/yt/cpp/mapreduce/client/file_writer.cpp +++ b/yt/cpp/mapreduce/client/file_writer.cpp @@ -1,9 +1,12 @@ #include "file_writer.h" -#include <yt/cpp/mapreduce/io/helpers.h> +#include <yt/cpp/mapreduce/common/helpers.h> + #include <yt/cpp/mapreduce/interface/finish_or_die.h> -#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/io/helpers.h> + +#include <yt/cpp/mapreduce/raw_client/raw_client.h> namespace NYT { @@ -11,6 +14,7 @@ namespace NYT { TFileWriter::TFileWriter( const TRichYPath& path, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, @@ -18,6 +22,7 @@ TFileWriter::TFileWriter( const TFileWriterOptions& options) : AutoFinish_(options.AutoFinish_) , RetryfulWriter_( + rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, diff --git a/yt/cpp/mapreduce/client/file_writer.h b/yt/cpp/mapreduce/client/file_writer.h index ae143a75bee..67246bfe25c 100644 --- a/yt/cpp/mapreduce/client/file_writer.h +++ b/yt/cpp/mapreduce/client/file_writer.h @@ -16,6 +16,7 @@ class TFileWriter public: TFileWriter( const TRichYPath& path, + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 005bb507fa5..5bb67e3059f 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -319,6 +319,7 @@ TSimpleOperationIo CreateSimpleOperationIo( TOperationPreparationContext( inputs, outputs, + preparer.GetClient()->GetRawClient(), preparer.GetContext(), preparer.GetClientRetryPolicy(), preparer.GetTransactionId()), @@ -491,6 +492,7 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( TOperationPreparationContext( structuredInputs, structuredOutputs, + preparer.GetClient()->GetRawClient(), preparer.GetContext(), preparer.GetClientRetryPolicy(), preparer.GetTransactionId()), @@ -499,7 +501,12 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( hints); TVector<TSmallJobFile> formatConfigList; - TFormatBuilder formatBuilder(preparer.GetClientRetryPolicy(), preparer.GetContext(), preparer.GetTransactionId(), options); + TFormatBuilder formatBuilder( + preparer.GetClient()->GetRawClient(), + preparer.GetClientRetryPolicy(), + preparer.GetContext(), + preparer.GetTransactionId(), + options); auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat( structuredJob, @@ -587,11 +594,12 @@ EOperationBriefState CheckOperation( void WaitForOperation( const IClientRetryPolicyPtr& clientRetryPolicy, + const IRawClientPtr& rawClient, const TClientContext& context, const TOperationId& operationId) { const TDuration checkOperationStateInterval = - UseLocalModeOptimization(context, clientRetryPolicy) + UseLocalModeOptimization(rawClient, context, clientRetryPolicy) ? Min(TDuration::MilliSeconds(100), context.Config->OperationTrackerPollPeriod) : context.Config->OperationTrackerPollPeriod; @@ -1020,13 +1028,16 @@ void CheckInputTablesExist( Y_ENSURE(!paths.empty(), "Input tables are not set"); for (auto& path : paths) { auto curTransactionId = path.TransactionId_.GetOrElse(preparer.GetTransactionId()); + auto exists = RequestWithRetry<bool>( + preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), + [&preparer, &curTransactionId, &path] (TMutationId& mutationId) { + return preparer.GetClient()->GetRawClient()->Exists( + mutationId, + curTransactionId, + path.Path_); + }); Y_ENSURE_EX( - path.Cluster_.Defined() || - Exists( - preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), - preparer.GetContext(), - curTransactionId, - path.Path_), + path.Cluster_.Defined() || exists, TApiUsageError() << "Input table '" << path.Path_ << "' doesn't exist"); } } @@ -1633,6 +1644,7 @@ void ExecuteMapReduce( VerifyHasElements(structuredInputs, "inputs"); TFormatBuilder formatBuilder( + preparer->GetClient()->GetRawClient(), preparer->GetClientRetryPolicy(), preparer->GetContext(), preparer->GetTransactionId(), @@ -1657,6 +1669,7 @@ void ExecuteMapReduce( TOperationPreparationContext( structuredInputs, mapperOutput, + preparer->GetClient()->GetRawClient(), preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), @@ -1726,6 +1739,7 @@ void ExecuteMapReduce( TOperationPreparationContext( inputs, outputs, + preparer->GetClient()->GetRawClient(), preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), @@ -1791,6 +1805,7 @@ void ExecuteMapReduce( TOperationPreparationContext( structuredInputs, structuredOutputs, + preparer->GetClient()->GetRawClient(), preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), diff --git a/yt/cpp/mapreduce/client/operation.h b/yt/cpp/mapreduce/client/operation.h index 141161b0b72..f866c739361 100644 --- a/yt/cpp/mapreduce/client/operation.h +++ b/yt/cpp/mapreduce/client/operation.h @@ -185,6 +185,7 @@ EOperationBriefState CheckOperation( void WaitForOperation( const IClientRetryPolicyPtr& clientRetryPolicy, + const IRawClientPtr& rawClient, const TClientContext& context, const TOperationId& operationId); diff --git a/yt/cpp/mapreduce/client/operation_helpers.cpp b/yt/cpp/mapreduce/client/operation_helpers.cpp index abb21856622..f307efaffea 100644 --- a/yt/cpp/mapreduce/client/operation_helpers.cpp +++ b/yt/cpp/mapreduce/client/operation_helpers.cpp @@ -2,15 +2,17 @@ #include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/http/requests.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + #include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/cpp/mapreduce/raw_client/raw_requests.h> -#include <yt/cpp/mapreduce/http/context.h> -#include <yt/cpp/mapreduce/http/requests.h> - #include <util/string/builder.h> #include <util/system/mutex.h> @@ -26,7 +28,10 @@ ui64 RoundUpFileSize(ui64 size) return (size + roundUpTo - 1) & ~(roundUpTo - 1); } -bool UseLocalModeOptimization(const TClientContext& context, const IClientRetryPolicyPtr& clientRetryPolicy) +bool UseLocalModeOptimization( + const IRawClientPtr& rawClient, + const TClientContext& context, + const IClientRetryPolicyPtr& clientRetryPolicy) { if (!context.Config->EnableLocalModeOptimization) { return false; @@ -47,18 +52,26 @@ bool UseLocalModeOptimization(const TClientContext& context, const IClientRetryP TString localModeAttr("//sys/@local_mode_fqdn"); // We don't want to pollute logs with errors about failed request, // so we check if path exists before getting it. - if (NRawClient::Exists(clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - TTransactionId(), - localModeAttr, - TExistsOptions().ReadFrom(EMasterReadKind::Cache))) + auto exists = RequestWithRetry<bool>( + clientRetryPolicy->CreatePolicyForGenericRequest(), + [&rawClient, &localModeAttr] (TMutationId& mutationId) { + return rawClient->Exists( + mutationId, + TTransactionId(), + localModeAttr, + TExistsOptions().ReadFrom(EMasterReadKind::Cache)); + }); + if (exists) { - auto fqdnNode = NRawClient::TryGet( + auto fqdnNode = RequestWithRetry<TNode>( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - TTransactionId(), - localModeAttr, - TGetOptions().ReadFrom(EMasterReadKind::Cache)); + [&rawClient, &localModeAttr] (TMutationId& mutationId) { + return rawClient->TryGet( + mutationId, + TTransactionId(), + localModeAttr, + TGetOptions().ReadFrom(EMasterReadKind::Cache)); + }); if (!fqdnNode.IsUndefined()) { auto fqdn = fqdnNode.AsString(); isLocalMode = (fqdn == TProcessState::Get()->FqdnHostName); diff --git a/yt/cpp/mapreduce/client/operation_helpers.h b/yt/cpp/mapreduce/client/operation_helpers.h index 7fd2ffb0de7..f7d25300b1a 100644 --- a/yt/cpp/mapreduce/client/operation_helpers.h +++ b/yt/cpp/mapreduce/client/operation_helpers.h @@ -11,7 +11,10 @@ namespace NYT::NDetail { ui64 RoundUpFileSize(ui64 size); -bool UseLocalModeOptimization(const TClientContext& context, const IClientRetryPolicyPtr& clientRetryPolicy); +bool UseLocalModeOptimization( + const IRawClientPtr& rawClient, + const TClientContext& context, + const IClientRetryPolicyPtr& clientRetryPolicy); TString GetOperationWebInterfaceUrl(TStringBuf serverName, TOperationId operationId); diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index a6d424c5a1d..eb7264425cf 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -142,6 +142,7 @@ TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transac : Client_(std::move(client)) , TransactionId_(transactionId) , FileTransaction_(MakeHolder<TPingableTransaction>( + Client_->GetRawClient(), Client_->GetRetryPolicy(), Client_->GetContext(), TransactionId_, @@ -609,6 +610,7 @@ TString TJobPreparer::UploadToRandomPath(const IItemToUpload& itemToUpload) cons { TFileWriter writer( uniquePath, + OperationPreparer_.GetClient()->GetRawClient(), OperationPreparer_.GetClientRetryPolicy(), OperationPreparer_.GetClient()->GetTransactionPinger(), OperationPreparer_.GetContext(), @@ -746,23 +748,29 @@ TString TJobPreparer::UploadToCache(const IItemToUpload& itemToUpload) const void TJobPreparer::UseFileInCypress(const TRichYPath& file) { - if (!Exists( + auto exists = RequestWithRetry<bool>( OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), - OperationPreparer_.GetContext(), - file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()), - file.Path_)) + [this, &file] (TMutationId& mutationId) { + return RawClient_->Exists( + mutationId, + file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()), + file.Path_); + }); + if (!exists) { ythrow yexception() << "File " << file.Path_ << " does not exist"; } if (ShouldMountSandbox()) { - auto size = Get( + auto size = RequestWithRetry<i64>( OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), - OperationPreparer_.GetContext(), - file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()), - file.Path_ + "/@uncompressed_data_size") - .AsInt64(); - + [this, &file] (TMutationId& mutationId) { + return RawClient_->Get( + mutationId, + file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()), + file.Path_ + "/@uncompressed_data_size") + .AsInt64(); + }); TotalFileSize_ += RoundUpFileSize(static_cast<ui64>(size)); } CypressFiles_.push_back(file); @@ -835,7 +843,10 @@ void TJobPreparer::UploadSmallFile(const TSmallJobFile& smallFile) bool TJobPreparer::IsLocalMode() const { - return UseLocalModeOptimization(OperationPreparer_.GetContext(), OperationPreparer_.GetClientRetryPolicy()); + return UseLocalModeOptimization( + OperationPreparer_.GetClient()->GetRawClient(), + OperationPreparer_.GetContext(), + OperationPreparer_.GetClientRetryPolicy()); } void TJobPreparer::PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState) diff --git a/yt/cpp/mapreduce/client/prepare_operation.cpp b/yt/cpp/mapreduce/client/prepare_operation.cpp index cd775f53577..f78d15f3e7e 100644 --- a/yt/cpp/mapreduce/client/prepare_operation.cpp +++ b/yt/cpp/mapreduce/client/prepare_operation.cpp @@ -2,6 +2,9 @@ #include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/interface/serialize.h> #include <yt/cpp/mapreduce/raw_client/raw_requests.h> @@ -16,10 +19,12 @@ namespace NYT::NDetail { TOperationPreparationContext::TOperationPreparationContext( const TStructuredJobTableList& structuredInputs, const TStructuredJobTableList& structuredOutputs, + const IRawClientPtr& rawClient, const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId) - : Context_(context) + : RawClient_(rawClient) + , Context_(context) , RetryPolicy_(retryPolicy) , TransactionId_(transactionId) , InputSchemas_(structuredInputs.size()) @@ -38,10 +43,12 @@ TOperationPreparationContext::TOperationPreparationContext( TOperationPreparationContext::TOperationPreparationContext( TVector<TRichYPath> inputs, TVector<TRichYPath> outputs, + const IRawClientPtr& rawClient, const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId) - : Context_(context) + : RawClient_(rawClient) + , Context_(context) , RetryPolicy_(retryPolicy) , TransactionId_(transactionId) , InputSchemas_(inputs.size()) @@ -99,11 +106,11 @@ const TTableSchema& TOperationPreparationContext::GetInputSchema(int index) cons auto& schema = InputSchemas_[index]; if (!InputSchemasLoaded_[index]) { Y_ABORT_UNLESS(Inputs_[index]); - auto schemaNode = NRawClient::Get( + auto schemaNode = RequestWithRetry<TNode>( RetryPolicy_->CreatePolicyForGenericRequest(), - Context_, - TransactionId_, - Inputs_[index]->Path_ + "/@schema"); + [this, &index] (TMutationId& mutationId) { + return RawClient_->Get(mutationId, TransactionId_, Inputs_[index]->Path_ + "/@schema"); + }); Deserialize(schema, schemaNode); } return schema; diff --git a/yt/cpp/mapreduce/client/prepare_operation.h b/yt/cpp/mapreduce/client/prepare_operation.h index 3b64aa28565..3fde1d1678e 100644 --- a/yt/cpp/mapreduce/client/prepare_operation.h +++ b/yt/cpp/mapreduce/client/prepare_operation.h @@ -15,6 +15,7 @@ public: TOperationPreparationContext( const TStructuredJobTableList& structuredInputs, const TStructuredJobTableList& structuredOutputs, + const IRawClientPtr& rawClient, const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId); @@ -22,6 +23,7 @@ public: TOperationPreparationContext( TVector<TRichYPath> inputs, TVector<TRichYPath> outputs, + const IRawClientPtr& rawClient, const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId); @@ -38,8 +40,11 @@ public: private: TVector<TMaybe<TRichYPath>> Inputs_; TVector<TMaybe<TRichYPath>> Outputs_; + + const IRawClientPtr RawClient_; const TClientContext& Context_; const IClientRetryPolicyPtr RetryPolicy_; + TTransactionId TransactionId_; mutable TVector<TTableSchema> InputSchemas_; diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp index 253d7c0d44f..44c7db3a971 100644 --- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp @@ -25,6 +25,7 @@ using ::ToString; //////////////////////////////////////////////////////////////////////////////// void RetryHeavyWriteRequest( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, const ITransactionPingerPtr& transactionPinger, const TClientContext& context, @@ -44,7 +45,7 @@ void RetryHeavyWriteRequest( } for (int attempt = 0; attempt < retryCount; ++attempt) { - TPingableTransaction attemptTx(clientRetryPolicy, context, parentId, transactionPinger->GetChildTxPinger(), TStartTransactionOptions()); + TPingableTransaction attemptTx(rawClient, clientRetryPolicy, context, parentId, transactionPinger->GetChildTxPinger(), TStartTransactionOptions()); auto input = streamMaker(); TString requestId; @@ -167,6 +168,7 @@ void THeavyRequestRetrier::TryStartAttempt() { Attempt_ = std::make_unique<TAttempt>(); Attempt_->Transaction = std::make_unique<TPingableTransaction>( + Parameters_.RawClientPtr, Parameters_.ClientRetryPolicy, Parameters_.Context, Parameters_.TransactionId, Parameters_.TransactionPinger->GetChildTxPinger(), diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.h b/yt/cpp/mapreduce/client/retry_heavy_write_request.h index b3a6ec0f7c2..6933170e96d 100644 --- a/yt/cpp/mapreduce/client/retry_heavy_write_request.h +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.h @@ -1,11 +1,14 @@ #pragma once #include <yt/cpp/mapreduce/client/transaction.h> + #include <yt/cpp/mapreduce/common/fwd.h> #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/http/requests.h> +#include <yt/cpp/mapreduce/raw_client/raw_client.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -15,6 +18,7 @@ class THeavyRequestRetrier public: struct TParameters { + IRawClientPtr RawClientPtr; IClientRetryPolicyPtr ClientRetryPolicy; ITransactionPingerPtr TransactionPinger; TClientContext Context; @@ -55,6 +59,7 @@ private: //////////////////////////////////////////////////////////////////////////////// void RetryHeavyWriteRequest( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, const ITransactionPingerPtr& transactionPinger, const TClientContext& context, diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index 55165b17ffe..41ad1298cdd 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -9,6 +9,8 @@ #include <yt/cpp/mapreduce/interface/logging/yt_log.h> +#include <yt/cpp/mapreduce/raw_client/raw_client.h> + #include <util/generic/size_literals.h> namespace NYT { @@ -106,7 +108,7 @@ void TRetryfulWriter::Send(const TBuffer& buffer) }; auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_); - RetryHeavyWriteRequest(ClientRetryPolicy_, TransactionPinger_, Context_, transactionId, header, streamMaker); + RetryHeavyWriteRequest(RawClient_, ClientRetryPolicy_, TransactionPinger_, Context_, transactionId, header, streamMaker); Parameters_ = SecondaryParameters_; // all blocks except the first one are appended } diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h index c2de332bffe..e49f9d5f1b3 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.h +++ b/yt/cpp/mapreduce/client/retryful_writer.h @@ -8,6 +8,7 @@ #include <yt/cpp/mapreduce/interface/common.h> #include <yt/cpp/mapreduce/interface/io.h> #include <yt/cpp/mapreduce/io/helpers.h> + #include <yt/cpp/mapreduce/raw_client/raw_requests.h> #include <library/cpp/threading/blocking_queue/blocking_queue.h> @@ -28,6 +29,7 @@ class TRetryfulWriter public: template <class TWriterOptions> TRetryfulWriter( + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, @@ -36,7 +38,8 @@ public: const TMaybe<TFormat>& format, const TRichYPath& path, const TWriterOptions& options) - : ClientRetryPolicy_(std::move(clientRetryPolicy)) + : RawClient_(rawClient) + , ClientRetryPolicy_(std::move(clientRetryPolicy)) , TransactionPinger_(std::move(transactionPinger)) , Context_(context) , AutoFinish_(options.AutoFinish_) @@ -61,7 +64,7 @@ public: SecondaryParameters_ = FormIORequestParameters(secondaryPath, options); if (options.CreateTransaction_) { - WriteTransaction_.ConstructInPlace(ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); + WriteTransaction_.ConstructInPlace(rawClient, ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); auto append = path.Append_.GetOrElse(false); auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE); NDetail::NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, WriteTransaction_->GetId(), path.Path_, lockMode); @@ -89,12 +92,15 @@ private: static size_t GetBufferSize(const TMaybe<TWriterOptions>& writerOptions); private: + const IRawClientPtr RawClient_; const IClientRetryPolicyPtr ClientRetryPolicy_; const ITransactionPingerPtr TransactionPinger_; const TClientContext Context_; const bool AutoFinish_; + TString Command_; TMaybe<TFormat> Format_; + const size_t BufferSize_; TNode Parameters_; diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp index 49f02e0405b..e574afe4401 100644 --- a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp @@ -291,6 +291,7 @@ struct TRetryfulWriterV2::TSendTask //////////////////////////////////////////////////////////////////////////////// TRetryfulWriterV2::TRetryfulWriterV2( + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, @@ -311,6 +312,7 @@ TRetryfulWriterV2::TRetryfulWriterV2( if (createTransaction) { WriteTransaction_ = MakeHolder<TPingableTransaction>( + rawClient, clientRetryPolicy, context, parentId, @@ -329,6 +331,7 @@ TRetryfulWriterV2::TRetryfulWriterV2( } THeavyRequestRetrier::TParameters parameters = { + .RawClientPtr = rawClient, .ClientRetryPolicy = clientRetryPolicy, .TransactionPinger = transactionPinger, .Context = context, diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.h b/yt/cpp/mapreduce/client/retryful_writer_v2.h index bda55d96a60..661ef5d0b58 100644 --- a/yt/cpp/mapreduce/client/retryful_writer_v2.h +++ b/yt/cpp/mapreduce/client/retryful_writer_v2.h @@ -18,6 +18,7 @@ class TRetryfulWriterV2 { public: TRetryfulWriterV2( + const IRawClientPtr& rawClient, IClientRetryPolicyPtr clientRetryPolicy, ITransactionPingerPtr transactionPinger, const TClientContext& context, diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp index 6dba1e14359..a2f47af3aeb 100644 --- a/yt/cpp/mapreduce/client/structured_table_formats.cpp +++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp @@ -5,19 +5,20 @@ #include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <yt/cpp/mapreduce/interface/common.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> + #include <yt/cpp/mapreduce/io/yamr_table_reader.h> #include <yt/cpp/mapreduce/library/table_schema/protobuf.h> -#include <yt/cpp/mapreduce/interface/common.h> - #include <yt/cpp/mapreduce/raw_client/raw_requests.h> #include <library/cpp/type_info/type_info.h> #include <library/cpp/yson/writer.h> -#include <memory> - namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -66,15 +67,26 @@ TMaybe<TNode> GetCommonTableFormat( TMaybe<TNode> GetTableFormat( const IClientRetryPolicyPtr& retryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TTransactionId& transactionId, const TRichYPath& path) { auto formatPath = path.Path_ + "/@_format"; - if (!NDetail::NRawClient::Exists(retryPolicy->CreatePolicyForGenericRequest(), context, transactionId, formatPath)) { + + auto exists = NDetail::RequestWithRetry<bool>( + retryPolicy->CreatePolicyForGenericRequest(), + [&rawClient, &transactionId, &formatPath] (TMutationId& mutationId) { + return rawClient->Exists(mutationId, transactionId, formatPath); + }); + if (!exists) { return TMaybe<TNode>(); } - TMaybe<TNode> format = NDetail::NRawClient::Get(retryPolicy->CreatePolicyForGenericRequest(), context, transactionId, formatPath); + + auto format = NDetail::RequestWithRetry<TMaybe<TNode>>( + retryPolicy->CreatePolicyForGenericRequest(), + [&rawClient, &transactionId, &formatPath] (TMutationId& mutationId) { + return rawClient->Get(mutationId, transactionId, formatPath); + }); if (format.Get()->AsString() != "yamred_dsv") { return TMaybe<TNode>(); } @@ -90,13 +102,13 @@ TMaybe<TNode> GetTableFormat( TMaybe<TNode> GetTableFormats( const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TTransactionId& transactionId, const TVector<TRichYPath>& inputs) { TVector<TMaybe<TNode>> formats; for (auto& table : inputs) { - formats.push_back(GetTableFormat(clientRetryPolicy, context, transactionId, table)); + formats.push_back(GetTableFormat(clientRetryPolicy, rawClient, transactionId, table)); } return GetCommonTableFormat(formats); @@ -310,11 +322,13 @@ struct TFormatBuilder::TFormatSwitcher }; TFormatBuilder::TFormatBuilder( + IRawClientPtr rawClient, IClientRetryPolicyPtr clientRetryPolicy, TClientContext context, TTransactionId transactionId, TOperationOptions operationOptions) - : ClientRetryPolicy_(std::move(clientRetryPolicy)) + : RawClient_(std::move(rawClient)) + , ClientRetryPolicy_(std::move(clientRetryPolicy)) , Context_(std::move(context)) , TransactionId_(transactionId) , OperationOptions_(std::move(operationOptions)) @@ -376,7 +390,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateYamrFormat( Y_ABORT_UNLESS(table.RichYPath, "Cannot use format from table for intermediate table"); tableList.push_back(*table.RichYPath); } - formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, Context_, TransactionId_, tableList); + formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, RawClient_, TransactionId_, tableList); } if (formatFromTableAttributes) { return { diff --git a/yt/cpp/mapreduce/client/structured_table_formats.h b/yt/cpp/mapreduce/client/structured_table_formats.h index 27d980c587a..162630d4110 100644 --- a/yt/cpp/mapreduce/client/structured_table_formats.h +++ b/yt/cpp/mapreduce/client/structured_table_formats.h @@ -21,13 +21,13 @@ TMaybe<TNode> GetCommonTableFormat( TMaybe<TNode> GetTableFormat( const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TTransactionId& transactionId, const TRichYPath& path); TMaybe<TNode> GetTableFormats( const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TTransactionId& transactionId, const TVector<TRichYPath>& paths); @@ -84,6 +84,7 @@ private: public: TFormatBuilder( + IRawClientPtr rawClient, IClientRetryPolicyPtr clientRetryPolicy, TClientContext context, TTransactionId transactionId, @@ -130,6 +131,7 @@ public: bool allowFormatFromTableAttribute); private: + const IRawClientPtr RawClient_; const IClientRetryPolicyPtr ClientRetryPolicy_; const TClientContext Context_; const TTransactionId TransactionId_; diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp index 0aa1a7a1c39..9daef9654da 100644 --- a/yt/cpp/mapreduce/client/transaction.cpp +++ b/yt/cpp/mapreduce/client/transaction.cpp @@ -2,15 +2,16 @@ #include "transaction_pinger.h" -#include <yt/cpp/mapreduce/interface/config.h> -#include <yt/cpp/mapreduce/interface/error_codes.h> - #include <yt/cpp/mapreduce/common/wait_proxy.h> #include <yt/cpp/mapreduce/common/retry_lib.h> #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/retry_request.h> +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> + #include <yt/cpp/mapreduce/raw_client/raw_requests.h> #include <util/datetime/base.h> @@ -26,12 +27,14 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// TPingableTransaction::TPingableTransaction( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& parentId, ITransactionPingerPtr transactionPinger, const TStartTransactionOptions& options) - : ClientRetryPolicy_(retryPolicy) + : RawClient_(rawClient) + , ClientRetryPolicy_(retryPolicy) , Context_(context) , AbortableRegistry_(NDetail::TAbortableRegistry::Get()) , AbortOnTermination_(true) @@ -49,24 +52,29 @@ TPingableTransaction::TPingableTransaction( } TPingableTransaction::TPingableTransaction( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, ITransactionPingerPtr transactionPinger, const TAttachTransactionOptions& options) - : ClientRetryPolicy_(retryPolicy) + : RawClient_(rawClient) + , ClientRetryPolicy_(retryPolicy) , Context_(context) , AbortableRegistry_(NDetail::TAbortableRegistry::Get()) , AbortOnTermination_(options.AbortOnTermination_) , AutoPingable_(options.AutoPingable_) , Pinger_(std::move(transactionPinger)) { - auto timeoutNode = NDetail::NRawClient::TryGet( + auto timeoutNode = NDetail::RequestWithRetry<TNode>( ClientRetryPolicy_->CreatePolicyForGenericRequest(), - context, - TTransactionId(), - "#" + GetGuidAsString(transactionId) + "/@timeout", - TGetOptions()); + [this, &transactionId] (TMutationId& mutationId) { + return RawClient_->TryGet( + mutationId, + TTransactionId(), + "#" + GetGuidAsString(transactionId) + "/@timeout", + TGetOptions()); + }); if (timeoutNode.IsUndefined()) { throw yexception() << "Transaction " << GetGuidAsString(transactionId) << " does not exist"; } @@ -171,6 +179,7 @@ void TPingableTransaction::Stop(EStopAction action) //////////////////////////////////////////////////////////////////////////////// TYPath Snapshot( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, const TClientContext& context, const TTransactionId& transactionId, @@ -182,11 +191,15 @@ TYPath Snapshot( transactionId, path, ELockMode::LM_SNAPSHOT); - auto lockedNodeId = NDetail::NRawClient::Get( + + auto lockedNodeId = NDetail::RequestWithRetry<TNode>( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - transactionId, - ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id"); + [&rawClient, &transactionId, &lockId] (TMutationId& mutationId) { + return rawClient->Get( + mutationId, + transactionId, + ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id"); + }); return "#" + lockedNodeId.AsString(); } diff --git a/yt/cpp/mapreduce/client/transaction.h b/yt/cpp/mapreduce/client/transaction.h index a363020b246..c3a95462990 100644 --- a/yt/cpp/mapreduce/client/transaction.h +++ b/yt/cpp/mapreduce/client/transaction.h @@ -20,6 +20,7 @@ public: // // Start a new transaction. TPingableTransaction( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& parentId, @@ -29,6 +30,7 @@ public: // // Attach to an existing transaction. TPingableTransaction( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, @@ -56,6 +58,8 @@ private: }; private: + const IRawClientPtr RawClient_; + IClientRetryPolicyPtr ClientRetryPolicy_; TClientContext Context_; TTransactionId TransactionId_; @@ -83,6 +87,7 @@ private: //////////////////////////////////////////////////////////////////////////////// TYPath Snapshot( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, const TClientContext& context, const TTransactionId& transactionId, diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index b1d244ad78f..85b7d1aa54e 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -12,12 +12,37 @@ class IRawClient public: // Cypress + virtual TNode Get( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options = {}) = 0; + + virtual TNode TryGet( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options = {}) = 0; + virtual void Set( TMutationId& mutationId, const TTransactionId& transactionId, const TYPath& path, const TNode& value, const TSetOptions& options = {}) = 0; + + virtual bool Exists( + TMutationId& mutataionId, + const TTransactionId& transactionId, + const TYPath& path, + const TExistsOptions& options = {}) = 0; + + virtual void MultisetAttributes( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TNode::TMapType& value, + const TMultisetAttributesOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 9a8a9fca845..cc096823774 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -3,6 +3,7 @@ #include "rpc_parameters_serialization.h" #include <yt/cpp/mapreduce/http/http.h> +#include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/retry_request.h> #include <library/cpp/yson/node/node_io.h> @@ -15,6 +16,33 @@ THttpRawClient::THttpRawClient(const TClientContext& context) : Context_(context) { } +TNode THttpRawClient::Get( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options) +{ + THttpHeader header("GET", "get"); + header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options)); + return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header).Response); +} + +TNode THttpRawClient::TryGet( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options) +{ + try { + return Get(mutationId, transactionId, path, options); + } catch (const TErrorResponse& error) { + if (!error.IsResolveError()) { + throw; + } + return TNode(); + } +} + void THttpRawClient::Set( TMutationId& mutationId, const TTransactionId& transactionId, @@ -28,6 +56,31 @@ void THttpRawClient::Set( RequestWithoutRetry(Context_, mutationId, header, body); } +bool THttpRawClient::Exists( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TExistsOptions& options) +{ + THttpHeader header("GET", "exists"); + header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options)); + return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); +} + +void THttpRawClient::MultisetAttributes( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TNode::TMapType& value, + const TMultisetAttributesOptions& options) +{ + THttpHeader header("PUT", "api/v4/multiset_attributes", false); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options)); + auto body = NodeToYsonString(value); + RequestWithoutRetry(Context_, mutationId, header, body); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index def521a9188..3b31aa519c4 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -17,6 +17,18 @@ public: // Cypress + TNode Get( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options = {}) override; + + TNode TryGet( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options) override; + void Set( TMutationId& mutationId, const TTransactionId& transactionId, @@ -24,6 +36,19 @@ public: const TNode& value, const TSetOptions& options = {}) override; + bool Exists( + TMutationId& mutataionId, + const TTransactionId& transactionId, + const TYPath& path, + const TExistsOptions& options = {}) override; + + void MultisetAttributes( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TNode::TMapType& value, + const TMultisetAttributesOptions& options = {}) override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index d10c818489c..13100a29070 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -79,35 +79,6 @@ void ExecuteBatch( } } -TNode Get( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TGetOptions& options) -{ - THttpHeader header("GET", "get"); - header.MergeParameters(SerializeParamsForGet(transactionId, context.Config->Prefix, path, options)); - return NodeFromYsonString(RetryRequestWithPolicy(retryPolicy, context, header).Response); -} - -TNode TryGet( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TGetOptions& options) -{ - try { - return Get(retryPolicy, context, transactionId, path, options); - } catch (const TErrorResponse& error) { - if (!error.IsResolveError()) { - throw; - } - return TNode(); - } -} - void Set( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, @@ -123,34 +94,6 @@ void Set( RetryRequestWithPolicy(retryPolicy, context, header, body); } -void MultisetAttributes( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TNode::TMapType& value, - const TMultisetAttributesOptions& options) -{ - THttpHeader header("PUT", "api/v4/multiset_attributes", false); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForMultisetAttributes(transactionId, context.Config->Prefix, path, options)); - - auto body = NodeToYsonString(value); - RetryRequestWithPolicy(retryPolicy, context, header, body); -} - -bool Exists( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TExistsOptions& options) -{ - THttpHeader header("GET", "exists"); - header.MergeParameters(SerializeParamsForExists(transactionId, context.Config->Prefix, path, options)); - return ParseBoolFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); -} - TNodeId Create( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 8cb226ab862..96636808f37 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -39,35 +39,6 @@ void ExecuteBatch( // Cypress // -TNode Get( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TGetOptions& options = TGetOptions()); - -TNode TryGet( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TGetOptions& options); - -void MultisetAttributes( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TNode::TMapType& value, - const TMultisetAttributesOptions& options = TMultisetAttributesOptions()); - -bool Exists( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TExistsOptions& options = TExistsOptions()); - TNodeId Create( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, |
