summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/raw_client
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2024-12-16 18:45:28 +0300
committerhiddenpath <[email protected]>2024-12-16 19:11:27 +0300
commitdae2dbe3496d7557b1ece64d5464bd8e686995a8 (patch)
tree72899b4fb6279a620a14f71005d81afbbe93cd35 /yt/cpp/mapreduce/raw_client
parentb1cde7dcb055fb6f3367e81fd0f57bd55b8bb93c (diff)
[yt/cpp/mapreduce] YT-23616: Move Transaction and Operation methods to THttpRawClient
commit_hash:b093be44005f3d9da9779444cbbc32b93f7372ee
Diffstat (limited to 'yt/cpp/mapreduce/raw_client')
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp136
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h38
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp141
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h58
4 files changed, 174 insertions, 199 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 351e00f7edf..7bc83b8ebc2 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -1,5 +1,6 @@
#include "raw_client.h"
+#include "raw_requests.h"
#include "rpc_parameters_serialization.h"
#include <yt/cpp/mapreduce/common/helpers.h>
@@ -8,6 +9,8 @@
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+
#include <library/cpp/yson/node/node_io.h>
namespace NYT::NDetail {
@@ -239,6 +242,139 @@ void THttpRawClient::Concatenate(
RequestWithoutRetry(Context_, mutationId, header);
}
+void THttpRawClient::PingTx(const TTransactionId& transactionId)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "ping_tx");
+ header.MergeParameters(NRawClient::SerializeParamsForPingTx(transactionId));
+ TRequestConfig requestConfig;
+ requestConfig.HttpConfig = NHttpClient::THttpConfig{
+ .SocketTimeout = Context_.Config->PingTimeout
+ };
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+TOperationAttributes THttpRawClient::GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_operation");
+ header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options));
+ auto result = RequestWithoutRetry(Context_, mutationId, header);
+ return NRawClient::ParseOperationAttributes(NodeFromYsonString(result.Response));
+}
+
+TOperationAttributes THttpRawClient::GetOperation(
+ const TString& alias,
+ const TGetOperationOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_operation");
+ header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options));
+ auto result = RequestWithoutRetry(Context_, mutationId, header);
+ return NRawClient::ParseOperationAttributes(NodeFromYsonString(result.Response));
+}
+
+void THttpRawClient::AbortOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId)
+{
+ THttpHeader header("POST", "abort_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+void THttpRawClient::CompleteOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId)
+{
+ THttpHeader header("POST", "complete_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+void THttpRawClient::SuspendOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options)
+{
+ THttpHeader header("POST", "suspend_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+void THttpRawClient::ResumeOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options)
+{
+ THttpHeader header("POST", "resume_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+template <typename TKey>
+static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
+{
+ THashMap<TKey, i64> counts;
+ for (const auto& entry : countsNode.AsMap()) {
+ counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
+ }
+ return counts;
+}
+
+TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "list_operations");
+ header.MergeParameters(NRawClient::SerializeParamsForListOperations(options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ auto resultNode = NodeFromYsonString(responseInfo.Response);
+
+ const auto& operationNodesList = resultNode["operations"].AsList();
+
+ TListOperationsResult result;
+ result.Operations.reserve(operationNodesList.size());
+ for (const auto& operationNode : operationNodesList) {
+ result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode));
+ }
+
+ if (resultNode.HasKey("pool_counts")) {
+ result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
+ }
+ if (resultNode.HasKey("user_counts")) {
+ result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
+ }
+ if (resultNode.HasKey("type_counts")) {
+ result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
+ }
+ if (resultNode.HasKey("state_counts")) {
+ result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
+ }
+ if (resultNode.HasKey("failed_jobs_count")) {
+ result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
+ }
+
+ result.Incomplete = resultNode["incomplete"].AsBool();
+
+ return result;
+}
+
+void THttpRawClient::UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "update_op_parameters");
+ header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index 756c70f2116..c3771497fba 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -116,6 +116,44 @@ public:
const TRichYPath& destinationPath,
const TConcatenateOptions& options = {}) override;
+ // Transactions
+
+ void PingTx(const TTransactionId& transactionId) override;
+
+ // Operations
+
+ TOperationAttributes GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options = {}) override;
+
+ TOperationAttributes GetOperation(
+ const TString& operationId,
+ const TGetOperationOptions& options = {}) override;
+
+ void AbortOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId) override;
+
+ void CompleteOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId) override;
+
+ void SuspendOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options = {}) override;
+
+ void ResumeOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options = {}) override;
+
+ TListOperationsResult ListOperations(const TListOperationsOptions& options = {}) override;
+
+ void UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options = {}) override;
+
private:
const TClientContext Context_;
};
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index bdaff49d3be..bb69728cb2e 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -79,20 +79,6 @@ void ExecuteBatch(
}
}
-void PingTx(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId)
-{
- THttpHeader header("POST", "ping_tx");
- header.MergeParameters(SerializeParamsForPingTx(transactionId));
- TRequestConfig requestConfig;
- requestConfig.HttpConfig = NHttpClient::THttpConfig{
- .SocketTimeout = context.Config->PingTimeout
- };
- RetryRequestWithPolicy(retryPolicy, context, header, {}, requestConfig);
-}
-
TOperationAttributes ParseOperationAttributes(const TNode& node)
{
const auto& mapNode = node.AsMap();
@@ -209,133 +195,6 @@ TOperationAttributes ParseOperationAttributes(const TNode& node)
return result;
}
-TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TGetOperationOptions& options)
-{
- THttpHeader header("GET", "get_operation");
- header.MergeParameters(SerializeParamsForGetOperation(operationId, options));
- auto result = RetryRequestWithPolicy(retryPolicy, context, header);
- return ParseOperationAttributes(NodeFromYsonString(result.Response));
-}
-
-TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TString& alias,
- const TGetOperationOptions& options)
-{
- THttpHeader header("GET", "get_operation");
- header.MergeParameters(SerializeParamsForGetOperation(alias, options));
- auto result = RetryRequestWithPolicy(retryPolicy, context, header);
- return ParseOperationAttributes(NodeFromYsonString(result.Response));
-}
-
-void AbortOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId)
-{
- THttpHeader header("POST", "abort_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForAbortOperation(operationId));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
-void CompleteOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId)
-{
- THttpHeader header("POST", "complete_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForCompleteOperation(operationId));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
-void SuspendOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TSuspendOperationOptions& options)
-{
- THttpHeader header("POST", "suspend_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForSuspendOperation(operationId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
-void ResumeOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TResumeOperationOptions& options)
-{
- THttpHeader header("POST", "resume_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForResumeOperation(operationId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
-template <typename TKey>
-static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
-{
- THashMap<TKey, i64> counts;
- for (const auto& entry : countsNode.AsMap()) {
- counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
- }
- return counts;
-}
-
-TListOperationsResult ListOperations(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TListOperationsOptions& options)
-{
- THttpHeader header("GET", "list_operations");
- header.MergeParameters(SerializeParamsForListOperations(options));
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
-
- TListOperationsResult result;
- for (const auto& operationNode : resultNode["operations"].AsList()) {
- result.Operations.push_back(ParseOperationAttributes(operationNode));
- }
-
- if (resultNode.HasKey("pool_counts")) {
- result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
- }
- if (resultNode.HasKey("user_counts")) {
- result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
- }
- if (resultNode.HasKey("type_counts")) {
- result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
- }
- if (resultNode.HasKey("state_counts")) {
- result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
- }
- if (resultNode.HasKey("failed_jobs_count")) {
- result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
- }
-
- result.Incomplete = resultNode["incomplete"].AsBool();
-
- return result;
-}
-
-void UpdateOperationParameters(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options)
-{
- THttpHeader header("POST", "update_op_parameters");
- header.MergeParameters(SerializeParamsForUpdateOperationParameters(operationId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
-}
-
TJobAttributes ParseJobAttributes(const TNode& node)
{
const auto& mapNode = node.AsMap();
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 04c18a753b4..28af3ee9896 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -36,64 +36,6 @@ void ExecuteBatch(
const TExecuteBatchOptions& options = TExecuteBatchOptions());
//
-// Transactions
-//
-
-void PingTx(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId);
-
-//
-// Operations
-//
-
-TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TGetOperationOptions& options = TGetOperationOptions());
-
-TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TString& operationId,
- const TGetOperationOptions& options = TGetOperationOptions());
-
-void AbortOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId);
-
-void CompleteOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId);
-
-void SuspendOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TSuspendOperationOptions& options = TSuspendOperationOptions());
-
-void ResumeOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TResumeOperationOptions& options = TResumeOperationOptions());
-
-TListOperationsResult ListOperations(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TListOperationsOptions& options = TListOperationsOptions());
-
-void UpdateOperationParameters(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options = TUpdateOperationParametersOptions());
-
-//
// Jobs
//