aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http_client/raw_client.cpp
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2025-01-22 08:47:22 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2025-01-22 09:04:11 +0300
commit044fc00c5520ec73b6146427ce9f1cf80ec6a95f (patch)
tree6d8b56e510374542ad49e5588c25d95701d7cf02 /yt/cpp/mapreduce/http_client/raw_client.cpp
parent8f9ae59afa6108d373d287e973a7597c0a89143e (diff)
downloadydb-044fc00c5520ec73b6146427ce9f1cf80ec6a95f.tar.gz
YT-23616: Rename raw_client to http_client
commit_hash:df330f3a0c0ca36d9bcf801fd96b964f1be6383a
Diffstat (limited to 'yt/cpp/mapreduce/http_client/raw_client.cpp')
-rw-r--r--yt/cpp/mapreduce/http_client/raw_client.cpp936
1 files changed, 936 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp
new file mode 100644
index 0000000000..3586751191
--- /dev/null
+++ b/yt/cpp/mapreduce/http_client/raw_client.cpp
@@ -0,0 +1,936 @@
+#include "raw_client.h"
+
+#include "raw_requests.h"
+#include "rpc_parameters_serialization.h"
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+
+#include <yt/cpp/mapreduce/http/helpers.h>
+#include <yt/cpp/mapreduce/http/http.h>
+#include <yt/cpp/mapreduce/http/requests.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+#include <yt/cpp/mapreduce/interface/fluent.h>
+#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+#include <yt/cpp/mapreduce/interface/tvm.h>
+
+#include <yt/cpp/mapreduce/io/helpers.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+THttpRawClient::THttpRawClient(const TClientContext& context)
+ : Context_(context)
+{ }
+
+TNode THttpRawClient::Get(
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get");
+ header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options));
+ return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+TNode THttpRawClient::TryGet(
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options)
+{
+ try {
+ return Get(transactionId, path, options);
+ } catch (const TErrorResponse& error) {
+ if (!error.IsResolveError()) {
+ throw;
+ }
+ return {};
+ }
+}
+
+void THttpRawClient::Set(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode& value,
+ const TSetOptions& options)
+{
+ THttpHeader header("PUT", "set");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options));
+ auto body = NodeToYsonString(value);
+ RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
+}
+
+bool THttpRawClient::Exists(
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TExistsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "exists");
+ header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options));
+ return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+void THttpRawClient::MultisetAttributes(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode::TMapType& value,
+ const TMultisetAttributesOptions& options)
+{
+ THttpHeader header("PUT", "api/v4/multiset_attributes", false);
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options));
+ auto body = NodeToYsonString(value);
+ RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
+}
+
+TNodeId THttpRawClient::Create(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const ENodeType& type,
+ const TCreateOptions& options)
+{
+ THttpHeader header("POST", "create");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+TNodeId THttpRawClient::CopyWithoutRetries(
+ const TTransactionId& transactionId,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "copy");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+TNodeId THttpRawClient::CopyInsideMasterCell(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options)
+{
+ THttpHeader header("POST", "copy");
+ header.AddMutationId();
+ auto params = NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
+
+ // Make cross cell copying disable.
+ params["enable_cross_cell_copying"] = false;
+ header.MergeParameters(params);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+TNodeId THttpRawClient::MoveWithoutRetries(
+ const TTransactionId& transactionId,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "move");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+TNodeId THttpRawClient::MoveInsideMasterCell(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options)
+{
+ THttpHeader header("POST", "move");
+ header.AddMutationId();
+ auto params = NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
+
+ // Make cross cell copying disable.
+ params["enable_cross_cell_copying"] = false;
+ header.MergeParameters(params);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+void THttpRawClient::Remove(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TRemoveOptions& options)
+{
+ THttpHeader header("POST", "remove");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForRemove(transactionId, Context_.Config->Prefix, path, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+TNode::TListType THttpRawClient::List(
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TListOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "list");
+
+ TYPath updatedPath = AddPathPrefix(path, Context_.Config->Prefix);
+ // Translate "//" to "/"
+ // Translate "//some/constom/prefix/from/config/" to "//some/constom/prefix/from/config"
+ if (path.empty() && updatedPath.EndsWith('/')) {
+ updatedPath.pop_back();
+ }
+ header.MergeParameters(NRawClient::SerializeParamsForList(transactionId, Context_.Config->Prefix, updatedPath, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ return NodeFromYsonString(responseInfo->GetResponse()).AsList();
+}
+
+TNodeId THttpRawClient::Link(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& targetPath,
+ const TYPath& linkPath,
+ const TLinkOptions& options)
+{
+ THttpHeader header("POST", "link");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForLink(transactionId, Context_.Config->Prefix, targetPath, linkPath, options));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+TLockId THttpRawClient::Lock(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ ELockMode mode,
+ const TLockOptions& options)
+{
+ THttpHeader header("POST", "lock");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForLock(transactionId, Context_.Config->Prefix, path, mode, options));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+void THttpRawClient::Unlock(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TUnlockOptions& options)
+{
+ THttpHeader header("POST", "unlock");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForUnlock(transactionId, Context_.Config->Prefix, path, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::Concatenate(
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& sourcePaths,
+ const TRichYPath& destinationPath,
+ const TConcatenateOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "concatenate");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForConcatenate(transactionId, Context_.Config->Prefix, sourcePaths, destinationPath, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+TTransactionId THttpRawClient::StartTransaction(
+ TMutationId& mutationId,
+ const TTransactionId& parentTransactionId,
+ const TStartTransactionOptions& options)
+{
+ THttpHeader header("POST", "start_tx");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, Context_.Config->TxTimeout, options));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+void THttpRawClient::PingTransaction(const TTransactionId& transactionId)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "ping_tx");
+ header.MergeParameters(NRawClient::SerializeParamsForPingTx(transactionId));
+ TRequestConfig requestConfig;
+ requestConfig.HttpConfig = NHttpClient::THttpConfig{
+ .SocketTimeout = Context_.Config->PingTimeout
+ };
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::AbortTransaction(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId)
+{
+ THttpHeader header("POST", "abort_tx");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::CommitTransaction(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId)
+{
+ THttpHeader header("POST", "commit_tx");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+TOperationId THttpRawClient::StartOperation(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec)
+{
+ THttpHeader header("POST", "start_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
+TOperationAttributes THttpRawClient::GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_operation");
+ header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
+}
+
+TOperationAttributes THttpRawClient::GetOperation(
+ const TString& alias,
+ const TGetOperationOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_operation");
+ header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
+}
+
+void THttpRawClient::AbortOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId)
+{
+ THttpHeader header("POST", "abort_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::CompleteOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId)
+{
+ THttpHeader header("POST", "complete_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::SuspendOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options)
+{
+ THttpHeader header("POST", "suspend_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::ResumeOperation(
+ TMutationId& mutationId,
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options)
+{
+ THttpHeader header("POST", "resume_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+template <typename TKey>
+static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
+{
+ THashMap<TKey, i64> counts;
+ for (const auto& entry : countsNode.AsMap()) {
+ counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
+ }
+ return counts;
+}
+
+TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "list_operations");
+ header.MergeParameters(NRawClient::SerializeParamsForListOperations(options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
+
+ const auto& operationNodesList = resultNode["operations"].AsList();
+
+ TListOperationsResult result;
+ result.Operations.reserve(operationNodesList.size());
+ for (const auto& operationNode : operationNodesList) {
+ result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode));
+ }
+
+ if (resultNode.HasKey("pool_counts")) {
+ result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
+ }
+ if (resultNode.HasKey("user_counts")) {
+ result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
+ }
+ if (resultNode.HasKey("type_counts")) {
+ result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
+ }
+ if (resultNode.HasKey("state_counts")) {
+ result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
+ }
+ if (resultNode.HasKey("failed_jobs_count")) {
+ result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
+ }
+
+ result.Incomplete = resultNode["incomplete"].AsBool();
+
+ return result;
+}
+
+void THttpRawClient::UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "update_op_parameters");
+ header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options));
+ RequestWithoutRetry(Context_, mutationId, header);
+}
+
+NYson::TYsonString THttpRawClient::GetJob(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_job");
+ header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ return NYson::TYsonString(responseInfo->GetResponse());
+}
+
+TListJobsResult THttpRawClient::ListJobs(
+ const TOperationId& operationId,
+ const TListJobsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "list_jobs");
+ header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
+
+ const auto& jobNodesList = resultNode["jobs"].AsList();
+
+ TListJobsResult result;
+ result.Jobs.reserve(jobNodesList.size());
+ for (const auto& jobNode : jobNodesList) {
+ result.Jobs.push_back(NRawClient::ParseJobAttributes(jobNode));
+ }
+
+ if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
+ result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
+ }
+ if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
+ result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64();
+ }
+ if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
+ result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
+ }
+
+ return result;
+}
+
+IFileReaderPtr THttpRawClient::GetJobInput(
+ const TJobId& jobId,
+ const TGetJobInputOptions& /*options*/)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_job_input");
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
+}
+
+IFileReaderPtr THttpRawClient::GetJobFailContext(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobFailContextOptions& /*options*/)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_job_fail_context");
+ header.AddOperationId(operationId);
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
+}
+
+IFileReaderPtr THttpRawClient::GetJobStderr(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& /*options*/)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_job_stderr");
+ header.AddOperationId(operationId);
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
+}
+
+TJobTraceEvent ParseJobTraceEvent(const TNode& node)
+{
+ const auto& mapNode = node.AsMap();
+ TJobTraceEvent result;
+
+ if (auto idNode = mapNode.FindPtr("operation_id")) {
+ result.OperationId = GetGuid(idNode->AsString());
+ }
+ if (auto idNode = mapNode.FindPtr("job_id")) {
+ result.JobId = GetGuid(idNode->AsString());
+ }
+ if (auto idNode = mapNode.FindPtr("trace_id")) {
+ result.TraceId = GetGuid(idNode->AsString());
+ }
+ if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
+ result.EventIndex = eventIndexNode->AsInt64();
+ }
+ if (auto eventNode = mapNode.FindPtr("event")) {
+ result.Event = eventNode->AsString();
+ }
+ if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
+ result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());;
+ }
+
+ return result;
+}
+
+std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_job_trace");
+ header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
+
+ const auto& traceEventNodesList = resultNode.AsList();
+
+ std::vector<TJobTraceEvent> result;
+ result.reserve(traceEventNodesList.size());
+ for (const auto& traceEventNode : traceEventNodesList) {
+ result.push_back(ParseJobTraceEvent(traceEventNode));
+ }
+
+ return result;
+}
+
+std::unique_ptr<IInputStream> THttpRawClient::ReadFile(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TFileReaderOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", GetReadFileCommand(Context_.Config->ApiVersion));
+ header.AddTransactionId(transactionId);
+ header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
+ header.MergeParameters(FormIORequestParameters(path, options));
+ header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
+
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
+}
+
+TMaybe<TYPath> THttpRawClient::GetFileFromCache(
+ const TTransactionId& transactionId,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TGetFileFromCacheOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_file_from_cache");
+ header.MergeParameters(NRawClient::SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ auto resultNode = NodeFromYsonString(responseInfo->GetResponse()).AsString();
+ return resultNode.empty() ? Nothing() : TMaybe<TYPath>(resultNode);
+}
+
+TYPath THttpRawClient::PutFileToCache(
+ const TTransactionId& transactionId,
+ const TYPath& filePath,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TPutFileToCacheOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "put_file_to_cache");
+ header.MergeParameters(NRawClient::SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ return NodeFromYsonString(responseInfo->GetResponse()).AsString();
+}
+
+void THttpRawClient::MountTable(
+ TMutationId& mutationId,
+ const TYPath& path,
+ const TMountTableOptions& options)
+{
+ THttpHeader header("POST", "mount_table");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
+ if (options.CellId_) {
+ header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
+ }
+ header.AddParameter("freeze", options.Freeze_);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::UnmountTable(
+ TMutationId& mutationId,
+ const TYPath& path,
+ const TUnmountTableOptions& options)
+{
+ THttpHeader header("POST", "unmount_table");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
+ header.AddParameter("force", options.Force_);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::RemountTable(
+ TMutationId& mutationId,
+ const TYPath& path,
+ const TRemountTableOptions& options)
+{
+ THttpHeader header("POST", "remount_table");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::ReshardTableByPivotKeys(
+ TMutationId& mutationId,
+ const TYPath& path,
+ const TVector<TKey>& keys,
+ const TReshardTableOptions& options)
+{
+ THttpHeader header("POST", "reshard_table");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
+ header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::ReshardTableByTabletCount(
+ TMutationId& mutationId,
+ const TYPath& path,
+ i64 tabletCount,
+ const TReshardTableOptions& options)
+{
+ THttpHeader header("POST", "reshard_table");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
+ header.AddParameter("tablet_count", tabletCount);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::InsertRows(
+ const TYPath& path,
+ const TNode::TListType& rows,
+ const TInsertRowsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("PUT", "insert_rows");
+ header.SetInputFormat(TFormat::YsonBinary());
+ header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
+ auto body = NodeListToYsonString(rows);
+ TRequestConfig config;
+ config.IsHeavy = true;
+ RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
+}
+
+void THttpRawClient::TrimRows(
+ const TYPath& path,
+ i64 tabletIndex,
+ i64 rowCount,
+ const TTrimRowsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "trim_rows");
+ header.AddParameter("trimmed_row_count", rowCount);
+ header.AddParameter("tablet_index", tabletIndex);
+ header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
+}
+
+TNode::TListType THttpRawClient::LookupRows(
+ const TYPath& path,
+ const TNode::TListType& keys,
+ const TLookupRowsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("PUT", "lookup_rows");
+ header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion));
+ header.SetInputFormat(TFormat::YsonBinary());
+ header.SetOutputFormat(TFormat::YsonBinary());
+
+ header.MergeParameters(BuildYsonNodeFluently().BeginMap()
+ .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
+ fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
+ })
+ .Item("keep_missing_rows").Value(options.KeepMissingRows_)
+ .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
+ fluent.Item("versioned").Value(*options.Versioned_);
+ })
+ .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
+ fluent.Item("column_names").Value(*options.Columns_);
+ })
+ .EndMap());
+
+ auto body = NodeListToYsonString(keys);
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, body, config);
+ return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
+}
+
+TNode::TListType THttpRawClient::SelectRows(
+ const TString& query,
+ const TSelectRowsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "select_rows");
+ header.SetInputFormat(TFormat::YsonBinary());
+ header.SetOutputFormat(TFormat::YsonBinary());
+
+ header.MergeParameters(BuildYsonNodeFluently().BeginMap()
+ .Item("query").Value(query)
+ .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
+ fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
+ })
+ .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
+ fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
+ })
+ .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
+ fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
+ })
+ .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
+ .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
+ .Item("verbose_logging").Value(options.VerboseLogging_)
+ .Item("enable_code_cache").Value(options.EnableCodeCache_)
+ .EndMap());
+
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
+}
+
+std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableReaderOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
+ header.SetOutputFormat(format);
+ header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
+ header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, options));
+ header.MergeParameters(FormIORequestParameters(path, options));
+
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
+}
+
+std::unique_ptr<IInputStream> THttpRawClient::ReadBlobTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TKey& key,
+ const TBlobTableReaderOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "read_blob_table");
+ header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
+ header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
+ header.MergeParameters(NRawClient::SerializeParamsForReadBlobTable(transactionId, path, key, options));
+
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
+}
+
+void THttpRawClient::AlterTable(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TAlterTableOptions& options)
+{
+ THttpHeader header("POST", "alter_table");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForAlterTable(transactionId, Context_.Config->Prefix, path, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::AlterTableReplica(
+ TMutationId& mutationId,
+ const TReplicaId& replicaId,
+ const TAlterTableReplicaOptions& options)
+{
+ THttpHeader header("POST", "alter_table_replica");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::DeleteRows(
+ const TYPath& path,
+ const TNode::TListType& keys,
+ const TDeleteRowsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("PUT", "delete_rows");
+ header.SetInputFormat(TFormat::YsonBinary());
+ header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(Context_.Config->Prefix, path, options));
+
+ auto body = NodeListToYsonString(keys);
+ TRequestConfig config;
+ config.IsHeavy = true;
+ RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
+}
+
+void THttpRawClient::FreezeTable(
+ const TYPath& path,
+ const TFreezeTableOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "freeze_table");
+ header.MergeParameters(NRawClient::SerializeParamsForFreezeTable(Context_.Config->Prefix, path, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+void THttpRawClient::UnfreezeTable(
+ const TYPath& path,
+ const TUnfreezeTableOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "unfreeze_table");
+ header.MergeParameters(NRawClient::SerializeParamsForUnfreezeTable(Context_.Config->Prefix, path, options));
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
+}
+
+TCheckPermissionResponse THttpRawClient::CheckPermission(
+ const TString& user,
+ EPermission permission,
+ const TYPath& path,
+ const TCheckPermissionOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "check_permission");
+ header.MergeParameters(NRawClient::SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo->GetResponse()));
+}
+
+TVector<TTabletInfo> THttpRawClient::GetTabletInfos(
+ const TYPath& path,
+ const TVector<int>& tabletIndexes,
+ const TGetTabletInfosOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "api/v4/get_tablet_infos", /*isApi*/ false);
+ header.MergeParameters(NRawClient::SerializeParamsForGetTabletInfos(Context_.Config->Prefix, path, tabletIndexes, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ TVector<TTabletInfo> result;
+ Deserialize(result, *NodeFromYsonString(responseInfo->GetResponse()).AsMap().FindPtr("tablets"));
+ return result;
+}
+
+TVector<TTableColumnarStatistics> THttpRawClient::GetTableColumnarStatistics(
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTableColumnarStatisticsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_table_columnar_statistics");
+ header.MergeParameters(NRawClient::SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ TVector<TTableColumnarStatistics> result;
+ Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
+ return result;
+}
+
+TMultiTablePartitions THttpRawClient::GetTablePartitions(
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTablePartitionsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "partition_tables");
+ header.MergeParameters(NRawClient::SerializeParamsForGetTablePartitions(transactionId, paths, options));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ TMultiTablePartitions result;
+ Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
+ return result;
+}
+
+ui64 THttpRawClient::GenerateTimestamp()
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "generate_timestamp");
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return NodeFromYsonString(responseInfo->GetResponse()).AsUint64();
+}
+
+IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest()
+{
+ return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail