aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2024-12-16 18:45:28 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2024-12-16 19:11:27 +0300
commitdae2dbe3496d7557b1ece64d5464bd8e686995a8 (patch)
tree72899b4fb6279a620a14f71005d81afbbe93cd35
parentb1cde7dcb055fb6f3367e81fd0f57bd55b8bb93c (diff)
downloadydb-dae2dbe3496d7557b1ece64d5464bd8e686995a8.tar.gz
[yt/cpp/mapreduce] YT-23616: Move Transaction and Operation methods to THttpRawClient
commit_hash:b093be44005f3d9da9779444cbbc32b93f7372ee
-rw-r--r--yt/cpp/mapreduce/client/abortable_registry.cpp20
-rw-r--r--yt/cpp/mapreduce/client/abortable_registry.h7
-rw-r--r--yt/cpp/mapreduce/client/client.cpp59
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp96
-rw-r--r--yt/cpp/mapreduce/client/operation.h1
-rw-r--r--yt/cpp/mapreduce/client/transaction.cpp11
-rw-r--r--yt/cpp/mapreduce/client/transaction.h1
-rw-r--r--yt/cpp/mapreduce/client/transaction_pinger.cpp3
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h40
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp136
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h38
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp141
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h58
13 files changed, 362 insertions, 249 deletions
diff --git a/yt/cpp/mapreduce/client/abortable_registry.cpp b/yt/cpp/mapreduce/client/abortable_registry.cpp
index 283d39e049..91308cba05 100644
--- a/yt/cpp/mapreduce/client/abortable_registry.cpp
+++ b/yt/cpp/mapreduce/client/abortable_registry.cpp
@@ -2,7 +2,11 @@
#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
#include <yt/cpp/mapreduce/interface/common.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
+
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <util/generic/singleton.h>
@@ -31,16 +35,22 @@ TString TTransactionAbortable::GetType() const
////////////////////////////////////////////////////////////////////////////////
-TOperationAbortable::TOperationAbortable(IClientRetryPolicyPtr clientRetryPolicy, TClientContext context, const TOperationId& operationId)
- : ClientRetryPolicy_(std::move(clientRetryPolicy))
- , Context_(std::move(context))
+TOperationAbortable::TOperationAbortable(
+ IRawClientPtr rawClient,
+ IClientRetryPolicyPtr clientRetryPolicy,
+ const TOperationId& operationId)
+ : RawClient_(std::move(rawClient))
+ , ClientRetryPolicy_(std::move(clientRetryPolicy))
, OperationId_(operationId)
{ }
-
void TOperationAbortable::Abort()
{
- AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, OperationId_);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this] (TMutationId& mutationId) {
+ RawClient_->AbortOperation(mutationId, OperationId_);
+ });
}
TString TOperationAbortable::GetType() const
diff --git a/yt/cpp/mapreduce/client/abortable_registry.h b/yt/cpp/mapreduce/client/abortable_registry.h
index 119d685cad..7d8996405e 100644
--- a/yt/cpp/mapreduce/client/abortable_registry.h
+++ b/yt/cpp/mapreduce/client/abortable_registry.h
@@ -46,13 +46,16 @@ class TOperationAbortable
: public IAbortable
{
public:
- TOperationAbortable(IClientRetryPolicyPtr clientRetryPolicy, TClientContext context, const TOperationId& operationId);
+ TOperationAbortable(
+ IRawClientPtr rawClient,
+ IClientRetryPolicyPtr clientRetryPolicy,
+ const TOperationId& operationId);
void Abort() override;
TString GetType() const override;
private:
+ const IRawClientPtr RawClient_;
const IClientRetryPolicyPtr ClientRetryPolicy_;
- const TClientContext Context_;
const TOperationId OperationId_;
};
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 22940c1de5..66cacf6b45 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -711,17 +711,25 @@ IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId)
EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId)
{
- return NYT::NDetail::CheckOperation(ClientRetryPolicy_, Context_, operationId);
+ return NYT::NDetail::CheckOperation(RawClient_, ClientRetryPolicy_, Context_, operationId);
}
void TClientBase::AbortOperation(const TOperationId& operationId)
{
- NRawClient::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &operationId] (TMutationId& mutationId) {
+ RawClient_->AbortOperation(mutationId, operationId);
+ });
}
void TClientBase::CompleteOperation(const TOperationId& operationId)
{
- NRawClient::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &operationId] (TMutationId& mutationId) {
+ RawClient_->CompleteOperation(mutationId, operationId);
+ });
}
void TClientBase::WaitForOperation(const TOperationId& operationId)
@@ -996,7 +1004,11 @@ void TTransaction::Abort()
void TTransaction::Ping()
{
- PingTx(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this] (TMutationId /*mutationId*/) {
+ RawClient_->PingTx(TransactionId_);
+ });
}
void TTransaction::Detach()
@@ -1265,7 +1277,11 @@ TOperationAttributes TClient::GetOperation(
const TGetOperationOptions& options)
{
CheckShutdown();
- return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
+ return RequestWithRetry<TOperationAttributes>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &operationId, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->GetOperation(operationId, options);
+ });
}
TOperationAttributes TClient::GetOperation(
@@ -1273,14 +1289,21 @@ TOperationAttributes TClient::GetOperation(
const TGetOperationOptions& options)
{
CheckShutdown();
- return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, alias, options);
+ return RequestWithRetry<TOperationAttributes>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &alias, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->GetOperation(alias, options);
+ });
}
-TListOperationsResult TClient::ListOperations(
- const TListOperationsOptions& options)
+TListOperationsResult TClient::ListOperations(const TListOperationsOptions& options)
{
CheckShutdown();
- return NRawClient::ListOperations(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, options);
+ return RequestWithRetry<TListOperationsResult>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->ListOperations(options);
+ });
}
void TClient::UpdateOperationParameters(
@@ -1288,7 +1311,11 @@ void TClient::UpdateOperationParameters(
const TUpdateOperationParametersOptions& options)
{
CheckShutdown();
- return NRawClient::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &operationId, &options] (TMutationId /*mutationId*/) {
+ RawClient_->UpdateOperationParameters(operationId, options);
+ });
}
TJobAttributes TClient::GetJob(
@@ -1379,7 +1406,11 @@ void TClient::SuspendOperation(
const TSuspendOperationOptions& options)
{
CheckShutdown();
- NRawClient::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &operationId, &options] (TMutationId& mutationId) {
+ RawClient_->SuspendOperation(mutationId, operationId, options);
+ });
}
void TClient::ResumeOperation(
@@ -1387,7 +1418,11 @@ void TClient::ResumeOperation(
const TResumeOperationOptions& options)
{
CheckShutdown();
- NRawClient::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &operationId, &options] (TMutationId& mutationId) {
+ RawClient_->ResumeOperation(mutationId, operationId, options);
+ });
}
TYtPoller& TClient::GetYtPoller()
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index c5a9d7f523..9f8c14647a 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -552,17 +552,21 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
}
EOperationBriefState CheckOperation(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TClientContext& context,
const TOperationId& operationId)
{
- auto attributes = GetOperation(
+ auto attributes = RequestWithRetry<TOperationAttributes>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- operationId,
- TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
- .Add(EOperationAttribute::State)
- .Add(EOperationAttribute::Result)));
+ [&rawClient, &operationId] (TMutationId /*mutationId*/) {
+ return rawClient->GetOperation(
+ operationId,
+ TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
+ .Add(EOperationAttribute::State)
+ .Add(EOperationAttribute::Result)));
+ });
+
Y_ABORT_UNLESS(attributes.BriefState,
"get_operation for operation %s has not returned \"state\" field",
GetGuidAsString(operationId).data());
@@ -604,7 +608,7 @@ void WaitForOperation(
: context.Config->OperationTrackerPollPeriod;
while (true) {
- auto status = CheckOperation(clientRetryPolicy, context, operationId);
+ auto status = CheckOperation(rawClient, clientRetryPolicy, context, operationId);
if (status == EOperationBriefState::Completed) {
YT_LOG_INFO("Operation %v completed (%v)",
operationId,
@@ -2237,11 +2241,13 @@ class TOperation::TOperationImpl
{
public:
TOperationImpl(
+ IRawClientPtr rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
TClientContext context,
const TMaybe<TOperationId>& operationId = {})
- : ClientRetryPolicy_(clientRetryPolicy)
+ : RawClient_(std::move(rawClient))
, Context_(std::move(context))
+ , ClientRetryPolicy_(clientRetryPolicy)
, Id_(operationId)
, PreparedPromise_(::NThreading::NewPromise<void>())
, StartedPromise_(::NThreading::NewPromise<void>())
@@ -2308,8 +2314,10 @@ private:
void ValidateOperationStarted() const;
private:
- IClientRetryPolicyPtr ClientRetryPolicy_;
+ const IRawClientPtr RawClient_;
const TClientContext Context_;
+
+ IClientRetryPolicyPtr ClientRetryPolicy_;
TMaybe<TOperationId> Id_;
TMutex Lock_;
@@ -2509,7 +2517,7 @@ void TOperation::TOperationImpl::OnStatusUpdated(const TString& newStatus)
auto registry = TAbortableRegistry::Get();
registry->Add(
operationId,
- ::MakeIntrusive<TOperationAbortable>(this_->ClientRetryPolicy_, this_->Context_, operationId));
+ ::MakeIntrusive<TOperationAbortable>(this_->RawClient_, this_->ClientRetryPolicy_, operationId));
// We have to own an IntrusivePtr to registry to prevent use-after-free
auto removeOperation = [registry, operationId] (const ::NThreading::TFuture<void>&) {
registry->Remove(operationId);
@@ -2632,7 +2640,9 @@ void TOperation::TOperationImpl::OnStarted(const TOperationId& operationId)
StartedPromise_.SetValue();
}
-void TOperation::TOperationImpl::UpdateAttributesAndCall(bool needJobStatistics, std::function<void(const TOperationAttributes&)> func)
+void TOperation::TOperationImpl::UpdateAttributesAndCall(
+ bool needJobStatistics,
+ std::function<void(const TOperationAttributes&)> func)
{
{
auto g = Guard(Lock_);
@@ -2645,15 +2655,17 @@ void TOperation::TOperationImpl::UpdateAttributesAndCall(bool needJobStatistics,
}
}
- TOperationAttributes attributes = NDetail::GetOperation(
+ auto attributes = RequestWithRetry<TOperationAttributes>(
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- Context_,
- *Id_,
- TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
- .Add(EOperationAttribute::Result)
- .Add(EOperationAttribute::Progress)
- .Add(EOperationAttribute::State)
- .Add(EOperationAttribute::BriefProgress)));
+ [this] (TMutationId /*mutationId*/) {
+ return RawClient_->GetOperation(
+ *Id_,
+ TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
+ .Add(EOperationAttribute::Result)
+ .Add(EOperationAttribute::Progress)
+ .Add(EOperationAttribute::State)
+ .Add(EOperationAttribute::BriefProgress)));
+ });
func(attributes);
@@ -2672,37 +2684,61 @@ void TOperation::TOperationImpl::FinishWithException(std::exception_ptr e)
void TOperation::TOperationImpl::AbortOperation()
{
ValidateOperationStarted();
- NYT::NDetail::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this] (TMutationId& mutationId) {
+ RawClient_->AbortOperation(mutationId, *Id_);
+ });
}
void TOperation::TOperationImpl::CompleteOperation()
{
ValidateOperationStarted();
- NYT::NDetail::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this] (TMutationId& mutationId) {
+ RawClient_->CompleteOperation(mutationId, *Id_);
+ });
}
void TOperation::TOperationImpl::SuspendOperation(const TSuspendOperationOptions& options)
{
ValidateOperationStarted();
- NYT::NDetail::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId& mutationId) {
+ RawClient_->SuspendOperation(mutationId, *Id_, options);
+ });
}
void TOperation::TOperationImpl::ResumeOperation(const TResumeOperationOptions& options)
{
ValidateOperationStarted();
- NYT::NDetail::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId& mutationId) {
+ RawClient_->ResumeOperation(mutationId, *Id_, options);
+ });
}
TOperationAttributes TOperation::TOperationImpl::GetAttributes(const TGetOperationOptions& options)
{
ValidateOperationStarted();
- return NYT::NDetail::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ return RequestWithRetry<TOperationAttributes>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->GetOperation(*Id_, options);
+ });
}
void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParametersOptions& options)
{
ValidateOperationStarted();
- return NYT::NDetail::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId /*mutationId*/) {
+ RawClient_->UpdateOperationParameters(*Id_, options);
+ });
}
TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options)
@@ -2815,13 +2851,19 @@ const TClientContext& TOperation::TOperationImpl::GetContext() const
TOperation::TOperation(TClientPtr client)
: Client_(std::move(client))
- , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext()))
+ , Impl_(::MakeIntrusive<TOperationImpl>(
+ Client_->GetRawClient(),
+ Client_->GetRetryPolicy(),
+ Client_->GetContext()))
{
}
TOperation::TOperation(TOperationId id, TClientPtr client)
: Client_(std::move(client))
- , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext(), id))
+ , Impl_(::MakeIntrusive<TOperationImpl>(
+ Client_->GetRawClient(),
+ Client_->GetRetryPolicy(),
+ Client_->GetContext(), id))
{
}
diff --git a/yt/cpp/mapreduce/client/operation.h b/yt/cpp/mapreduce/client/operation.h
index f866c73936..e5e675a1f4 100644
--- a/yt/cpp/mapreduce/client/operation.h
+++ b/yt/cpp/mapreduce/client/operation.h
@@ -179,6 +179,7 @@ void ExecuteVanilla(
const TOperationOptions& options);
EOperationBriefState CheckOperation(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TClientContext& context,
const TOperationId& operationId);
diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp
index c91ac52275..caf1319468 100644
--- a/yt/cpp/mapreduce/client/transaction.cpp
+++ b/yt/cpp/mapreduce/client/transaction.cpp
@@ -118,14 +118,21 @@ const TTransactionId TPingableTransaction::GetId() const
return TransactionId_;
}
-const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const {
+const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const
+{
return {MinPingInterval_, MaxPingInterval_};
}
-const TClientContext TPingableTransaction::GetContext() const {
+const TClientContext TPingableTransaction::GetContext() const
+{
return Context_;
}
+void TPingableTransaction::Ping() const
+{
+ RawClient_->PingTx(TransactionId_);
+}
+
void TPingableTransaction::Commit()
{
Stop(EStopAction::Commit);
diff --git a/yt/cpp/mapreduce/client/transaction.h b/yt/cpp/mapreduce/client/transaction.h
index d596faf770..5e75b74ca6 100644
--- a/yt/cpp/mapreduce/client/transaction.h
+++ b/yt/cpp/mapreduce/client/transaction.h
@@ -44,6 +44,7 @@ public:
const std::pair<TDuration, TDuration> GetPingInterval() const;
const TClientContext GetContext() const;
+ void Ping() const;
void Commit();
void Abort();
void Detach();
diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp
index 0a193352f7..ea42867715 100644
--- a/yt/cpp/mapreduce/client/transaction_pinger.cpp
+++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp
@@ -261,8 +261,7 @@ private:
while (Running_) {
TDuration waitTime = minPingInterval + (maxPingInterval - minPingInterval) * RandomNumber<float>();
try {
- auto noRetryPolicy = MakeIntrusive<TAttemptLimitedRetryPolicy>(1u, PingableTx_->GetContext().Config);
- NDetail::NRawClient::PingTx(noRetryPolicy, PingableTx_->GetContext(), PingableTx_->GetId());
+ PingableTx_->Ping();
} catch (const std::exception& e) {
if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 7bd8e90947..2229486a01 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -1,6 +1,7 @@
#pragma once
#include "client_method_options.h"
+#include "operation.h"
namespace NYT {
@@ -110,6 +111,45 @@ public:
const TVector<TRichYPath>& sourcePaths,
const TRichYPath& destinationPath,
const TConcatenateOptions& options = {}) = 0;
+
+ // Transactions
+
+ virtual void PingTx(const TTransactionId& transactionId) = 0;
+
+ // Operations
+
+ virtual TOperationAttributes GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options = {}) = 0;
+
+ virtual TOperationAttributes GetOperation(
+ const TString& operationId,
+ const TGetOperationOptions& options = {}) = 0;
+
+ virtual void AbortOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId) = 0;
+
+ virtual void CompleteOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId) = 0;
+
+ virtual void SuspendOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options = {}) = 0;
+
+ virtual void ResumeOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options = {}) = 0;
+
+ virtual TListOperationsResult ListOperations(const TListOperationsOptions& options = {}) = 0;
+
+ virtual void UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options = {}) = 0;
+
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 351e00f7ed..7bc83b8ebc 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -1,5 +1,6 @@
#include "raw_client.h"
+#include "raw_requests.h"
#include "rpc_parameters_serialization.h"
#include <yt/cpp/mapreduce/common/helpers.h>
@@ -8,6 +9,8 @@
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+
#include <library/cpp/yson/node/node_io.h>
namespace NYT::NDetail {
@@ -239,6 +242,139 @@ void THttpRawClient::Concatenate(
RequestWithoutRetry(Context_, mutationId, header);
}
+void THttpRawClient::PingTx(const TTransactionId& transactionId)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "ping_tx");
+ header.MergeParameters(NRawClient::SerializeParamsForPingTx(transactionId));
+ TRequestConfig requestConfig;
+ requestConfig.HttpConfig = NHttpClient::THttpConfig{
+ .SocketTimeout = Context_.Config->PingTimeout
+ };
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+TOperationAttributes THttpRawClient::GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_operation");
+ header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options));
+ auto result = RequestWithoutRetry(Context_, mutationId, header);
+ return NRawClient::ParseOperationAttributes(NodeFromYsonString(result.Response));
+}
+
+TOperationAttributes THttpRawClient::GetOperation(
+ const TString& alias,
+ const TGetOperationOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_operation");
+ header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options));
+ auto result = RequestWithoutRetry(Context_, mutationId, header);
+ return NRawClient::ParseOperationAttributes(NodeFromYsonString(result.Response));
+}
+
+void THttpRawClient::AbortOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId)
+{
+ THttpHeader header("POST", "abort_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+void THttpRawClient::CompleteOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId)
+{
+ THttpHeader header("POST", "complete_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+void THttpRawClient::SuspendOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options)
+{
+ THttpHeader header("POST", "suspend_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+void THttpRawClient::ResumeOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options)
+{
+ THttpHeader header("POST", "resume_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+template <typename TKey>
+static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
+{
+ THashMap<TKey, i64> counts;
+ for (const auto& entry : countsNode.AsMap()) {
+ counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
+ }
+ return counts;
+}
+
+TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "list_operations");
+ header.MergeParameters(NRawClient::SerializeParamsForListOperations(options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ auto resultNode = NodeFromYsonString(responseInfo.Response);
+
+ const auto& operationNodesList = resultNode["operations"].AsList();
+
+ TListOperationsResult result;
+ result.Operations.reserve(operationNodesList.size());
+ for (const auto& operationNode : operationNodesList) {
+ result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode));
+ }
+
+ if (resultNode.HasKey("pool_counts")) {
+ result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
+ }
+ if (resultNode.HasKey("user_counts")) {
+ result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
+ }
+ if (resultNode.HasKey("type_counts")) {
+ result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
+ }
+ if (resultNode.HasKey("state_counts")) {
+ result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
+ }
+ if (resultNode.HasKey("failed_jobs_count")) {
+ result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
+ }
+
+ result.Incomplete = resultNode["incomplete"].AsBool();
+
+ return result;
+}
+
+void THttpRawClient::UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "update_op_parameters");
+ header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index 756c70f211..c3771497fb 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -116,6 +116,44 @@ public:
const TRichYPath& destinationPath,
const TConcatenateOptions& options = {}) override;
+ // Transactions
+
+ void PingTx(const TTransactionId& transactionId) override;
+
+ // Operations
+
+ TOperationAttributes GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options = {}) override;
+
+ TOperationAttributes GetOperation(
+ const TString& operationId,
+ const TGetOperationOptions& options = {}) override;
+
+ void AbortOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId) override;
+
+ void CompleteOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId) override;
+
+ void SuspendOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options = {}) override;
+
+ void ResumeOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options = {}) override;
+
+ TListOperationsResult ListOperations(const TListOperationsOptions& options = {}) override;
+
+ void UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options = {}) override;
+
private:
const TClientContext Context_;
};
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index bdaff49d3b..bb69728cb2 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -79,20 +79,6 @@ void ExecuteBatch(
}
}
-void PingTx(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId)
-{
- THttpHeader header("POST", "ping_tx");
- header.MergeParameters(SerializeParamsForPingTx(transactionId));
- TRequestConfig requestConfig;
- requestConfig.HttpConfig = NHttpClient::THttpConfig{
- .SocketTimeout = context.Config->PingTimeout
- };
- RetryRequestWithPolicy(retryPolicy, context, header, {}, requestConfig);
-}
-
TOperationAttributes ParseOperationAttributes(const TNode& node)
{
const auto& mapNode = node.AsMap();
@@ -209,133 +195,6 @@ TOperationAttributes ParseOperationAttributes(const TNode& node)
return result;
}
-TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TGetOperationOptions& options)
-{
- THttpHeader header("GET", "get_operation");
- header.MergeParameters(SerializeParamsForGetOperation(operationId, options));
- auto result = RetryRequestWithPolicy(retryPolicy, context, header);
- return ParseOperationAttributes(NodeFromYsonString(result.Response));
-}
-
-TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TString& alias,
- const TGetOperationOptions& options)
-{
- THttpHeader header("GET", "get_operation");
- header.MergeParameters(SerializeParamsForGetOperation(alias, options));
- auto result = RetryRequestWithPolicy(retryPolicy, context, header);
- return ParseOperationAttributes(NodeFromYsonString(result.Response));
-}
-
-void AbortOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId)
-{
- THttpHeader header("POST", "abort_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForAbortOperation(operationId));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
-void CompleteOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId)
-{
- THttpHeader header("POST", "complete_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForCompleteOperation(operationId));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
-void SuspendOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TSuspendOperationOptions& options)
-{
- THttpHeader header("POST", "suspend_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForSuspendOperation(operationId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
-void ResumeOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TResumeOperationOptions& options)
-{
- THttpHeader header("POST", "resume_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForResumeOperation(operationId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
-template <typename TKey>
-static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
-{
- THashMap<TKey, i64> counts;
- for (const auto& entry : countsNode.AsMap()) {
- counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
- }
- return counts;
-}
-
-TListOperationsResult ListOperations(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TListOperationsOptions& options)
-{
- THttpHeader header("GET", "list_operations");
- header.MergeParameters(SerializeParamsForListOperations(options));
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
-
- TListOperationsResult result;
- for (const auto& operationNode : resultNode["operations"].AsList()) {
- result.Operations.push_back(ParseOperationAttributes(operationNode));
- }
-
- if (resultNode.HasKey("pool_counts")) {
- result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
- }
- if (resultNode.HasKey("user_counts")) {
- result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
- }
- if (resultNode.HasKey("type_counts")) {
- result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
- }
- if (resultNode.HasKey("state_counts")) {
- result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
- }
- if (resultNode.HasKey("failed_jobs_count")) {
- result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
- }
-
- result.Incomplete = resultNode["incomplete"].AsBool();
-
- return result;
-}
-
-void UpdateOperationParameters(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options)
-{
- THttpHeader header("POST", "update_op_parameters");
- header.MergeParameters(SerializeParamsForUpdateOperationParameters(operationId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
TJobAttributes ParseJobAttributes(const TNode& node)
{
const auto& mapNode = node.AsMap();
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 04c18a753b..28af3ee989 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -36,64 +36,6 @@ void ExecuteBatch(
const TExecuteBatchOptions& options = TExecuteBatchOptions());
//
-// Transactions
-//
-
-void PingTx(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId);
-
-//
-// Operations
-//
-
-TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TGetOperationOptions& options = TGetOperationOptions());
-
-TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TString& operationId,
- const TGetOperationOptions& options = TGetOperationOptions());
-
-void AbortOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId);
-
-void CompleteOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId);
-
-void SuspendOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TSuspendOperationOptions& options = TSuspendOperationOptions());
-
-void ResumeOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TResumeOperationOptions& options = TResumeOperationOptions());
-
-TListOperationsResult ListOperations(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TListOperationsOptions& options = TListOperationsOptions());
-
-void UpdateOperationParameters(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options = TUpdateOperationParametersOptions());
-
-//
// Jobs
//