path: root/yt/cpp/mapreduce/http_client/raw_requests.cpp
diff options
authorhiddenpath <hiddenpath@yandex-team.com>2025-01-22 08:47:22 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2025-01-22 09:04:11 +0300
commit044fc00c5520ec73b6146427ce9f1cf80ec6a95f (patch)
tree6d8b56e510374542ad49e5588c25d95701d7cf02 /yt/cpp/mapreduce/http_client/raw_requests.cpp
parent8f9ae59afa6108d373d287e973a7597c0a89143e (diff)
YT-23616: Rename raw_client to http_client
Diffstat (limited to 'yt/cpp/mapreduce/http_client/raw_requests.cpp')
1 files changed, 328 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http_client/raw_requests.cpp b/yt/cpp/mapreduce/http_client/raw_requests.cpp
new file mode 100644
index 0000000000..30b1619b71
--- /dev/null
+++ b/yt/cpp/mapreduce/http_client/raw_requests.cpp
@@ -0,0 +1,328 @@
+#include "raw_requests.h"
+#include "raw_batch_request.h"
+#include "rpc_parameters_serialization.h"
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+#include <yt/cpp/mapreduce/http/fwd.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/helpers.h>
+#include <yt/cpp/mapreduce/http/http_client.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+#include <yt/cpp/mapreduce/interface/serialize.h>
+#include <yt/cpp/mapreduce/interface/tvm.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+#include <library/cpp/yson/node/node_io.h>
+#include <util/generic/guid.h>
+#include <util/generic/scope.h>
+namespace NYT::NDetail::NRawClient {
+TOperationAttributes ParseOperationAttributes(const TNode& node)
+ const auto& mapNode = node.AsMap();
+ TOperationAttributes result;
+ if (auto idNode = mapNode.FindPtr("id")) {
+ result.Id = GetGuid(idNode->AsString());
+ }
+ if (auto typeNode = mapNode.FindPtr("type")) {
+ result.Type = FromString<EOperationType>(typeNode->AsString());
+ } else if (auto operationTypeNode = mapNode.FindPtr("operation_type")) {
+ // COMPAT(levysotsky): "operation_type" is a deprecated synonym for "type".
+ // This branch should be removed when all clusters are updated.
+ result.Type = FromString<EOperationType>(operationTypeNode->AsString());
+ }
+ if (auto stateNode = mapNode.FindPtr("state")) {
+ result.State = stateNode->AsString();
+ // We don't use FromString here, because OS_IN_PROGRESS unites many states: "initializing", "running", etc.
+ if (*result.State == "completed") {
+ result.BriefState = EOperationBriefState::Completed;
+ } else if (*result.State == "aborted") {
+ result.BriefState = EOperationBriefState::Aborted;
+ } else if (*result.State == "failed") {
+ result.BriefState = EOperationBriefState::Failed;
+ } else {
+ result.BriefState = EOperationBriefState::InProgress;
+ }
+ }
+ if (auto authenticatedUserNode = mapNode.FindPtr("authenticated_user")) {
+ result.AuthenticatedUser = authenticatedUserNode->AsString();
+ }
+ if (auto startTimeNode = mapNode.FindPtr("start_time")) {
+ result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
+ }
+ if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
+ result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
+ }
+ auto briefProgressNode = mapNode.FindPtr("brief_progress");
+ if (briefProgressNode && briefProgressNode->HasKey("jobs")) {
+ result.BriefProgress.ConstructInPlace();
+ static auto load = [] (const TNode& item) {
+ // Backward compatibility with old YT versions
+ return item.IsInt64() ? item.AsInt64() : item["total"].AsInt64();
+ };
+ const auto& jobs = (*briefProgressNode)["jobs"];
+ result.BriefProgress->Aborted = load(jobs["aborted"]);
+ result.BriefProgress->Completed = load(jobs["completed"]);
+ result.BriefProgress->Running = jobs["running"].AsInt64();
+ result.BriefProgress->Total = jobs["total"].AsInt64();
+ result.BriefProgress->Failed = jobs["failed"].AsInt64();
+ result.BriefProgress->Lost = jobs["lost"].AsInt64();
+ result.BriefProgress->Pending = jobs["pending"].AsInt64();
+ }
+ if (auto briefSpecNode = mapNode.FindPtr("brief_spec")) {
+ result.BriefSpec = *briefSpecNode;
+ }
+ if (auto specNode = mapNode.FindPtr("spec")) {
+ result.Spec = *specNode;
+ }
+ if (auto fullSpecNode = mapNode.FindPtr("full_spec")) {
+ result.FullSpec = *fullSpecNode;
+ }
+ if (auto unrecognizedSpecNode = mapNode.FindPtr("unrecognized_spec")) {
+ result.UnrecognizedSpec = *unrecognizedSpecNode;
+ }
+ if (auto suspendedNode = mapNode.FindPtr("suspended")) {
+ result.Suspended = suspendedNode->AsBool();
+ }
+ if (auto resultNode = mapNode.FindPtr("result")) {
+ result.Result.ConstructInPlace();
+ auto error = TYtError((*resultNode)["error"]);
+ if (error.GetCode() != 0) {
+ result.Result->Error = std::move(error);
+ }
+ }
+ if (auto progressNode = mapNode.FindPtr("progress")) {
+ const auto& progressMap = progressNode->AsMap();
+ TMaybe<TInstant> buildTime;
+ if (auto buildTimeNode = progressMap.FindPtr("build_time")) {
+ buildTime = TInstant::ParseIso8601(buildTimeNode->AsString());
+ }
+ TJobStatistics jobStatistics;
+ if (auto jobStatisticsNode = progressMap.FindPtr("job_statistics")) {
+ jobStatistics = TJobStatistics(*jobStatisticsNode);
+ }
+ TJobCounters jobCounters;
+ if (auto jobCountersNode = progressMap.FindPtr("total_job_counter")) {
+ jobCounters = TJobCounters(*jobCountersNode);
+ }
+ result.Progress = TOperationProgress{
+ .JobStatistics = std::move(jobStatistics),
+ .JobCounters = std::move(jobCounters),
+ .BuildTime = buildTime,
+ };
+ }
+ if (auto eventsNode = mapNode.FindPtr("events")) {
+ result.Events.ConstructInPlace().reserve(eventsNode->Size());
+ for (const auto& eventNode : eventsNode->AsList()) {
+ result.Events->push_back(TOperationEvent{
+ eventNode["state"].AsString(),
+ TInstant::ParseIso8601(eventNode["time"].AsString()),
+ });
+ }
+ }
+ if (auto alertsNode = mapNode.FindPtr("alerts")) {
+ result.Alerts.ConstructInPlace();
+ for (const auto& [alertType, alertError] : alertsNode->AsMap()) {
+ result.Alerts->emplace(alertType, TYtError(alertError));
+ }
+ }
+ return result;
+TJobAttributes ParseJobAttributes(const TNode& node)
+ const auto& mapNode = node.AsMap();
+ TJobAttributes result;
+ // Currently "get_job" returns "job_id" field and "list_jobs" returns "id" field.
+ auto idNode = mapNode.FindPtr("id");
+ if (!idNode) {
+ idNode = mapNode.FindPtr("job_id");
+ }
+ if (idNode) {
+ result.Id = GetGuid(idNode->AsString());
+ }
+ if (auto typeNode = mapNode.FindPtr("type")) {
+ result.Type = FromString<EJobType>(typeNode->AsString());
+ }
+ if (auto stateNode = mapNode.FindPtr("state")) {
+ result.State = FromString<EJobState>(stateNode->AsString());
+ }
+ if (auto addressNode = mapNode.FindPtr("address")) {
+ result.Address = addressNode->AsString();
+ }
+ if (auto taskNameNode = mapNode.FindPtr("task_name")) {
+ result.TaskName = taskNameNode->AsString();
+ }
+ if (auto startTimeNode = mapNode.FindPtr("start_time")) {
+ result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
+ }
+ if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
+ result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
+ }
+ if (auto progressNode = mapNode.FindPtr("progress")) {
+ result.Progress = progressNode->AsDouble();
+ }
+ if (auto stderrSizeNode = mapNode.FindPtr("stderr_size")) {
+ result.StderrSize = stderrSizeNode->AsUint64();
+ }
+ if (auto errorNode = mapNode.FindPtr("error")) {
+ result.Error.ConstructInPlace(*errorNode);
+ }
+ if (auto briefStatisticsNode = mapNode.FindPtr("brief_statistics")) {
+ result.BriefStatistics = *briefStatisticsNode;
+ }
+ if (auto inputPathsNode = mapNode.FindPtr("input_paths")) {
+ const auto& inputPathNodesList = inputPathsNode->AsList();
+ result.InputPaths.ConstructInPlace();
+ result.InputPaths->reserve(inputPathNodesList.size());
+ for (const auto& inputPathNode : inputPathNodesList) {
+ TRichYPath path;
+ Deserialize(path, inputPathNode);
+ result.InputPaths->push_back(std::move(path));
+ }
+ }
+ if (auto coreInfosNode = mapNode.FindPtr("core_infos")) {
+ const auto& coreInfoNodesList = coreInfosNode->AsList();
+ result.CoreInfos.ConstructInPlace();
+ result.CoreInfos->reserve(coreInfoNodesList.size());
+ for (const auto& coreInfoNode : coreInfoNodesList) {
+ TCoreInfo coreInfo;
+ coreInfo.ProcessId = coreInfoNode["process_id"].AsInt64();
+ coreInfo.ExecutableName = coreInfoNode["executable_name"].AsString();
+ if (coreInfoNode.HasKey("size")) {
+ coreInfo.Size = coreInfoNode["size"].AsUint64();
+ }
+ if (coreInfoNode.HasKey("error")) {
+ coreInfo.Error.ConstructInPlace(coreInfoNode["error"]);
+ }
+ result.CoreInfos->push_back(std::move(coreInfo));
+ }
+ }
+ return result;
+TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
+ auto parseSingleResult = [] (const TNode::TMapType& node) {
+ TCheckPermissionResult result;
+ result.Action = ::FromString<ESecurityAction>(node.at("action").AsString());
+ if (auto objectId = node.FindPtr("object_id")) {
+ result.ObjectId = GetGuid(objectId->AsString());
+ }
+ if (auto objectName = node.FindPtr("object_name")) {
+ result.ObjectName = objectName->AsString();
+ }
+ if (auto subjectId = node.FindPtr("subject_id")) {
+ result.SubjectId = GetGuid(subjectId->AsString());
+ }
+ if (auto subjectName = node.FindPtr("subject_name")) {
+ result.SubjectName = subjectName->AsString();
+ }
+ return result;
+ };
+ const auto& mapNode = node.AsMap();
+ TCheckPermissionResponse result;
+ static_cast<TCheckPermissionResult&>(result) = parseSingleResult(mapNode);
+ if (auto columns = mapNode.FindPtr("columns")) {
+ result.Columns.reserve(columns->AsList().size());
+ for (const auto& columnNode : columns->AsList()) {
+ result.Columns.push_back(parseSingleResult(columnNode.AsMap()));
+ }
+ }
+ return result;
+TRichYPath CanonizeYPath(
+ const IRawClientPtr& rawClient,
+ const TRichYPath& path)
+ return CanonizeYPaths(rawClient, {path}).front();
+TVector<TRichYPath> CanonizeYPaths(
+ const IRawClientPtr& rawClient,
+ const TVector<TRichYPath>& paths)
+ auto batch = rawClient->CreateRawBatchRequest();
+ TVector<NThreading::TFuture<TRichYPath>> futures;
+ futures.reserve(paths.size());
+ for (const auto& path : paths) {
+ futures.push_back(batch->CanonizeYPath(path));
+ }
+ batch->ExecuteBatch();
+ TVector<TRichYPath> result;
+ result.reserve(futures.size());
+ for (auto& future : futures) {
+ result.push_back(future.ExtractValueSync());
+ }
+ return result;
+NHttpClient::IHttpResponsePtr SkyShareTable(
+ const TClientContext& context,
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options)
+ TMutationId mutationId;
+ THttpHeader header("POST", "api/v1/share", /*IsApi*/ false);
+ auto proxyName = context.ServerName.substr(0, context.ServerName.find('.'));
+ auto host = context.Config->SkynetApiHost;
+ if (host == "") {
+ host = "skynet." + proxyName + ".yt.yandex.net";
+ }
+ TSkyShareTableOptions patchedOptions = options;
+ if (context.Config->Pool && !patchedOptions.Pool_) {
+ patchedOptions.Pool(context.Config->Pool);
+ }
+ header.MergeParameters(SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, patchedOptions));
+ TClientContext skyApiHost({.ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient()});
+ return RequestWithoutRetry(skyApiHost, mutationId, header, "");
+TAuthorizationInfo WhoAmI(const TClientContext& context)
+ TMutationId mutationId;
+ THttpHeader header("GET", "auth/whoami", /*isApi*/ false);
+ auto requestResult = RequestWithoutRetry(context, mutationId, header);
+ TAuthorizationInfo result;
+ NJson::TJsonValue jsonValue;
+ bool ok = NJson::ReadJsonTree(requestResult->GetResponse(), &jsonValue, /*throwOnError*/ true);
+ result.Login = jsonValue["login"].GetString();
+ result.Realm = jsonValue["realm"].GetString();
+ return result;
+} // namespace NYT::NDetail::NRawClient