diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-16 18:45:28 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-16 19:11:27 +0300 |
commit | dae2dbe3496d7557b1ece64d5464bd8e686995a8 (patch) | |
tree | 72899b4fb6279a620a14f71005d81afbbe93cd35 | |
parent | b1cde7dcb055fb6f3367e81fd0f57bd55b8bb93c (diff) | |
download | ydb-dae2dbe3496d7557b1ece64d5464bd8e686995a8.tar.gz |
[yt/cpp/mapreduce] YT-23616: Move Transaction and Operation methods to THttpRawClient
commit_hash:b093be44005f3d9da9779444cbbc32b93f7372ee
-rw-r--r-- | yt/cpp/mapreduce/client/abortable_registry.cpp | 20 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/abortable_registry.h | 7 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 59 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 96 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/transaction.cpp | 11 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/transaction.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/transaction_pinger.cpp | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 40 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 136 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 38 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 141 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 58 |
13 files changed, 362 insertions, 249 deletions
diff --git a/yt/cpp/mapreduce/client/abortable_registry.cpp b/yt/cpp/mapreduce/client/abortable_registry.cpp index 283d39e049..91308cba05 100644 --- a/yt/cpp/mapreduce/client/abortable_registry.cpp +++ b/yt/cpp/mapreduce/client/abortable_registry.cpp @@ -2,7 +2,11 @@ #include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + #include <yt/cpp/mapreduce/interface/common.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> + #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <util/generic/singleton.h> @@ -31,16 +35,22 @@ TString TTransactionAbortable::GetType() const //////////////////////////////////////////////////////////////////////////////// -TOperationAbortable::TOperationAbortable(IClientRetryPolicyPtr clientRetryPolicy, TClientContext context, const TOperationId& operationId) - : ClientRetryPolicy_(std::move(clientRetryPolicy)) - , Context_(std::move(context)) +TOperationAbortable::TOperationAbortable( + IRawClientPtr rawClient, + IClientRetryPolicyPtr clientRetryPolicy, + const TOperationId& operationId) + : RawClient_(std::move(rawClient)) + , ClientRetryPolicy_(std::move(clientRetryPolicy)) , OperationId_(operationId) { } - void TOperationAbortable::Abort() { - AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, OperationId_); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this] (TMutationId& mutationId) { + RawClient_->AbortOperation(mutationId, OperationId_); + }); } TString TOperationAbortable::GetType() const diff --git a/yt/cpp/mapreduce/client/abortable_registry.h b/yt/cpp/mapreduce/client/abortable_registry.h index 119d685cad..7d8996405e 100644 --- a/yt/cpp/mapreduce/client/abortable_registry.h +++ b/yt/cpp/mapreduce/client/abortable_registry.h @@ -46,13 +46,16 @@ class TOperationAbortable : public IAbortable { public: - TOperationAbortable(IClientRetryPolicyPtr clientRetryPolicy, TClientContext context, const TOperationId& operationId); + TOperationAbortable( + IRawClientPtr rawClient, + IClientRetryPolicyPtr clientRetryPolicy, + const TOperationId& operationId); void Abort() override; TString GetType() const override; private: + const IRawClientPtr RawClient_; const IClientRetryPolicyPtr ClientRetryPolicy_; - const TClientContext Context_; const TOperationId OperationId_; }; diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 22940c1de5..66cacf6b45 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -711,17 +711,25 @@ IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId) EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId) { - return NYT::NDetail::CheckOperation(ClientRetryPolicy_, Context_, operationId); + return NYT::NDetail::CheckOperation(RawClient_, ClientRetryPolicy_, Context_, operationId); } void TClientBase::AbortOperation(const TOperationId& operationId) { - NRawClient::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &operationId] (TMutationId& mutationId) { + RawClient_->AbortOperation(mutationId, operationId); + }); } void TClientBase::CompleteOperation(const TOperationId& operationId) { - NRawClient::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &operationId] (TMutationId& mutationId) { + RawClient_->CompleteOperation(mutationId, operationId); + }); } void TClientBase::WaitForOperation(const TOperationId& operationId) @@ -996,7 +1004,11 @@ void TTransaction::Abort() void TTransaction::Ping() { - PingTx(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this] (TMutationId /*mutationId*/) { + RawClient_->PingTx(TransactionId_); + }); } void TTransaction::Detach() @@ -1265,7 +1277,11 @@ TOperationAttributes TClient::GetOperation( const TGetOperationOptions& options) { CheckShutdown(); - return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options); + return RequestWithRetry<TOperationAttributes>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &operationId, &options] (TMutationId /*mutationId*/) { + return RawClient_->GetOperation(operationId, options); + }); } TOperationAttributes TClient::GetOperation( @@ -1273,14 +1289,21 @@ TOperationAttributes TClient::GetOperation( const TGetOperationOptions& options) { CheckShutdown(); - return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, alias, options); + return RequestWithRetry<TOperationAttributes>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &alias, &options] (TMutationId /*mutationId*/) { + return RawClient_->GetOperation(alias, options); + }); } -TListOperationsResult TClient::ListOperations( - const TListOperationsOptions& options) +TListOperationsResult TClient::ListOperations(const TListOperationsOptions& options) { CheckShutdown(); - return NRawClient::ListOperations(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, options); + return RequestWithRetry<TListOperationsResult>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId /*mutationId*/) { + return RawClient_->ListOperations(options); + }); } void TClient::UpdateOperationParameters( @@ -1288,7 +1311,11 @@ void TClient::UpdateOperationParameters( const TUpdateOperationParametersOptions& options) { CheckShutdown(); - return NRawClient::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &operationId, &options] (TMutationId /*mutationId*/) { + RawClient_->UpdateOperationParameters(operationId, options); + }); } TJobAttributes TClient::GetJob( @@ -1379,7 +1406,11 @@ void TClient::SuspendOperation( const TSuspendOperationOptions& options) { CheckShutdown(); - NRawClient::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &operationId, &options] (TMutationId& mutationId) { + RawClient_->SuspendOperation(mutationId, operationId, options); + }); } void TClient::ResumeOperation( @@ -1387,7 +1418,11 @@ void TClient::ResumeOperation( const TResumeOperationOptions& options) { CheckShutdown(); - NRawClient::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &operationId, &options] (TMutationId& mutationId) { + RawClient_->ResumeOperation(mutationId, operationId, options); + }); } TYtPoller& TClient::GetYtPoller() diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index c5a9d7f523..9f8c14647a 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -552,17 +552,21 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( } EOperationBriefState CheckOperation( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, const TClientContext& context, const TOperationId& operationId) { - auto attributes = GetOperation( + auto attributes = RequestWithRetry<TOperationAttributes>( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - operationId, - TGetOperationOptions().AttributeFilter(TOperationAttributeFilter() - .Add(EOperationAttribute::State) - .Add(EOperationAttribute::Result))); + [&rawClient, &operationId] (TMutationId /*mutationId*/) { + return rawClient->GetOperation( + operationId, + TGetOperationOptions().AttributeFilter(TOperationAttributeFilter() + .Add(EOperationAttribute::State) + .Add(EOperationAttribute::Result))); + }); + Y_ABORT_UNLESS(attributes.BriefState, "get_operation for operation %s has not returned \"state\" field", GetGuidAsString(operationId).data()); @@ -604,7 +608,7 @@ void WaitForOperation( : context.Config->OperationTrackerPollPeriod; while (true) { - auto status = CheckOperation(clientRetryPolicy, context, operationId); + auto status = CheckOperation(rawClient, clientRetryPolicy, context, operationId); if (status == EOperationBriefState::Completed) { YT_LOG_INFO("Operation %v completed (%v)", operationId, @@ -2237,11 +2241,13 @@ class TOperation::TOperationImpl { public: TOperationImpl( + IRawClientPtr rawClient, IClientRetryPolicyPtr clientRetryPolicy, TClientContext context, const TMaybe<TOperationId>& operationId = {}) - : ClientRetryPolicy_(clientRetryPolicy) + : RawClient_(std::move(rawClient)) , Context_(std::move(context)) + , ClientRetryPolicy_(clientRetryPolicy) , Id_(operationId) , PreparedPromise_(::NThreading::NewPromise<void>()) , StartedPromise_(::NThreading::NewPromise<void>()) @@ -2308,8 +2314,10 @@ private: void ValidateOperationStarted() const; private: - IClientRetryPolicyPtr ClientRetryPolicy_; + const IRawClientPtr RawClient_; const TClientContext Context_; + + IClientRetryPolicyPtr ClientRetryPolicy_; TMaybe<TOperationId> Id_; TMutex Lock_; @@ -2509,7 +2517,7 @@ void TOperation::TOperationImpl::OnStatusUpdated(const TString& newStatus) auto registry = TAbortableRegistry::Get(); registry->Add( operationId, - ::MakeIntrusive<TOperationAbortable>(this_->ClientRetryPolicy_, this_->Context_, operationId)); + ::MakeIntrusive<TOperationAbortable>(this_->RawClient_, this_->ClientRetryPolicy_, operationId)); // We have to own an IntrusivePtr to registry to prevent use-after-free auto removeOperation = [registry, operationId] (const ::NThreading::TFuture<void>&) { registry->Remove(operationId); @@ -2632,7 +2640,9 @@ void TOperation::TOperationImpl::OnStarted(const TOperationId& operationId) StartedPromise_.SetValue(); } -void TOperation::TOperationImpl::UpdateAttributesAndCall(bool needJobStatistics, std::function<void(const TOperationAttributes&)> func) +void TOperation::TOperationImpl::UpdateAttributesAndCall( + bool needJobStatistics, + std::function<void(const TOperationAttributes&)> func) { { auto g = Guard(Lock_); @@ -2645,15 +2655,17 @@ void TOperation::TOperationImpl::UpdateAttributesAndCall(bool needJobStatistics, } } - TOperationAttributes attributes = NDetail::GetOperation( + auto attributes = RequestWithRetry<TOperationAttributes>( ClientRetryPolicy_->CreatePolicyForGenericRequest(), - Context_, - *Id_, - TGetOperationOptions().AttributeFilter(TOperationAttributeFilter() - .Add(EOperationAttribute::Result) - .Add(EOperationAttribute::Progress) - .Add(EOperationAttribute::State) - .Add(EOperationAttribute::BriefProgress))); + [this] (TMutationId /*mutationId*/) { + return RawClient_->GetOperation( + *Id_, + TGetOperationOptions().AttributeFilter(TOperationAttributeFilter() + .Add(EOperationAttribute::Result) + .Add(EOperationAttribute::Progress) + .Add(EOperationAttribute::State) + .Add(EOperationAttribute::BriefProgress))); + }); func(attributes); @@ -2672,37 +2684,61 @@ void TOperation::TOperationImpl::FinishWithException(std::exception_ptr e) void TOperation::TOperationImpl::AbortOperation() { ValidateOperationStarted(); - NYT::NDetail::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this] (TMutationId& mutationId) { + RawClient_->AbortOperation(mutationId, *Id_); + }); } void TOperation::TOperationImpl::CompleteOperation() { ValidateOperationStarted(); - NYT::NDetail::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this] (TMutationId& mutationId) { + RawClient_->CompleteOperation(mutationId, *Id_); + }); } void TOperation::TOperationImpl::SuspendOperation(const TSuspendOperationOptions& options) { ValidateOperationStarted(); - NYT::NDetail::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId& mutationId) { + RawClient_->SuspendOperation(mutationId, *Id_, options); + }); } void TOperation::TOperationImpl::ResumeOperation(const TResumeOperationOptions& options) { ValidateOperationStarted(); - NYT::NDetail::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId& mutationId) { + RawClient_->ResumeOperation(mutationId, *Id_, options); + }); } TOperationAttributes TOperation::TOperationImpl::GetAttributes(const TGetOperationOptions& options) { ValidateOperationStarted(); - return NYT::NDetail::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + return RequestWithRetry<TOperationAttributes>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId /*mutationId*/) { + return RawClient_->GetOperation(*Id_, options); + }); } void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParametersOptions& options) { ValidateOperationStarted(); - return NYT::NDetail::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId /*mutationId*/) { + RawClient_->UpdateOperationParameters(*Id_, options); + }); } TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options) @@ -2815,13 +2851,19 @@ const TClientContext& TOperation::TOperationImpl::GetContext() const TOperation::TOperation(TClientPtr client) : Client_(std::move(client)) - , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext())) + , Impl_(::MakeIntrusive<TOperationImpl>( + Client_->GetRawClient(), + Client_->GetRetryPolicy(), + Client_->GetContext())) { } TOperation::TOperation(TOperationId id, TClientPtr client) : Client_(std::move(client)) - , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext(), id)) + , Impl_(::MakeIntrusive<TOperationImpl>( + Client_->GetRawClient(), + Client_->GetRetryPolicy(), + Client_->GetContext(), id)) { } diff --git a/yt/cpp/mapreduce/client/operation.h b/yt/cpp/mapreduce/client/operation.h index f866c73936..e5e675a1f4 100644 --- a/yt/cpp/mapreduce/client/operation.h +++ b/yt/cpp/mapreduce/client/operation.h @@ -179,6 +179,7 @@ void ExecuteVanilla( const TOperationOptions& options); EOperationBriefState CheckOperation( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, const TClientContext& context, const TOperationId& operationId); diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp index c91ac52275..caf1319468 100644 --- a/yt/cpp/mapreduce/client/transaction.cpp +++ b/yt/cpp/mapreduce/client/transaction.cpp @@ -118,14 +118,21 @@ const TTransactionId TPingableTransaction::GetId() const return TransactionId_; } -const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const { +const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const +{ return {MinPingInterval_, MaxPingInterval_}; } -const TClientContext TPingableTransaction::GetContext() const { +const TClientContext TPingableTransaction::GetContext() const +{ return Context_; } +void TPingableTransaction::Ping() const +{ + RawClient_->PingTx(TransactionId_); +} + void TPingableTransaction::Commit() { Stop(EStopAction::Commit); diff --git a/yt/cpp/mapreduce/client/transaction.h b/yt/cpp/mapreduce/client/transaction.h index d596faf770..5e75b74ca6 100644 --- a/yt/cpp/mapreduce/client/transaction.h +++ b/yt/cpp/mapreduce/client/transaction.h @@ -44,6 +44,7 @@ public: const std::pair<TDuration, TDuration> GetPingInterval() const; const TClientContext GetContext() const; + void Ping() const; void Commit(); void Abort(); void Detach(); diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp index 0a193352f7..ea42867715 100644 --- a/yt/cpp/mapreduce/client/transaction_pinger.cpp +++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp @@ -261,8 +261,7 @@ private: while (Running_) { TDuration waitTime = minPingInterval + (maxPingInterval - minPingInterval) * RandomNumber<float>(); try { - auto noRetryPolicy = MakeIntrusive<TAttemptLimitedRetryPolicy>(1u, PingableTx_->GetContext().Config); - NDetail::NRawClient::PingTx(noRetryPolicy, PingableTx_->GetContext(), PingableTx_->GetId()); + PingableTx_->Ping(); } catch (const std::exception& e) { if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) { if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) { diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 7bd8e90947..2229486a01 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -1,6 +1,7 @@ #pragma once #include "client_method_options.h" +#include "operation.h" namespace NYT { @@ -110,6 +111,45 @@ public: const TVector<TRichYPath>& sourcePaths, const TRichYPath& destinationPath, const TConcatenateOptions& options = {}) = 0; + + // Transactions + + virtual void PingTx(const TTransactionId& transactionId) = 0; + + // Operations + + virtual TOperationAttributes GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options = {}) = 0; + + virtual TOperationAttributes GetOperation( + const TString& operationId, + const TGetOperationOptions& options = {}) = 0; + + virtual void AbortOperation( + TMutationId& mutationId, + const TOperationId& operationId) = 0; + + virtual void CompleteOperation( + TMutationId& mutationId, + const TOperationId& operationId) = 0; + + virtual void SuspendOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TSuspendOperationOptions& options = {}) = 0; + + virtual void ResumeOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TResumeOperationOptions& options = {}) = 0; + + virtual TListOperationsResult ListOperations(const TListOperationsOptions& options = {}) = 0; + + virtual void UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options = {}) = 0; + }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 351e00f7ed..7bc83b8ebc 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -1,5 +1,6 @@ #include "raw_client.h" +#include "raw_requests.h" #include "rpc_parameters_serialization.h" #include <yt/cpp/mapreduce/common/helpers.h> @@ -8,6 +9,8 @@ #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/retry_request.h> +#include <yt/cpp/mapreduce/interface/operation.h> + #include <library/cpp/yson/node/node_io.h> namespace NYT::NDetail { @@ -239,6 +242,139 @@ void THttpRawClient::Concatenate( RequestWithoutRetry(Context_, mutationId, header); } +void THttpRawClient::PingTx(const TTransactionId& transactionId) +{ + TMutationId mutationId; + THttpHeader header("POST", "ping_tx"); + header.MergeParameters(NRawClient::SerializeParamsForPingTx(transactionId)); + TRequestConfig requestConfig; + requestConfig.HttpConfig = NHttpClient::THttpConfig{ + .SocketTimeout = Context_.Config->PingTimeout + }; + RequestWithoutRetry(Context_, mutationId, header); +} + +TOperationAttributes THttpRawClient::GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "get_operation"); + header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options)); + auto result = RequestWithoutRetry(Context_, mutationId, header); + return NRawClient::ParseOperationAttributes(NodeFromYsonString(result.Response)); +} + +TOperationAttributes THttpRawClient::GetOperation( + const TString& alias, + const TGetOperationOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "get_operation"); + header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options)); + auto result = RequestWithoutRetry(Context_, mutationId, header); + return NRawClient::ParseOperationAttributes(NodeFromYsonString(result.Response)); +} + +void THttpRawClient::AbortOperation( + TMutationId& mutationId, + const TOperationId& operationId) +{ + THttpHeader header("POST", "abort_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId)); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::CompleteOperation( + TMutationId& mutationId, + const TOperationId& operationId) +{ + THttpHeader header("POST", "complete_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId)); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::SuspendOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TSuspendOperationOptions& options) +{ + THttpHeader header("POST", "suspend_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options)); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::ResumeOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TResumeOperationOptions& options) +{ + THttpHeader header("POST", "resume_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options)); + RequestWithoutRetry(Context_, mutationId, header); +} + +template <typename TKey> +static THashMap<TKey, i64> GetCounts(const TNode& countsNode) +{ + THashMap<TKey, i64> counts; + for (const auto& entry : countsNode.AsMap()) { + counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64()); + } + return counts; +} + +TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "list_operations"); + header.MergeParameters(NRawClient::SerializeParamsForListOperations(options)); + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); + auto resultNode = NodeFromYsonString(responseInfo.Response); + + const auto& operationNodesList = resultNode["operations"].AsList(); + + TListOperationsResult result; + result.Operations.reserve(operationNodesList.size()); + for (const auto& operationNode : operationNodesList) { + result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode)); + } + + if (resultNode.HasKey("pool_counts")) { + result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]); + } + if (resultNode.HasKey("user_counts")) { + result.UserCounts = GetCounts<TString>(resultNode["user_counts"]); + } + if (resultNode.HasKey("type_counts")) { + result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]); + } + if (resultNode.HasKey("state_counts")) { + result.StateCounts = GetCounts<TString>(resultNode["state_counts"]); + } + if (resultNode.HasKey("failed_jobs_count")) { + result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64(); + } + + result.Incomplete = resultNode["incomplete"].AsBool(); + + return result; +} + +void THttpRawClient::UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options) +{ + TMutationId mutationId; + THttpHeader header("POST", "update_op_parameters"); + header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options)); + RequestWithoutRetry(Context_, mutationId, header); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index 756c70f211..c3771497fb 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -116,6 +116,44 @@ public: const TRichYPath& destinationPath, const TConcatenateOptions& options = {}) override; + // Transactions + + void PingTx(const TTransactionId& transactionId) override; + + // Operations + + TOperationAttributes GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options = {}) override; + + TOperationAttributes GetOperation( + const TString& operationId, + const TGetOperationOptions& options = {}) override; + + void AbortOperation( + TMutationId& mutationId, + const TOperationId& operationId) override; + + void CompleteOperation( + TMutationId& mutationId, + const TOperationId& operationId) override; + + void SuspendOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TSuspendOperationOptions& options = {}) override; + + void ResumeOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TResumeOperationOptions& options = {}) override; + + TListOperationsResult ListOperations(const TListOperationsOptions& options = {}) override; + + void UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options = {}) override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index bdaff49d3b..bb69728cb2 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -79,20 +79,6 @@ void ExecuteBatch( } } -void PingTx( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId) -{ - THttpHeader header("POST", "ping_tx"); - header.MergeParameters(SerializeParamsForPingTx(transactionId)); - TRequestConfig requestConfig; - requestConfig.HttpConfig = NHttpClient::THttpConfig{ - .SocketTimeout = context.Config->PingTimeout - }; - RetryRequestWithPolicy(retryPolicy, context, header, {}, requestConfig); -} - TOperationAttributes ParseOperationAttributes(const TNode& node) { const auto& mapNode = node.AsMap(); @@ -209,133 +195,6 @@ TOperationAttributes ParseOperationAttributes(const TNode& node) return result; } -TOperationAttributes GetOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TGetOperationOptions& options) -{ - THttpHeader header("GET", "get_operation"); - header.MergeParameters(SerializeParamsForGetOperation(operationId, options)); - auto result = RetryRequestWithPolicy(retryPolicy, context, header); - return ParseOperationAttributes(NodeFromYsonString(result.Response)); -} - -TOperationAttributes GetOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TString& alias, - const TGetOperationOptions& options) -{ - THttpHeader header("GET", "get_operation"); - header.MergeParameters(SerializeParamsForGetOperation(alias, options)); - auto result = RetryRequestWithPolicy(retryPolicy, context, header); - return ParseOperationAttributes(NodeFromYsonString(result.Response)); -} - -void AbortOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId) -{ - THttpHeader header("POST", "abort_op"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForAbortOperation(operationId)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - -void CompleteOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId) -{ - THttpHeader header("POST", "complete_op"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForCompleteOperation(operationId)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - -void SuspendOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TSuspendOperationOptions& options) -{ - THttpHeader header("POST", "suspend_op"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForSuspendOperation(operationId, options)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - -void ResumeOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TResumeOperationOptions& options) -{ - THttpHeader header("POST", "resume_op"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForResumeOperation(operationId, options)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - -template <typename TKey> -static THashMap<TKey, i64> GetCounts(const TNode& countsNode) -{ - THashMap<TKey, i64> counts; - for (const auto& entry : countsNode.AsMap()) { - counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64()); - } - return counts; -} - -TListOperationsResult ListOperations( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TListOperationsOptions& options) -{ - THttpHeader header("GET", "list_operations"); - header.MergeParameters(SerializeParamsForListOperations(options)); - auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); - auto resultNode = NodeFromYsonString(responseInfo.Response); - - TListOperationsResult result; - for (const auto& operationNode : resultNode["operations"].AsList()) { - result.Operations.push_back(ParseOperationAttributes(operationNode)); - } - - if (resultNode.HasKey("pool_counts")) { - result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]); - } - if (resultNode.HasKey("user_counts")) { - result.UserCounts = GetCounts<TString>(resultNode["user_counts"]); - } - if (resultNode.HasKey("type_counts")) { - result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]); - } - if (resultNode.HasKey("state_counts")) { - result.StateCounts = GetCounts<TString>(resultNode["state_counts"]); - } - if (resultNode.HasKey("failed_jobs_count")) { - result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64(); - } - - result.Incomplete = resultNode["incomplete"].AsBool(); - - return result; -} - -void UpdateOperationParameters( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TUpdateOperationParametersOptions& options) -{ - THttpHeader header("POST", "update_op_parameters"); - header.MergeParameters(SerializeParamsForUpdateOperationParameters(operationId, options)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - TJobAttributes ParseJobAttributes(const TNode& node) { const auto& mapNode = node.AsMap(); diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 04c18a753b..28af3ee989 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -36,64 +36,6 @@ void ExecuteBatch( const TExecuteBatchOptions& options = TExecuteBatchOptions()); // -// Transactions -// - -void PingTx( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId); - -// -// Operations -// - -TOperationAttributes GetOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TGetOperationOptions& options = TGetOperationOptions()); - -TOperationAttributes GetOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TString& operationId, - const TGetOperationOptions& options = TGetOperationOptions()); - -void AbortOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId); - -void CompleteOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId); - -void SuspendOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TSuspendOperationOptions& options = TSuspendOperationOptions()); - -void ResumeOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TResumeOperationOptions& options = TResumeOperationOptions()); - -TListOperationsResult ListOperations( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TListOperationsOptions& options = TListOperationsOptions()); - -void UpdateOperationParameters( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TUpdateOperationParametersOptions& options = TUpdateOperationParametersOptions()); - -// // Jobs // |