diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-13 20:18:29 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-13 20:58:57 +0300 |
commit | 91a8ffd57d8783a3d940c9506254fa7e2012e1ec (patch) | |
tree | e482755ebffea84fefd270a9a9d2a32b604d18ad | |
parent | 8afd0e6dc9d41134a0cccfd6b6c5fe843efd80fb (diff) | |
download | ydb-91a8ffd57d8783a3d940c9506254fa7e2012e1ec.tar.gz |
yt/cpp/mapreduce: move Create to THttpRawClient
commit_hash:9ca8428c322034064576bb56f74e704425ce7de9
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 20 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 54 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.cpp | 41 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 7 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 13 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 7 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 29 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 8 |
8 files changed, 98 insertions, 81 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 24aed7fb77..45a5bf1d47 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -112,7 +112,11 @@ TNodeId TClientBase::Create( ENodeType type, const TCreateOptions& options) { - return NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, type, options); + return RequestWithRetry<TNodeId>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &type, &options] (TMutationId& mutationId) { + return RawClient_->Create(mutationId, TransactionId_, path, type, options); + }); } void TClientBase::Remove( @@ -337,8 +341,11 @@ IFileWriterPtr TClientBase::CreateFileWriter( return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_); }); if (!exists) { - NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE, - TCreateOptions().IgnoreExisting(true)); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &realPath] (TMutationId& mutationId) { + RawClient_->Create(mutationId, TransactionId_, realPath.Path_, NT_FILE, TCreateOptions().IgnoreExisting(true)); + }); } return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options); @@ -737,8 +744,11 @@ THolder<TClientWriter> TClientBase::CreateClientWriter( return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_); }); if (!exists) { - NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE, - TCreateOptions().IgnoreExisting(true)); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &realPath] (TMutationId& mutataionId) { + RawClient_->Create(mutataionId, TransactionId_, realPath.Path_, NT_TABLE, TCreateOptions().IgnoreExisting(true)); + }); } return MakeHolder<TClientWriter>( diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 5bb67e3059..0feaa016ed 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -973,26 +973,32 @@ template <typename TSpec> void CreateDebugOutputTables(const TSpec& spec, const TOperationPreparer& preparer) { if (spec.StderrTablePath_.Defined()) { - NYT::NDetail::Create( + RequestWithRetry<void>( preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), - preparer.GetContext(), - TTransactionId(), - *spec.StderrTablePath_, - NT_TABLE, - TCreateOptions() - .IgnoreExisting(true) - .Recursive(true)); + [&spec, &preparer] (TMutationId& mutationId) { + preparer.GetClient()->GetRawClient()->Create( + mutationId, + TTransactionId(), + *spec.StderrTablePath_, + NT_TABLE, + TCreateOptions() + .IgnoreExisting(true) + .Recursive(true)); + }); } if (spec.CoreTablePath_.Defined()) { - NYT::NDetail::Create( + RequestWithRetry<void>( preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), - preparer.GetContext(), - TTransactionId(), - *spec.CoreTablePath_, - NT_TABLE, - TCreateOptions() - .IgnoreExisting(true) - .Recursive(true)); + [&spec, &preparer] (TMutationId& mutationId) { + preparer.GetClient()->GetRawClient()->Create( + mutationId, + TTransactionId(), + *spec.CoreTablePath_, + NT_TABLE, + TCreateOptions() + .IgnoreExisting(true) + .Recursive(true)); + }); } } @@ -1003,12 +1009,18 @@ void CreateOutputTable( Y_ENSURE(path.Path_, "Output table is not set"); if (!path.Create_.Defined()) { // If `create` attribute is defined - Create( + RequestWithRetry<void>( preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), - preparer.GetContext(), preparer.GetTransactionId(), path.Path_, NT_TABLE, - TCreateOptions() - .IgnoreExisting(true) - .Recursive(true)); + [&preparer, &path] (TMutationId& mutationId) { + preparer.GetClient()->GetRawClient()->Create( + mutationId, + preparer.GetTransactionId(), + path.Path_, + NT_TABLE, + TCreateOptions() + .IgnoreExisting(true) + .Recursive(true)); + }); } } diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index eb7264425c..b5263420f0 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -490,15 +490,18 @@ TYPath TJobPreparer::GetCachePath() const void TJobPreparer::CreateStorage() const { - Create( + RequestWithRetry<void>( OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), - OperationPreparer_.GetContext(), - Options_.FileStorageTransactionId_, - GetCachePath(), - NT_MAP, - TCreateOptions() - .IgnoreExisting(true) - .Recursive(true)); + [this] (TMutationId& mutationId) { + RawClient_->Create( + mutationId, + Options_.FileStorageTransactionId_, + GetCachePath(), + NT_MAP, + TCreateOptions() + .IgnoreExisting(true) + .Recursive(true)); + }); } int TJobPreparer::GetFileCacheReplicationFactor() const @@ -517,17 +520,19 @@ void TJobPreparer::CreateFileInCypress(const TString& path) const attributes["expiration_timeout"] = Options_.FileExpirationTimeout_->MilliSeconds(); } - Create( + RequestWithRetry<void>( OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), - OperationPreparer_.GetContext(), - Options_.FileStorageTransactionId_, - path, - NT_FILE, - TCreateOptions() - .IgnoreExisting(true) - .Recursive(true) - .Attributes(attributes) - ); + [this, &path, &attributes] (TMutationId& mutationId) { + RawClient_->Create( + mutationId, + Options_.FileStorageTransactionId_, + path, + NT_FILE, + TCreateOptions() + .IgnoreExisting(true) + .Recursive(true) + .Attributes(attributes)); + }); } TString TJobPreparer::PutFileToCypressCache( diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 85b7d1aa54..e5c672929c 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -43,6 +43,13 @@ public: const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options = {}) = 0; + + virtual TNodeId Create( + TMutationId& mutatatonId, + const TTransactionId& transactionId, + const TYPath& path, + const ENodeType& type, + const TCreateOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index cc09682377..034dff7f33 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -81,6 +81,19 @@ void THttpRawClient::MultisetAttributes( RequestWithoutRetry(Context_, mutationId, header, body); } +TNodeId THttpRawClient::Create( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const ENodeType& type, + const TCreateOptions& options) +{ + THttpHeader header("POST", "create"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options)); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index 3b31aa519c..0908794168 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -49,6 +49,13 @@ public: const TNode::TMapType& value, const TMultisetAttributesOptions& options = {}) override; + TNodeId Create( + TMutationId& mutatatonId, + const TTransactionId& transactionId, + const TYPath& path, + const ENodeType& type, + const TCreateOptions& options = {}) override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 13100a2907..9d643afc91 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -79,35 +79,6 @@ void ExecuteBatch( } } -void Set( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TNode& value, - const TSetOptions& options) -{ - THttpHeader header("PUT", "set"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForSet(transactionId, context.Config->Prefix, path, options)); - auto body = NodeToYsonString(value); - RetryRequestWithPolicy(retryPolicy, context, header, body); -} - -TNodeId Create( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const ENodeType& type, - const TCreateOptions& options) -{ - THttpHeader header("POST", "create"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForCreate(transactionId, context.Config->Prefix, path, type, options)); - return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); -} - TNodeId CopyWithoutRetries( const TClientContext& context, const TTransactionId& transactionId, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 96636808f3..9a02e34951 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -39,14 +39,6 @@ void ExecuteBatch( // Cypress // -TNodeId Create( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const ENodeType& type, - const TCreateOptions& options = TCreateOptions()); - TNodeId CopyWithoutRetries( const TClientContext& context, const TTransactionId& transactionId, |