diff options
| author | hiddenpath <[email protected]> | 2024-12-16 18:45:28 +0300 |
|---|---|---|
| committer | hiddenpath <[email protected]> | 2024-12-16 19:11:27 +0300 |
| commit | dae2dbe3496d7557b1ece64d5464bd8e686995a8 (patch) | |
| tree | 72899b4fb6279a620a14f71005d81afbbe93cd35 /yt/cpp/mapreduce/raw_client | |
| parent | b1cde7dcb055fb6f3367e81fd0f57bd55b8bb93c (diff) | |
[yt/cpp/mapreduce] YT-23616: Move Transaction and Operation methods to THttpRawClient
commit_hash:b093be44005f3d9da9779444cbbc32b93f7372ee
Diffstat (limited to 'yt/cpp/mapreduce/raw_client')
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 136 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 38 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 141 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 58 |
4 files changed, 174 insertions, 199 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 351e00f7edf..7bc83b8ebc2 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -1,5 +1,6 @@ #include "raw_client.h" +#include "raw_requests.h" #include "rpc_parameters_serialization.h" #include <yt/cpp/mapreduce/common/helpers.h> @@ -8,6 +9,8 @@ #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/retry_request.h> +#include <yt/cpp/mapreduce/interface/operation.h> + #include <library/cpp/yson/node/node_io.h> namespace NYT::NDetail { @@ -239,6 +242,139 @@ void THttpRawClient::Concatenate( RequestWithoutRetry(Context_, mutationId, header); } +void THttpRawClient::PingTx(const TTransactionId& transactionId) +{ + TMutationId mutationId; + THttpHeader header("POST", "ping_tx"); + header.MergeParameters(NRawClient::SerializeParamsForPingTx(transactionId)); + TRequestConfig requestConfig; + requestConfig.HttpConfig = NHttpClient::THttpConfig{ + .SocketTimeout = Context_.Config->PingTimeout + }; + RequestWithoutRetry(Context_, mutationId, header); +} + +TOperationAttributes THttpRawClient::GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "get_operation"); + header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options)); + auto result = RequestWithoutRetry(Context_, mutationId, header); + return NRawClient::ParseOperationAttributes(NodeFromYsonString(result.Response)); +} + +TOperationAttributes THttpRawClient::GetOperation( + const TString& alias, + const TGetOperationOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "get_operation"); + header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options)); + auto result = RequestWithoutRetry(Context_, mutationId, header); + return NRawClient::ParseOperationAttributes(NodeFromYsonString(result.Response)); +} + +void THttpRawClient::AbortOperation( + TMutationId& mutationId, + const TOperationId& operationId) +{ + THttpHeader header("POST", "abort_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId)); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::CompleteOperation( + TMutationId& mutationId, + const TOperationId& operationId) +{ + THttpHeader header("POST", "complete_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId)); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::SuspendOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TSuspendOperationOptions& options) +{ + THttpHeader header("POST", "suspend_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options)); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::ResumeOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TResumeOperationOptions& options) +{ + THttpHeader header("POST", "resume_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options)); + RequestWithoutRetry(Context_, mutationId, header); +} + +template <typename TKey> +static THashMap<TKey, i64> GetCounts(const TNode& countsNode) +{ + THashMap<TKey, i64> counts; + for (const auto& entry : countsNode.AsMap()) { + counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64()); + } + return counts; +} + +TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "list_operations"); + header.MergeParameters(NRawClient::SerializeParamsForListOperations(options)); + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); + auto resultNode = NodeFromYsonString(responseInfo.Response); + + const auto& operationNodesList = resultNode["operations"].AsList(); + + TListOperationsResult result; + result.Operations.reserve(operationNodesList.size()); + for (const auto& operationNode : operationNodesList) { + result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode)); + } + + if (resultNode.HasKey("pool_counts")) { + result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]); + } + if (resultNode.HasKey("user_counts")) { + result.UserCounts = GetCounts<TString>(resultNode["user_counts"]); + } + if (resultNode.HasKey("type_counts")) { + result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]); + } + if (resultNode.HasKey("state_counts")) { + result.StateCounts = GetCounts<TString>(resultNode["state_counts"]); + } + if (resultNode.HasKey("failed_jobs_count")) { + result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64(); + } + + result.Incomplete = resultNode["incomplete"].AsBool(); + + return result; +} + +void THttpRawClient::UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options) +{ + TMutationId mutationId; + THttpHeader header("POST", "update_op_parameters"); + header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options)); + RequestWithoutRetry(Context_, mutationId, header); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index 756c70f2116..c3771497fba 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -116,6 +116,44 @@ public: const TRichYPath& destinationPath, const TConcatenateOptions& options = {}) override; + // Transactions + + void PingTx(const TTransactionId& transactionId) override; + + // Operations + + TOperationAttributes GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options = {}) override; + + TOperationAttributes GetOperation( + const TString& operationId, + const TGetOperationOptions& options = {}) override; + + void AbortOperation( + TMutationId& mutationId, + const TOperationId& operationId) override; + + void CompleteOperation( + TMutationId& mutationId, + const TOperationId& operationId) override; + + void SuspendOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TSuspendOperationOptions& options = {}) override; + + void ResumeOperation( + TMutationId& mutationId, + const TOperationId& operationId, + const TResumeOperationOptions& options = {}) override; + + TListOperationsResult ListOperations(const TListOperationsOptions& options = {}) override; + + void UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options = {}) override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index bdaff49d3be..bb69728cb2e 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -79,20 +79,6 @@ void ExecuteBatch( } } -void PingTx( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId) -{ - THttpHeader header("POST", "ping_tx"); - header.MergeParameters(SerializeParamsForPingTx(transactionId)); - TRequestConfig requestConfig; - requestConfig.HttpConfig = NHttpClient::THttpConfig{ - .SocketTimeout = context.Config->PingTimeout - }; - RetryRequestWithPolicy(retryPolicy, context, header, {}, requestConfig); -} - TOperationAttributes ParseOperationAttributes(const TNode& node) { const auto& mapNode = node.AsMap(); @@ -209,133 +195,6 @@ TOperationAttributes ParseOperationAttributes(const TNode& node) return result; } -TOperationAttributes GetOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TGetOperationOptions& options) -{ - THttpHeader header("GET", "get_operation"); - header.MergeParameters(SerializeParamsForGetOperation(operationId, options)); - auto result = RetryRequestWithPolicy(retryPolicy, context, header); - return ParseOperationAttributes(NodeFromYsonString(result.Response)); -} - -TOperationAttributes GetOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TString& alias, - const TGetOperationOptions& options) -{ - THttpHeader header("GET", "get_operation"); - header.MergeParameters(SerializeParamsForGetOperation(alias, options)); - auto result = RetryRequestWithPolicy(retryPolicy, context, header); - return ParseOperationAttributes(NodeFromYsonString(result.Response)); -} - -void AbortOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId) -{ - THttpHeader header("POST", "abort_op"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForAbortOperation(operationId)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - -void CompleteOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId) -{ - THttpHeader header("POST", "complete_op"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForCompleteOperation(operationId)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - -void SuspendOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TSuspendOperationOptions& options) -{ - THttpHeader header("POST", "suspend_op"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForSuspendOperation(operationId, options)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - -void ResumeOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TResumeOperationOptions& options) -{ - THttpHeader header("POST", "resume_op"); - header.AddMutationId(); - header.MergeParameters(SerializeParamsForResumeOperation(operationId, options)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - -template <typename TKey> -static THashMap<TKey, i64> GetCounts(const TNode& countsNode) -{ - THashMap<TKey, i64> counts; - for (const auto& entry : countsNode.AsMap()) { - counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64()); - } - return counts; -} - -TListOperationsResult ListOperations( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TListOperationsOptions& options) -{ - THttpHeader header("GET", "list_operations"); - header.MergeParameters(SerializeParamsForListOperations(options)); - auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); - auto resultNode = NodeFromYsonString(responseInfo.Response); - - TListOperationsResult result; - for (const auto& operationNode : resultNode["operations"].AsList()) { - result.Operations.push_back(ParseOperationAttributes(operationNode)); - } - - if (resultNode.HasKey("pool_counts")) { - result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]); - } - if (resultNode.HasKey("user_counts")) { - result.UserCounts = GetCounts<TString>(resultNode["user_counts"]); - } - if (resultNode.HasKey("type_counts")) { - result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]); - } - if (resultNode.HasKey("state_counts")) { - result.StateCounts = GetCounts<TString>(resultNode["state_counts"]); - } - if (resultNode.HasKey("failed_jobs_count")) { - result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64(); - } - - result.Incomplete = resultNode["incomplete"].AsBool(); - - return result; -} - -void UpdateOperationParameters( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TUpdateOperationParametersOptions& options) -{ - THttpHeader header("POST", "update_op_parameters"); - header.MergeParameters(SerializeParamsForUpdateOperationParameters(operationId, options)); - RetryRequestWithPolicy(retryPolicy, context, header); -} - TJobAttributes ParseJobAttributes(const TNode& node) { const auto& mapNode = node.AsMap(); diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 04c18a753b4..28af3ee9896 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -36,64 +36,6 @@ void ExecuteBatch( const TExecuteBatchOptions& options = TExecuteBatchOptions()); // -// Transactions -// - -void PingTx( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId); - -// -// Operations -// - -TOperationAttributes GetOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TGetOperationOptions& options = TGetOperationOptions()); - -TOperationAttributes GetOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TString& operationId, - const TGetOperationOptions& options = TGetOperationOptions()); - -void AbortOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId); - -void CompleteOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId); - -void SuspendOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TSuspendOperationOptions& options = TSuspendOperationOptions()); - -void ResumeOperation( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TResumeOperationOptions& options = TResumeOperationOptions()); - -TListOperationsResult ListOperations( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TListOperationsOptions& options = TListOperationsOptions()); - -void UpdateOperationParameters( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TUpdateOperationParametersOptions& options = TUpdateOperationParametersOptions()); - -// // Jobs // |
