diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-12 15:42:31 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-12 16:16:32 +0300 |
commit | 33b0eb2c8e449c6bf0b1d09020ac16823a04c808 (patch) | |
tree | 4896aefe651c0d564419b95f75eec656d12b357a /yt/cpp/mapreduce | |
parent | 488068b0d84538afd45ca9c7b805b06981eb0d84 (diff) | |
download | ydb-33b0eb2c8e449c6bf0b1d09020ac16823a04c808.tar.gz |
yt/cpp/mapreduce: Move Set operation to THttpRawClient
commit_hash:3c4bb23f8331162e4667c907c007bc859cc2fc76
Diffstat (limited to 'yt/cpp/mapreduce')
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 37 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client.h | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 8 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/retry_request.cpp | 10 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/retry_request.h | 64 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/fwd.h | 8 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 25 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 33 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 33 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 8 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/ya.make | 1 |
15 files changed, 230 insertions, 24 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 43e3864ae2..a2d302212f 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -42,6 +42,7 @@ #include <yt/cpp/mapreduce/library/table_schema/protobuf.h> +#include <yt/cpp/mapreduce/raw_client/raw_client.h> #include <yt/cpp/mapreduce/raw_client/raw_requests.h> #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h> @@ -90,10 +91,12 @@ void ApplyProxyUrlAliasingRules(TString& url) //////////////////////////////////////////////////////////////////////////////// TClientBase::TClientBase( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& transactionId, IClientRetryPolicyPtr retryPolicy) - : Context_(context) + : RawClient_(std::move(rawClient)) + , Context_(context) , TransactionId_(transactionId) , ClientRetryPolicy_(std::move(retryPolicy)) { } @@ -101,7 +104,7 @@ TClientBase::TClientBase( ITransactionPtr TClientBase::StartTransaction( const TStartTransactionOptions& options) { - return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options); + return MakeIntrusive<TTransaction>(RawClient_, GetParentClientImpl(), Context_, TransactionId_, options); } TNodeId TClientBase::Create( @@ -138,7 +141,11 @@ void TClientBase::Set( const TNode& value, const TSetOptions& options) { - NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &value, &options] (TMutationId& mutationId) { + RawClient_->Set(mutationId, TransactionId_, path, value, options); + }); } void TClientBase::MultisetAttributes( @@ -826,6 +833,11 @@ IClientPtr TClientBase::GetParentClient() return GetParentClientImpl(); } +IRawClientPtr TClientBase::GetRawClient() const +{ + return RawClient_; +} + const TClientContext& TClientBase::GetContext() const { return Context_; @@ -839,11 +851,12 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const //////////////////////////////////////////////////////////////////////////////// TTransaction::TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, const TStartTransactionOptions& options) - : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy()) + : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( MakeHolder<TPingableTransaction>( @@ -858,11 +871,12 @@ TTransaction::TTransaction( } TTransaction::TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, const TAttachTransactionOptions& options) - : TClientBase(context, transactionId, parentClient->GetRetryPolicy()) + : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( new TPingableTransaction( @@ -928,10 +942,11 @@ TClientPtr TTransaction::GetParentClientImpl() //////////////////////////////////////////////////////////////////////////////// TClient::TClient( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& globalId, IClientRetryPolicyPtr retryPolicy) - : TClientBase(context, globalId, retryPolicy) + : TClientBase(std::move(rawClient), context, globalId, retryPolicy) , TransactionPinger_(nullptr) { } @@ -943,7 +958,7 @@ ITransactionPtr TClient::AttachTransaction( { CheckShutdown(); - return MakeIntrusive<TTransaction>(this, Context_, transactionId, options); + return MakeIntrusive<TTransaction>(RawClient_, this, Context_, transactionId, options); } void TClient::MountTable( @@ -1435,9 +1450,15 @@ TClientPtr CreateClientImpl( retryConfigProvider = CreateDefaultRetryConfigProvider(); } + auto rawClient = MakeIntrusive<THttpRawClient>(context); + EnsureInitialized(); - return new TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config)); + return new TClient( + std::move(rawClient), + context, + globalTxId, + CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h index 32c458316d..2a0398d056 100644 --- a/yt/cpp/mapreduce/client/client.h +++ b/yt/cpp/mapreduce/client/client.h @@ -5,6 +5,7 @@ #include "transaction_pinger.h" #include <yt/cpp/mapreduce/interface/client.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/http/requests.h> @@ -29,6 +30,7 @@ class TClientBase { public: TClientBase( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& transactionId, IClientRetryPolicyPtr retryPolicy); @@ -222,6 +224,8 @@ public: IClientPtr GetParentClient() override; + IRawClientPtr GetRawClient() const; + const TClientContext& GetContext() const; const IClientRetryPolicyPtr& GetRetryPolicy() const; @@ -232,6 +236,7 @@ protected: virtual TClientPtr GetParentClientImpl() = 0; protected: + const IRawClientPtr RawClient_; const TClientContext Context_; TTransactionId TransactionId_; IClientRetryPolicyPtr ClientRetryPolicy_; @@ -287,6 +292,7 @@ public: // // Start a new transaction. TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, @@ -295,6 +301,7 @@ public: // // Attach an existing transaction. TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, @@ -325,6 +332,7 @@ protected: TClientPtr GetParentClientImpl() override; private: + const IRawClientPtr RawClient_; ITransactionPingerPtr TransactionPinger_; THolder<TPingableTransaction> PingableTx_; TClientPtr ParentClient_; @@ -338,6 +346,7 @@ class TClient { public: TClient( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& globalId, IClientRetryPolicyPtr retryPolicy); diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index 07d00e88d6..a6d424c5a1 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -17,6 +17,7 @@ #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h> #include <yt/cpp/mapreduce/interface/error_codes.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> @@ -395,7 +396,8 @@ TJobPreparer::TJobPreparer( size_t outputTableCount, const TVector<TSmallJobFile>& smallFileList, const TOperationOptions& options) - : OperationPreparer_(operationPreparer) + : RawClient_(operationPreparer.GetClient()->GetRawClient()) + , OperationPreparer_(operationPreparer) , Spec_(spec) , Options_(options) , Layers_(spec.Layers_) @@ -631,6 +633,7 @@ TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& it CreateFileInCypress(cypressPath); auto uploadTx = MakeIntrusive<TTransaction>( + OperationPreparer_.GetClient()->GetRawClient(), OperationPreparer_.GetClient(), OperationPreparer_.GetContext(), TTransactionId(), diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h index 54c978c0fb..41594eb52e 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.h +++ b/yt/cpp/mapreduce/client/operation_preparer.h @@ -83,6 +83,7 @@ public: bool ShouldRedirectStdoutToStderr() const; private: + const IRawClientPtr RawClient_; TOperationPreparer& OperationPreparer_; TUserJobSpec Spec_; TOperationOptions Options_; diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index ca243a929a..41ee56f672 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -207,7 +207,7 @@ void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite AddParameter("operation_id", GetGuidAsString(operationId), overwrite); } -void THttpHeader::AddMutationId() +TMutationId THttpHeader::AddMutationId() { TGUID guid; @@ -220,6 +220,8 @@ void THttpHeader::AddMutationId() guid.dw[2] = GetPID() ^ MicroSeconds(); AddParameter("mutation_id", GetGuidAsString(guid), true); + + return guid; } bool THttpHeader::HasMutationId() const @@ -227,6 +229,10 @@ bool THttpHeader::HasMutationId() const return Parameters_.contains("mutation_id"); } +void THttpHeader::SetMutationId(TMutationId mutationId) { + AddParameter("mutation_id", GetGuidAsString(mutationId), /* overwrite */ true); +} + void THttpHeader::SetToken(const TString& token) { Token_ = token; diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index 618b1e2c22..0f5e9034ee 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -59,8 +59,9 @@ public: void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false); void AddPath(const TString& path, bool overwrite = false); void AddOperationId(const TOperationId& operationId, bool overwrite = false); - void AddMutationId(); + TMutationId AddMutationId(); bool HasMutationId() const; + void SetMutationId(TMutationId mutationId); void SetToken(const TString& token); void SetProxyAddress(const TString& proxyAddress); diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp index 0c719eeda3..3330cf696c 100644 --- a/yt/cpp/mapreduce/http/retry_request.cpp +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -49,6 +49,7 @@ static TResponseInfo Request( TResponseInfo RequestWithoutRetry( const TClientContext& context, + TMutationId& mutationId, THttpHeader& header, TMaybe<TStringBuf> body, const TRequestConfig& config) @@ -64,8 +65,13 @@ TResponseInfo RequestWithoutRetry( } if (header.HasMutationId()) { - header.RemoveParameter("retry"); - header.AddMutationId(); + if (mutationId.IsEmpty()) { + header.RemoveParameter("retry"); + mutationId = header.AddMutationId(); + } else { + header.AddParameter("retry", true, /* overwrite */ true); + header.SetMutationId(mutationId); + } } auto requestId = CreateGuidAsString(); return Request(context, header, body, requestId, config); diff --git a/yt/cpp/mapreduce/http/retry_request.h b/yt/cpp/mapreduce/http/retry_request.h index a6007d6eae..19bf0a7c14 100644 --- a/yt/cpp/mapreduce/http/retry_request.h +++ b/yt/cpp/mapreduce/http/retry_request.h @@ -2,8 +2,13 @@ #include "fwd.h" +#include <yt/cpp/mapreduce/interface/errors.h> #include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + #include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> #include <yt/cpp/mapreduce/http/http_client.h> @@ -32,6 +37,64 @@ struct TRequestConfig //////////////////////////////////////////////////////////////////////////////// +template <typename TResult> +TResult RequestWithRetry( + IRequestRetryPolicyPtr retryPolicy, + std::function<TResult(TMutationId&)> func) +{ + bool useSameMutationId = false; + TMutationId mutationId; + + while (true) { + try { + retryPolicy->NotifyNewAttempt(); + if constexpr (std::is_same_v<TResult, void>) { + func(mutationId); + return; + } else { + return func(mutationId); + } + } catch (const TErrorResponse& e) { + YT_LOG_ERROR("Retry failed %v - %v", + e.GetError().GetMessage(), + retryPolicy->GetAttemptDescription()); + + useSameMutationId = e.IsTransportError(); + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnRetriableError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } catch (const std::exception& e) { + YT_LOG_ERROR("Retry failed %v - %v", + e.what(), + retryPolicy->GetAttemptDescription()); + + useSameMutationId = true; + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnGenericError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } + if (!useSameMutationId) { + mutationId = {}; + } + } +} + // Retry request with given `header' and `body' using `retryPolicy'. // If `retryPolicy == nullptr' use default, currently `TAttemptLimitedRetryPolicy(TConfig::Get()->RetryCount)`. TResponseInfo RetryRequestWithPolicy( @@ -43,6 +106,7 @@ TResponseInfo RetryRequestWithPolicy( TResponseInfo RequestWithoutRetry( const TClientContext& context, + TMutationId& mutationId, THttpHeader& header, TMaybe<TStringBuf> body = {}, const TRequestConfig& config = TRequestConfig()); diff --git a/yt/cpp/mapreduce/interface/fwd.h b/yt/cpp/mapreduce/interface/fwd.h index 485b45129a..162a6ee3e9 100644 --- a/yt/cpp/mapreduce/interface/fwd.h +++ b/yt/cpp/mapreduce/interface/fwd.h @@ -150,6 +150,7 @@ namespace NYT { // common.h //////////////////////////////////////////////////////////////////////////////// + using TMutationId = TGUID; using TTransactionId = TGUID; using TNodeId = TGUID; using TLockId = TGUID; @@ -396,5 +397,12 @@ namespace NYT { struct TRetryConfig; class IRetryConfigProvider; using IRetryConfigProviderPtr = ::TIntrusivePtr<IRetryConfigProvider>; + + //////////////////////////////////////////////////////////////////////////////// + // raw_client.h + //////////////////////////////////////////////////////////////////////////////// + + class IRawClient; + using IRawClientPtr = ::TIntrusivePtr<IRawClient>; } /// @endcond diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h new file mode 100644 index 0000000000..b1d244ad78 --- /dev/null +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -0,0 +1,25 @@ +#pragma once + +#include "client_method_options.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class IRawClient + : public virtual TThrRefBase +{ +public: + // Cypress + + virtual void Set( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TNode& value, + const TSetOptions& options = {}) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp new file mode 100644 index 0000000000..9a8a9fca84 --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -0,0 +1,33 @@ +#include "raw_client.h" + +#include "rpc_parameters_serialization.h" + +#include <yt/cpp/mapreduce/http/http.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <library/cpp/yson/node/node_io.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +THttpRawClient::THttpRawClient(const TClientContext& context) + : Context_(context) +{ } + +void THttpRawClient::Set( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TNode& value, + const TSetOptions& options) +{ + THttpHeader header("PUT", "set"); + header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options)); + auto body = NodeToYsonString(value); + RequestWithoutRetry(Context_, mutationId, header, body); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h new file mode 100644 index 0000000000..def521a918 --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -0,0 +1,33 @@ +#pragma once + +#include <yt/cpp/mapreduce/http/context.h> + +#include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class THttpRawClient + : public IRawClient +{ +public: + THttpRawClient(const TClientContext& context); + + // Cypress + + void Set( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TNode& value, + const TSetOptions& options = {}) override; + +private: + const TClientContext Context_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 0b7bcf2c66..d10c818489 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -173,9 +173,10 @@ TNodeId CopyWithoutRetries( const TCopyOptions& options) { THttpHeader header("POST", "copy"); + TMutationId mutationId; header.AddMutationId(); header.MergeParameters(SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options)); - return ParseGuidFromResponse(RequestWithoutRetry(context, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(context, mutationId, header).Response); } TNodeId CopyInsideMasterCell( @@ -204,9 +205,10 @@ TNodeId MoveWithoutRetries( const TMoveOptions& options) { THttpHeader header("POST", "move"); + TMutationId mutationId; header.AddMutationId(); header.MergeParameters(SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options)); - return ParseGuidFromResponse(RequestWithoutRetry( context, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(context, mutationId, header).Response); } TNodeId MoveInsideMasterCell( @@ -309,9 +311,10 @@ void Concatenate( const TConcatenateOptions& options) { THttpHeader header("POST", "concatenate"); + TMutationId mutationId; header.AddMutationId(); header.MergeParameters(SerializeParamsForConcatenate(transactionId, context.Config->Prefix, sourcePaths, destinationPath, options)); - RequestWithoutRetry(context, header); + RequestWithoutRetry(context, mutationId, header); } void PingTx( diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 0a183d617c..8cb226ab86 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -53,14 +53,6 @@ TNode TryGet( const TYPath& path, const TGetOptions& options); -void Set( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TNode& value, - const TSetOptions& options = TSetOptions()); - void MultisetAttributes( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, diff --git a/yt/cpp/mapreduce/raw_client/ya.make b/yt/cpp/mapreduce/raw_client/ya.make index c201b86f05..a038b2931f 100644 --- a/yt/cpp/mapreduce/raw_client/ya.make +++ b/yt/cpp/mapreduce/raw_client/ya.make @@ -4,6 +4,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( raw_batch_request.cpp + raw_client.cpp raw_requests.cpp rpc_parameters_serialization.cpp ) |