aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/raw_client
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
committermax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/raw_client
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
downloadydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/raw_client')
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.cpp687
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.h190
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp1027
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h397
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp873
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h231
-rw-r--r--yt/cpp/mapreduce/raw_client/ya.make19
7 files changed, 3424 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
new file mode 100644
index 0000000000..be81f5a21a
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
@@ -0,0 +1,687 @@
+#include "raw_batch_request.h"
+
+#include "raw_requests.h"
+#include "rpc_parameters_serialization.h"
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/serialize.h>
+
+#include <library/cpp/yson/node/node.h>
+
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+#include <util/generic/guid.h>
+#include <util/string/builder.h>
+
+#include <exception>
+
+namespace NYT::NDetail::NRawClient {
+
+using NThreading::TFuture;
+using NThreading::TPromise;
+using NThreading::NewPromise;
+
+////////////////////////////////////////////////////////////////////
+
+static TString RequestInfo(const TNode& request)
+{
+ return ::TStringBuilder()
+ << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]);
+}
+
+static void EnsureNothing(const TMaybe<TNode>& node)
+{
+ Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType());
+}
+
+static void EnsureSomething(const TMaybe<TNode>& node)
+{
+ Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response.");
+}
+
+static void EnsureType(const TNode& node, TNode::EType type)
+{
+ Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. "
+ << "Expected: " << type << ", actual: " << node.GetType());
+}
+
+static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
+{
+ Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response.");
+ EnsureType(*node, type);
+}
+
+////////////////////////////////////////////////////////////////////
+
+template <typename TReturnType>
+class TResponseParserBase
+ : public TRawBatchRequest::IResponseItemParser
+{
+public:
+ using TFutureResult = TFuture<TReturnType>;
+
+public:
+ TResponseParserBase()
+ : Result(NewPromise<TReturnType>())
+ { }
+
+ void SetException(std::exception_ptr e) override
+ {
+ Result.SetException(std::move(e));
+ }
+
+ TFuture<TReturnType> GetFuture()
+ {
+ return Result.GetFuture();
+ }
+
+protected:
+ TPromise<TReturnType> Result;
+};
+
+////////////////////////////////////////////////////////////////////
+
+
+class TGetResponseParser
+ : public TResponseParserBase<TNode>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureSomething(node);
+ Result.SetValue(std::move(*node));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TVoidResponseParser
+ : public TResponseParserBase<void>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureNothing(node);
+ Result.SetValue();
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TListResponseParser
+ : public TResponseParserBase<TNode::TListType>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::List);
+ Result.SetValue(std::move(node->AsList()));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TExistsResponseParser
+ : public TResponseParserBase<bool>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Bool);
+ Result.SetValue(std::move(node->AsBool()));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TGuidResponseParser
+ : public TResponseParserBase<TGUID>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::String);
+ Result.SetValue(GetGuid(node->AsString()));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TCanonizeYPathResponseParser
+ : public TResponseParserBase<TRichYPath>
+{
+public:
+ explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original)
+ : OriginalNode_(PathToNode(original))
+ , PathPrefix_(std::move(pathPrefix))
+ { }
+
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::String);
+
+ for (const auto& item : OriginalNode_.GetAttributes().AsMap()) {
+ node->Attributes()[item.first] = item.second;
+ }
+ TRichYPath result;
+ Deserialize(result, *node);
+ result.Path_ = AddPathPrefix(result.Path_, PathPrefix_);
+ Result.SetValue(result);
+ }
+
+private:
+ TNode OriginalNode_;
+ TString PathPrefix_;
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TGetOperationResponseParser
+ : public TResponseParserBase<TOperationAttributes>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Map);
+ Result.SetValue(ParseOperationAttributes(*node));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TTableColumnarStatisticsParser
+ : public TResponseParserBase<TVector<TTableColumnarStatistics>>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Map);
+ TVector<TTableColumnarStatistics> statistics;
+ Deserialize(statistics, *node);
+ Result.SetValue(std::move(statistics));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TTablePartitionsParser
+ : public TResponseParserBase<TMultiTablePartitions>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Map);
+ TMultiTablePartitions partitions;
+ Deserialize(partitions, *node);
+ Result.SetValue(std::move(partitions));
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TGetFileFromCacheParser
+ : public TResponseParserBase<TMaybe<TYPath>>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::String);
+ if (node->AsString().empty()) {
+ Result.SetValue(Nothing());
+ } else {
+ Result.SetValue(node->AsString());
+ }
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TYPathParser
+ : public TResponseParserBase<TYPath>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::String);
+ Result.SetValue(node->AsString());
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TCheckPermissionParser
+ : public TResponseParserBase<TCheckPermissionResponse>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Map);
+ Result.SetValue(ParseCheckPermissionResponse(*node));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser)
+ : Parameters(std::move(parameters))
+ , ResponseParser(std::move(responseParser))
+ , NextTry()
+{ }
+
+TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry)
+ : Parameters(batchItem.Parameters)
+ , ResponseParser(batchItem.ResponseParser)
+ , NextTry(nextTry)
+{ }
+
+////////////////////////////////////////////////////////////////////
+
+TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config)
+ : Config_(config)
+{ }
+
+TRawBatchRequest::~TRawBatchRequest() = default;
+
+bool TRawBatchRequest::IsExecuted() const
+{
+ return Executed_;
+}
+
+void TRawBatchRequest::MarkExecuted()
+{
+ Executed_ = true;
+}
+
+template <typename TResponseParser>
+typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
+ const TString& command,
+ TNode parameters,
+ TMaybe<TNode> input)
+{
+ return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>());
+}
+
+template <typename TResponseParser>
+typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
+ const TString& command,
+ TNode parameters,
+ TMaybe<TNode> input,
+ ::TIntrusivePtr<TResponseParser> parser)
+{
+ Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
+ TNode request;
+ request["command"] = command;
+ request["parameters"] = std::move(parameters);
+ if (input) {
+ request["input"] = std::move(*input);
+ }
+ BatchItemList_.emplace_back(std::move(request), parser);
+ return parser->GetFuture();
+}
+
+void TRawBatchRequest::AddRequest(TBatchItem batchItem)
+{
+ Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
+ BatchItemList_.push_back(batchItem);
+}
+
+TFuture<TNodeId> TRawBatchRequest::Create(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ ENodeType type,
+ const TCreateOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "create",
+ SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::Remove(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TRemoveOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "remove",
+ SerializeParamsForRemove(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<bool> TRawBatchRequest::Exists(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TExistsOptions& options)
+{
+ return AddRequest<TExistsResponseParser>(
+ "exists",
+ SerializeParamsForExists(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<TNode> TRawBatchRequest::Get(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TGetOptions& options)
+{
+ return AddRequest<TGetResponseParser>(
+ "get",
+ SerializeParamsForGet(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::Set(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TNode& node,
+ const TSetOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "set",
+ SerializeParamsForSet(transaction, Config_->Prefix, path, options),
+ node);
+}
+
+TFuture<TNode::TListType> TRawBatchRequest::List(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TListOptions& options)
+{
+ return AddRequest<TListResponseParser>(
+ "list",
+ SerializeParamsForList(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<TNodeId> TRawBatchRequest::Copy(
+ const TTransactionId& transaction,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "copy",
+ SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options),
+ Nothing());
+}
+
+TFuture<TNodeId> TRawBatchRequest::Move(
+ const TTransactionId& transaction,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "move",
+ SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options),
+ Nothing());
+}
+
+TFuture<TNodeId> TRawBatchRequest::Link(
+ const TTransactionId& transaction,
+ const TYPath& targetPath,
+ const TYPath& linkPath,
+ const TLinkOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "link",
+ SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options),
+ Nothing());
+}
+
+TFuture<TLockId> TRawBatchRequest::Lock(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ ELockMode mode,
+ const TLockOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "lock",
+ SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::Unlock(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TUnlockOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "unlock",
+ SerializeParamsForUnlock(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<TMaybe<TYPath>> TRawBatchRequest::GetFileFromCache(
+ const TTransactionId& transactionId,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TGetFileFromCacheOptions& options)
+{
+ return AddRequest<TGetFileFromCacheParser>(
+ "get_file_from_cache",
+ SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options),
+ Nothing());
+}
+
+TFuture<TYPath> TRawBatchRequest::PutFileToCache(
+ const TTransactionId& transactionId,
+ const TYPath& filePath,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TPutFileToCacheOptions& options)
+{
+ return AddRequest<TYPathParser>(
+ "put_file_to_cache",
+ SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options),
+ Nothing());
+}
+
+TFuture<TCheckPermissionResponse> TRawBatchRequest::CheckPermission(
+ const TString& user,
+ EPermission permission,
+ const TYPath& path,
+ const TCheckPermissionOptions& options)
+{
+ return AddRequest<TCheckPermissionParser>(
+ "check_permission",
+ SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<TOperationAttributes> TRawBatchRequest::GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options)
+{
+ return AddRequest<TGetOperationResponseParser>(
+ "get_operation",
+ SerializeParamsForGetOperation(operationId, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::AbortOperation(const TOperationId& operationId)
+{
+ return AddRequest<TVoidResponseParser>(
+ "abort_op",
+ SerializeParamsForAbortOperation(operationId),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::CompleteOperation(const TOperationId& operationId)
+{
+ return AddRequest<TVoidResponseParser>(
+ "complete_op",
+ SerializeParamsForCompleteOperation(operationId),
+ Nothing());
+}
+TFuture<void> TRawBatchRequest::SuspendOperation(
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "suspend_operation",
+ SerializeParamsForSuspendOperation(operationId, options),
+ Nothing());
+}
+TFuture<void> TRawBatchRequest::ResumeOperation(
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "resume_operation",
+ SerializeParamsForResumeOperation(operationId, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "update_op_parameters",
+ SerializeParamsForUpdateOperationParameters(operationId, options),
+ Nothing());
+}
+
+TFuture<TRichYPath> TRawBatchRequest::CanonizeYPath(const TRichYPath& path)
+{
+ if (path.Path_.find_first_of("<>{}[]") != TString::npos) {
+ return AddRequest<TCanonizeYPathResponseParser>(
+ "parse_ypath",
+ SerializeParamsForParseYPath(path),
+ Nothing(),
+ MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, path));
+ } else {
+ TRichYPath result = path;
+ result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix);
+ return NThreading::MakeFuture(result);
+ }
+}
+
+TFuture<TVector<TTableColumnarStatistics>> TRawBatchRequest::GetTableColumnarStatistics(
+ const TTransactionId& transaction,
+ const TVector<TRichYPath>& paths,
+ const TGetTableColumnarStatisticsOptions& options)
+{
+ return AddRequest<TTableColumnarStatisticsParser>(
+ "get_table_columnar_statistics",
+ SerializeParamsForGetTableColumnarStatistics(transaction, paths, options),
+ Nothing());
+}
+
+TFuture<TMultiTablePartitions> TRawBatchRequest::GetTablePartitions(
+ const TTransactionId& transaction,
+ const TVector<TRichYPath>& paths,
+ const TGetTablePartitionsOptions& options)
+{
+ return AddRequest<TTablePartitionsParser>(
+ "partition_tables",
+ SerializeParamsForGetTablePartitions(transaction, paths, options),
+ Nothing());
+}
+
+void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const
+{
+ Y_VERIFY(result);
+ Y_VERIFY(nextTry);
+
+ *nextTry = TInstant();
+ maxSize = Min(maxSize, BatchItemList_.size());
+ *result = TNode::CreateList();
+ for (size_t i = 0; i < maxSize; ++i) {
+ YT_LOG_DEBUG("ExecuteBatch preparing: %v",
+ RequestInfo(BatchItemList_[i].Parameters));
+
+ result->Add(BatchItemList_[i].Parameters);
+ if (BatchItemList_[i].NextTry > *nextTry) {
+ *nextTry = BatchItemList_[i].NextTry;
+ }
+ }
+}
+
+void TRawBatchRequest::ParseResponse(
+ const TResponseInfo& requestResult,
+ const IRequestRetryPolicyPtr& retryPolicy,
+ TRawBatchRequest* retryBatch,
+ TInstant now)
+{
+ TNode node = NodeFromYsonString(requestResult.Response);
+ return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now);
+}
+
+void TRawBatchRequest::ParseResponse(
+ TNode node,
+ const TString& requestId,
+ const IRequestRetryPolicyPtr& retryPolicy,
+ TRawBatchRequest* retryBatch,
+ TInstant now)
+{
+ Y_VERIFY(retryBatch);
+
+ EnsureType(node, TNode::List);
+ auto& responseList = node.AsList();
+ const auto size = responseList.size();
+ Y_ENSURE(size <= BatchItemList_.size(),
+ "Size of server response exceeds size of batch request;"
+ " size of batch: " << BatchItemList_.size() <<
+ " size of server response: " << size << '.');
+
+ for (size_t i = 0; i != size; ++i) {
+ try {
+ EnsureType(responseList[i], TNode::Map);
+ auto& responseNode = responseList[i].AsMap();
+ const auto outputIt = responseNode.find("output");
+ if (outputIt != responseNode.end()) {
+ BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second));
+ } else {
+ const auto errorIt = responseNode.find("error");
+ if (errorIt == responseNode.end()) {
+ BatchItemList_[i].ResponseParser->SetResponse(Nothing());
+ } else {
+ TErrorResponse error(400, requestId);
+ error.SetError(TYtError(errorIt->second));
+ if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) {
+ YT_LOG_INFO(
+ "Batch subrequest (%s) failed, will retry, error: %s",
+ RequestInfo(BatchItemList_[i].Parameters),
+ error.what());
+ retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
+ } else {
+ YT_LOG_ERROR(
+ "Batch subrequest (%s) failed, error: %s",
+ RequestInfo(BatchItemList_[i].Parameters),
+ error.what());
+ BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error));
+ }
+ }
+ }
+ } catch (const std::exception& e) {
+ // We don't expect other exceptions, so we don't catch (...)
+ BatchItemList_[i].ResponseParser->SetException(std::current_exception());
+ }
+ }
+ BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size);
+}
+
+void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const
+{
+ for (const auto& batchItem : BatchItemList_) {
+ batchItem.ResponseParser->SetException(e);
+ }
+}
+
+size_t TRawBatchRequest::BatchSize() const
+{
+ return BatchItemList_.size();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail::NRawClient
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.h b/yt/cpp/mapreduce/raw_client/raw_batch_request.h
new file mode 100644
index 0000000000..7ed5bebf5e
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.h
@@ -0,0 +1,190 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/common/fwd.h>
+
+#include <yt/cpp/mapreduce/interface/batch_request.h>
+#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/cpp/mapreduce/interface/node.h>
+#include <yt/cpp/mapreduce/interface/retry_policy.h>
+
+#include <yt/cpp/mapreduce/http/requests.h>
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/deque.h>
+
+#include <exception>
+
+namespace NYT::NDetail {
+ struct TResponseInfo;
+}
+
+namespace NYT::NDetail::NRawClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRawBatchRequest
+ : public TThrRefBase
+{
+public:
+ struct IResponseItemParser
+ : public TThrRefBase
+ {
+ ~IResponseItemParser() = default;
+
+ virtual void SetResponse(TMaybe<TNode> node) = 0;
+ virtual void SetException(std::exception_ptr e) = 0;
+ };
+
+public:
+ TRawBatchRequest(const TConfigPtr& config);
+ ~TRawBatchRequest();
+
+ bool IsExecuted() const;
+ void MarkExecuted();
+
+ void FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const;
+
+ size_t BatchSize() const;
+
+ void ParseResponse(
+ const TResponseInfo& requestResult,
+ const IRequestRetryPolicyPtr& retryPolicy,
+ TRawBatchRequest* retryBatch,
+ TInstant now = TInstant::Now());
+ void ParseResponse(
+ TNode response,
+ const TString& requestId,
+ const IRequestRetryPolicyPtr& retryPolicy,
+ TRawBatchRequest* retryBatch,
+ TInstant now = TInstant::Now());
+ void SetErrorResult(std::exception_ptr e) const;
+
+ ::NThreading::TFuture<TNodeId> Create(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ ENodeType type,
+ const TCreateOptions& options);
+ ::NThreading::TFuture<void> Remove(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TRemoveOptions& options);
+ ::NThreading::TFuture<bool> Exists(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TExistsOptions& options);
+ ::NThreading::TFuture<TNode> Get(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TGetOptions& options);
+ ::NThreading::TFuture<void> Set(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TNode& value,
+ const TSetOptions& options);
+ ::NThreading::TFuture<TNode::TListType> List(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TListOptions& options);
+ ::NThreading::TFuture<TNodeId> Copy(
+ const TTransactionId& transaction,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options);
+ ::NThreading::TFuture<TNodeId> Move(
+ const TTransactionId& transaction,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options);
+ ::NThreading::TFuture<TNodeId> Link(
+ const TTransactionId& transaction,
+ const TYPath& targetPath,
+ const TYPath& linkPath,
+ const TLinkOptions& options);
+ ::NThreading::TFuture<TLockId> Lock(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ ELockMode mode,
+ const TLockOptions& options);
+ ::NThreading::TFuture<void> Unlock(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TUnlockOptions& options);
+ ::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache(
+ const TTransactionId& transactionId,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TGetFileFromCacheOptions& options);
+ ::NThreading::TFuture<TYPath> PutFileToCache(
+ const TTransactionId& transactionId,
+ const TYPath& filePath,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TPutFileToCacheOptions& options);
+ ::NThreading::TFuture<TCheckPermissionResponse> CheckPermission(
+ const TString& user,
+ EPermission permission,
+ const TYPath& path,
+ const TCheckPermissionOptions& options);
+ ::NThreading::TFuture<TOperationAttributes> GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options);
+ ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId);
+ ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId);
+ ::NThreading::TFuture<void> SuspendOperation(
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options);
+ ::NThreading::TFuture<void> ResumeOperation(
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options);
+ ::NThreading::TFuture<void> UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options);
+ ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path);
+ ::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics(
+ const TTransactionId& transaction,
+ const TVector<TRichYPath>& paths,
+ const TGetTableColumnarStatisticsOptions& options);
+ ::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions(
+ const TTransactionId& transaction,
+ const TVector<TRichYPath>& paths,
+ const TGetTablePartitionsOptions& options);
+
+private:
+ struct TBatchItem {
+ TNode Parameters;
+ ::TIntrusivePtr<IResponseItemParser> ResponseParser;
+ TInstant NextTry;
+
+ TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser);
+
+ TBatchItem(const TBatchItem& batchItem, TInstant nextTry);
+ };
+
+private:
+ template <typename TResponseParser>
+ typename TResponseParser::TFutureResult AddRequest(
+ const TString& command,
+ TNode parameters,
+ TMaybe<TNode> input);
+
+ template <typename TResponseParser>
+ typename TResponseParser::TFutureResult AddRequest(
+ const TString& command,
+ TNode parameters,
+ TMaybe<TNode> input,
+ ::TIntrusivePtr<TResponseParser> parser);
+
+ void AddRequest(TBatchItem batchItem);
+
+private:
+ TConfigPtr Config_;
+
+ TDeque<TBatchItem> BatchItemList_;
+ bool Executed_ = false;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail::NRawClient
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
new file mode 100644
index 0000000000..26120759fd
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -0,0 +1,1027 @@
+#include "raw_requests.h"
+
+#include "raw_batch_request.h"
+#include "rpc_parameters_serialization.h"
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+
+#include <yt/cpp/mapreduce/http/fwd.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/helpers.h>
+#include <yt/cpp/mapreduce/http/http_client.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+#include <yt/cpp/mapreduce/interface/serialize.h>
+#include <yt/cpp/mapreduce/interface/tvm.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <util/generic/guid.h>
+#include <util/generic/scope.h>
+
+namespace NYT::NDetail::NRawClient {
+
+///////////////////////////////////////////////////////////////////////////////
+
+void ExecuteBatch(
+ IRequestRetryPolicyPtr retryPolicy,
+ const TClientContext& context,
+ TRawBatchRequest& batchRequest,
+ const TExecuteBatchOptions& options)
+{
+ if (batchRequest.IsExecuted()) {
+ ythrow yexception() << "Cannot execute batch request since it is already executed";
+ }
+ Y_DEFER {
+ batchRequest.MarkExecuted();
+ };
+
+ const auto concurrency = options.Concurrency_.GetOrElse(50);
+ const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
+
+ if (!retryPolicy) {
+ retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
+ }
+
+ while (batchRequest.BatchSize()) {
+ TRawBatchRequest retryBatch(context.Config);
+
+ while (batchRequest.BatchSize()) {
+ auto parameters = TNode::CreateMap();
+ TInstant nextTry;
+ batchRequest.FillParameterList(batchPartMaxSize, &parameters["requests"], &nextTry);
+ if (nextTry) {
+ SleepUntil(nextTry);
+ }
+ parameters["concurrency"] = concurrency;
+ auto body = NodeToYsonString(parameters);
+ THttpHeader header("POST", "execute_batch");
+ header.AddMutationId();
+ NDetail::TResponseInfo result;
+ try {
+ result = RetryRequestWithPolicy(retryPolicy, context, header, body);
+ } catch (const std::exception& e) {
+ batchRequest.SetErrorResult(std::current_exception());
+ retryBatch.SetErrorResult(std::current_exception());
+ throw;
+ }
+ batchRequest.ParseResponse(std::move(result), retryPolicy.Get(), &retryBatch);
+ }
+
+ batchRequest = std::move(retryBatch);
+ }
+}
+
+TNode Get(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options)
+{
+ THttpHeader header("GET", "get");
+ header.MergeParameters(SerializeParamsForGet(transactionId, context.Config->Prefix, path, options));
+ return NodeFromYsonString(RetryRequestWithPolicy(retryPolicy, context, header).Response);
+}
+
+TNode TryGet(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options)
+{
+ try {
+ return Get(retryPolicy, context, transactionId, path, options);
+ } catch (const TErrorResponse& error) {
+ if (!error.IsResolveError()) {
+ throw;
+ }
+ return TNode();
+ }
+}
+
+void Set(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode& value,
+ const TSetOptions& options)
+{
+ THttpHeader header("PUT", "set");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForSet(transactionId, context.Config->Prefix, path, options));
+ auto body = NodeToYsonString(value);
+ RetryRequestWithPolicy(retryPolicy, context, header, body);
+}
+
+void MultisetAttributes(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode::TMapType& value,
+ const TMultisetAttributesOptions& options)
+{
+ THttpHeader header("PUT", "api/v4/multiset_attributes", false);
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForMultisetAttributes(transactionId, context.Config->Prefix, path, options));
+
+ auto body = NodeToYsonString(value);
+ RetryRequestWithPolicy(retryPolicy, context, header, body);
+}
+
+bool Exists(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TExistsOptions& options)
+{
+ THttpHeader header("GET", "exists");
+ header.MergeParameters(SerializeParamsForExists(transactionId, context.Config->Prefix, path, options));
+ return ParseBoolFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
+}
+
+TNodeId Create(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const ENodeType& type,
+ const TCreateOptions& options)
+{
+ THttpHeader header("POST", "create");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForCreate(transactionId, context.Config->Prefix, path, type, options));
+ return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
+}
+
+TNodeId Copy(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options)
+{
+ THttpHeader header("POST", "copy");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options));
+ return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
+}
+
+TNodeId Move(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options)
+{
+ THttpHeader header("POST", "move");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options));
+ return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
+}
+
+void Remove(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TRemoveOptions& options)
+{
+ THttpHeader header("POST", "remove");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForRemove(transactionId, context.Config->Prefix, path, options));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+TNode::TListType List(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TListOptions& options)
+{
+ THttpHeader header("GET", "list");
+
+ TYPath updatedPath = AddPathPrefix(path, context.Config->Prefix);
+ // Translate "//" to "/"
+ // Translate "//some/constom/prefix/from/config/" to "//some/constom/prefix/from/config"
+ if (path.empty() && updatedPath.EndsWith('/')) {
+ updatedPath.pop_back();
+ }
+ header.MergeParameters(SerializeParamsForList(transactionId, context.Config->Prefix, updatedPath, options));
+ auto result = RetryRequestWithPolicy(retryPolicy, context, header);
+ return NodeFromYsonString(result.Response).AsList();
+}
+
+TNodeId Link(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& targetPath,
+ const TYPath& linkPath,
+ const TLinkOptions& options)
+{
+ THttpHeader header("POST", "link");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForLink(transactionId, context.Config->Prefix, targetPath, linkPath, options));
+ return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
+}
+
+TLockId Lock(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ ELockMode mode,
+ const TLockOptions& options)
+{
+ THttpHeader header("POST", "lock");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForLock(transactionId, context.Config->Prefix, path, mode, options));
+ return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
+}
+
+void Unlock(
+ IRequestRetryPolicyPtr retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TUnlockOptions& options)
+{
+ THttpHeader header("POST", "unlock");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForUnlock(transactionId, context.Config->Prefix, path, options));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+void Concatenate(
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& sourcePaths,
+ const TRichYPath& destinationPath,
+ const TConcatenateOptions& options)
+{
+ THttpHeader header("POST", "concatenate");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForConcatenate(transactionId, context.Config->Prefix, sourcePaths, destinationPath, options));
+ RequestWithoutRetry(context, header);
+}
+
+void PingTx(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId)
+{
+ THttpHeader header("POST", "ping_tx");
+ header.MergeParameters(SerializeParamsForPingTx(transactionId));
+ TRequestConfig requestConfig;
+ requestConfig.HttpConfig = NHttpClient::THttpConfig{
+ .SocketTimeout = context.Config->PingTimeout
+ };
+ RetryRequestWithPolicy(retryPolicy, context, header, {}, requestConfig);
+}
+
+TOperationAttributes ParseOperationAttributes(const TNode& node)
+{
+ const auto& mapNode = node.AsMap();
+ TOperationAttributes result;
+
+ if (auto idNode = mapNode.FindPtr("id")) {
+ result.Id = GetGuid(idNode->AsString());
+ }
+
+ if (auto typeNode = mapNode.FindPtr("type")) {
+ result.Type = FromString<EOperationType>(typeNode->AsString());
+ } else if (auto operationTypeNode = mapNode.FindPtr("operation_type")) {
+ // COMPAT(levysotsky): "operation_type" is a deprecated synonim for "type".
+ // This branch should be removed when all clusters are updated.
+ result.Type = FromString<EOperationType>(operationTypeNode->AsString());
+ }
+
+ if (auto stateNode = mapNode.FindPtr("state")) {
+ result.State = stateNode->AsString();
+ // We don't use FromString here, because OS_IN_PROGRESS unites many states: "initializing", "running", etc.
+ if (*result.State == "completed") {
+ result.BriefState = EOperationBriefState::Completed;
+ } else if (*result.State == "aborted") {
+ result.BriefState = EOperationBriefState::Aborted;
+ } else if (*result.State == "failed") {
+ result.BriefState = EOperationBriefState::Failed;
+ } else {
+ result.BriefState = EOperationBriefState::InProgress;
+ }
+ }
+ if (auto authenticatedUserNode = mapNode.FindPtr("authenticated_user")) {
+ result.AuthenticatedUser = authenticatedUserNode->AsString();
+ }
+ if (auto startTimeNode = mapNode.FindPtr("start_time")) {
+ result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
+ }
+ if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
+ result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
+ }
+ auto briefProgressNode = mapNode.FindPtr("brief_progress");
+ if (briefProgressNode && briefProgressNode->HasKey("jobs")) {
+ result.BriefProgress.ConstructInPlace();
+ static auto load = [] (const TNode& item) {
+ // Backward compatibility with old YT versions
+ return item.IsInt64() ? item.AsInt64() : item["total"].AsInt64();
+ };
+ const auto& jobs = (*briefProgressNode)["jobs"];
+ result.BriefProgress->Aborted = load(jobs["aborted"]);
+ result.BriefProgress->Completed = load(jobs["completed"]);
+ result.BriefProgress->Running = jobs["running"].AsInt64();
+ result.BriefProgress->Total = jobs["total"].AsInt64();
+ result.BriefProgress->Failed = jobs["failed"].AsInt64();
+ result.BriefProgress->Lost = jobs["lost"].AsInt64();
+ result.BriefProgress->Pending = jobs["pending"].AsInt64();
+ }
+ if (auto briefSpecNode = mapNode.FindPtr("brief_spec")) {
+ result.BriefSpec = *briefSpecNode;
+ }
+ if (auto specNode = mapNode.FindPtr("spec")) {
+ result.Spec = *specNode;
+ }
+ if (auto fullSpecNode = mapNode.FindPtr("full_spec")) {
+ result.FullSpec = *fullSpecNode;
+ }
+ if (auto unrecognizedSpecNode = mapNode.FindPtr("unrecognized_spec")) {
+ result.UnrecognizedSpec = *unrecognizedSpecNode;
+ }
+ if (auto suspendedNode = mapNode.FindPtr("suspended")) {
+ result.Suspended = suspendedNode->AsBool();
+ }
+ if (auto resultNode = mapNode.FindPtr("result")) {
+ result.Result.ConstructInPlace();
+ auto error = TYtError((*resultNode)["error"]);
+ if (error.GetCode() != 0) {
+ result.Result->Error = std::move(error);
+ }
+ }
+ if (auto progressNode = mapNode.FindPtr("progress")) {
+ const auto& progressMap = progressNode->AsMap();
+ TMaybe<TInstant> buildTime;
+ if (auto buildTimeNode = progressMap.FindPtr("build_time")) {
+ buildTime = TInstant::ParseIso8601(buildTimeNode->AsString());
+ }
+ TJobStatistics jobStatistics;
+ if (auto jobStatisticsNode = progressMap.FindPtr("job_statistics")) {
+ jobStatistics = TJobStatistics(*jobStatisticsNode);
+ }
+ TJobCounters jobCounters;
+ if (auto jobCountersNode = progressMap.FindPtr("total_job_counter")) {
+ jobCounters = TJobCounters(*jobCountersNode);
+ }
+ result.Progress = TOperationProgress{
+ .JobStatistics = std::move(jobStatistics),
+ .JobCounters = std::move(jobCounters),
+ .BuildTime = buildTime,
+ };
+ }
+ if (auto eventsNode = mapNode.FindPtr("events")) {
+ result.Events.ConstructInPlace().reserve(eventsNode->Size());
+ for (const auto& eventNode : eventsNode->AsList()) {
+ result.Events->push_back(TOperationEvent{
+ eventNode["state"].AsString(),
+ TInstant::ParseIso8601(eventNode["time"].AsString()),
+ });
+ }
+ }
+ if (auto alertsNode = mapNode.FindPtr("alerts")) {
+ result.Alerts.ConstructInPlace();
+ for (const auto& [alertType, alertError] : alertsNode->AsMap()) {
+ result.Alerts->emplace(alertType, TYtError(alertError));
+ }
+ }
+
+ return result;
+}
+
+TOperationAttributes GetOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TGetOperationOptions& options)
+{
+ THttpHeader header("GET", "get_operation");
+ header.MergeParameters(SerializeParamsForGetOperation(operationId, options));
+ auto result = RetryRequestWithPolicy(retryPolicy, context, header);
+ return ParseOperationAttributes(NodeFromYsonString(result.Response));
+}
+
+void AbortOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId)
+{
+ THttpHeader header("POST", "abort_op");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForAbortOperation(operationId));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+void CompleteOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId)
+{
+ THttpHeader header("POST", "complete_op");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForCompleteOperation(operationId));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+void SuspendOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options)
+{
+ THttpHeader header("POST", "suspend_op");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForSuspendOperation(operationId, options));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+void ResumeOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options)
+{
+ THttpHeader header("POST", "resume_op");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForResumeOperation(operationId, options));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+template <typename TKey>
+static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
+{
+ THashMap<TKey, i64> counts;
+ for (const auto& entry : countsNode.AsMap()) {
+ counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
+ }
+ return counts;
+}
+
+TListOperationsResult ListOperations(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TListOperationsOptions& options)
+{
+ THttpHeader header("GET", "list_operations");
+ header.MergeParameters(SerializeParamsForListOperations(options));
+ auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
+ auto resultNode = NodeFromYsonString(responseInfo.Response);
+
+ TListOperationsResult result;
+ for (const auto& operationNode : resultNode["operations"].AsList()) {
+ result.Operations.push_back(ParseOperationAttributes(operationNode));
+ }
+
+ if (resultNode.HasKey("pool_counts")) {
+ result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
+ }
+ if (resultNode.HasKey("user_counts")) {
+ result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
+ }
+ if (resultNode.HasKey("type_counts")) {
+ result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
+ }
+ if (resultNode.HasKey("state_counts")) {
+ result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
+ }
+ if (resultNode.HasKey("failed_jobs_count")) {
+ result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
+ }
+
+ result.Incomplete = resultNode["incomplete"].AsBool();
+
+ return result;
+}
+
+void UpdateOperationParameters(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options)
+{
+ THttpHeader header("POST", "update_op_parameters");
+ header.MergeParameters(SerializeParamsForUpdateOperationParameters(operationId, options));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+TJobAttributes ParseJobAttributes(const TNode& node)
+{
+ const auto& mapNode = node.AsMap();
+ TJobAttributes result;
+
+ // Currently "get_job" returns "job_id" field and "list_jobs" returns "id" field.
+ auto idNode = mapNode.FindPtr("id");
+ if (!idNode) {
+ idNode = mapNode.FindPtr("job_id");
+ }
+ if (idNode) {
+ result.Id = GetGuid(idNode->AsString());
+ }
+
+ if (auto typeNode = mapNode.FindPtr("type")) {
+ result.Type = FromString<EJobType>(typeNode->AsString());
+ }
+ if (auto stateNode = mapNode.FindPtr("state")) {
+ result.State = FromString<EJobState>(stateNode->AsString());
+ }
+ if (auto addressNode = mapNode.FindPtr("address")) {
+ result.Address = addressNode->AsString();
+ }
+ if (auto taskNameNode = mapNode.FindPtr("task_name")) {
+ result.TaskName = taskNameNode->AsString();
+ }
+ if (auto startTimeNode = mapNode.FindPtr("start_time")) {
+ result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
+ }
+ if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
+ result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
+ }
+ if (auto progressNode = mapNode.FindPtr("progress")) {
+ result.Progress = progressNode->AsDouble();
+ }
+ if (auto stderrSizeNode = mapNode.FindPtr("stderr_size")) {
+ result.StderrSize = stderrSizeNode->AsUint64();
+ }
+ if (auto errorNode = mapNode.FindPtr("error")) {
+ result.Error.ConstructInPlace(*errorNode);
+ }
+ if (auto briefStatisticsNode = mapNode.FindPtr("brief_statistics")) {
+ result.BriefStatistics = *briefStatisticsNode;
+ }
+ if (auto inputPathsNode = mapNode.FindPtr("input_paths")) {
+ const auto& inputPathNodesList = inputPathsNode->AsList();
+ result.InputPaths.ConstructInPlace();
+ result.InputPaths->reserve(inputPathNodesList.size());
+ for (const auto& inputPathNode : inputPathNodesList) {
+ TRichYPath path;
+ Deserialize(path, inputPathNode);
+ result.InputPaths->push_back(std::move(path));
+ }
+ }
+ if (auto coreInfosNode = mapNode.FindPtr("core_infos")) {
+ const auto& coreInfoNodesList = coreInfosNode->AsList();
+ result.CoreInfos.ConstructInPlace();
+ result.CoreInfos->reserve(coreInfoNodesList.size());
+ for (const auto& coreInfoNode : coreInfoNodesList) {
+ TCoreInfo coreInfo;
+ coreInfo.ProcessId = coreInfoNode["process_id"].AsInt64();
+ coreInfo.ExecutableName = coreInfoNode["executable_name"].AsString();
+ if (coreInfoNode.HasKey("size")) {
+ coreInfo.Size = coreInfoNode["size"].AsUint64();
+ }
+ if (coreInfoNode.HasKey("error")) {
+ coreInfo.Error.ConstructInPlace(coreInfoNode["error"]);
+ }
+ result.CoreInfos->push_back(std::move(coreInfo));
+ }
+ }
+ return result;
+}
+
+TJobAttributes GetJob(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobOptions& options)
+{
+ THttpHeader header("GET", "get_job");
+ header.MergeParameters(SerializeParamsForGetJob(operationId, jobId, options));
+ auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
+ auto resultNode = NodeFromYsonString(responseInfo.Response);
+ return ParseJobAttributes(resultNode);
+}
+
+TListJobsResult ListJobs(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TListJobsOptions& options)
+{
+ THttpHeader header("GET", "list_jobs");
+ header.MergeParameters(SerializeParamsForListJobs(operationId, options));
+ auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
+ auto resultNode = NodeFromYsonString(responseInfo.Response);
+
+ TListJobsResult result;
+
+ const auto& jobNodesList = resultNode["jobs"].AsList();
+ result.Jobs.reserve(jobNodesList.size());
+ for (const auto& jobNode : jobNodesList) {
+ result.Jobs.push_back(ParseJobAttributes(jobNode));
+ }
+
+ if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
+ result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
+ }
+ if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
+ result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64();
+ }
+ if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
+ result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
+ }
+
+ return result;
+}
+
+class TResponseReader
+ : public IFileReader
+{
+public:
+ TResponseReader(const TClientContext& context, THttpHeader header)
+ {
+ if (context.ServiceTicketAuth) {
+ header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ } else {
+ header.SetToken(context.Token);
+ }
+
+ auto hostName = GetProxyForHeavyRequest(context);
+ auto requestId = CreateGuidAsString();
+
+ Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
+ ResponseStream_ = Response_->GetResponseStream();
+ }
+
+private:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ return ResponseStream_->Read(buf, len);
+ }
+
+ size_t DoSkip(size_t len) override
+ {
+ return ResponseStream_->Skip(len);
+ }
+
+private:
+ THttpRequest Request_;
+ NHttpClient::IHttpResponsePtr Response_;
+ IInputStream* ResponseStream_;
+};
+
+IFileReaderPtr GetJobInput(
+ const TClientContext& context,
+ const TJobId& jobId,
+ const TGetJobInputOptions& /* options */)
+{
+ THttpHeader header("GET", "get_job_input");
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ return new TResponseReader(context, std::move(header));
+}
+
+IFileReaderPtr GetJobFailContext(
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobFailContextOptions& /* options */)
+{
+ THttpHeader header("GET", "get_job_fail_context");
+ header.AddOperationId(operationId);
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ return new TResponseReader(context, std::move(header));
+}
+
+TString GetJobStderrWithRetries(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& /* options */)
+{
+ THttpHeader header("GET", "get_job_stderr");
+ header.AddOperationId(operationId);
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header, {}, config);
+ return responseInfo.Response;
+}
+
+IFileReaderPtr GetJobStderr(
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& /* options */)
+{
+ THttpHeader header("GET", "get_job_stderr");
+ header.AddOperationId(operationId);
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ return new TResponseReader(context, std::move(header));
+}
+
+TMaybe<TYPath> GetFileFromCache(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TGetFileFromCacheOptions& options)
+{
+ THttpHeader header("GET", "get_file_from_cache");
+ header.MergeParameters(SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options));
+ auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
+ auto path = NodeFromYsonString(responseInfo.Response).AsString();
+ return path.empty() ? Nothing() : TMaybe<TYPath>(path);
+}
+
+TYPath PutFileToCache(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& filePath,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TPutFileToCacheOptions& options)
+{
+ THttpHeader header("POST", "put_file_to_cache");
+ header.MergeParameters(SerializeParamsForPutFileToCache(transactionId, context.Config->Prefix, filePath, md5Signature, cachePath, options));
+ auto result = RetryRequestWithPolicy(retryPolicy, context, header);
+ return NodeFromYsonString(result.Response).AsString();
+}
+
+TNode::TListType SkyShareTable(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options)
+{
+ THttpHeader header("POST", "api/v1/share", /*IsApi*/ false);
+
+ auto proxyName = context.ServerName.substr(0, context.ServerName.find('.'));
+
+ auto host = context.Config->SkynetApiHost;
+ if (host == "") {
+ host = "skynet." + proxyName + ".yt.yandex.net";
+ }
+
+ header.MergeParameters(SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, options));
+ TClientContext skyApiHost({ .ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient() });
+ TResponseInfo response = {};
+
+ // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu
+ // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK).
+ while (response.HttpCode != 200) {
+ response = RetryRequestWithPolicy(retryPolicy, skyApiHost, header, "");
+ TWaitProxy::Get()->Sleep(TDuration::Seconds(5));
+ }
+
+ if (options.KeyColumns_) {
+ return NodeFromJsonString(response.Response)["torrents"].AsList();
+ } else {
+ TNode torrent;
+
+ torrent["key"] = TNode::CreateList();
+ torrent["rbtorrent"] = response.Response;
+
+ return TNode::TListType{ torrent };
+ }
+}
+
+TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
+{
+ auto parseSingleResult = [] (const TNode::TMapType& node) {
+ TCheckPermissionResult result;
+ result.Action = ::FromString<ESecurityAction>(node.at("action").AsString());
+ if (auto objectId = node.FindPtr("object_id")) {
+ result.ObjectId = GetGuid(objectId->AsString());
+ }
+ if (auto objectName = node.FindPtr("object_name")) {
+ result.ObjectName = objectName->AsString();
+ }
+ if (auto subjectId = node.FindPtr("subject_id")) {
+ result.SubjectId = GetGuid(subjectId->AsString());
+ }
+ if (auto subjectName = node.FindPtr("subject_name")) {
+ result.SubjectName = subjectName->AsString();
+ }
+ return result;
+ };
+
+ const auto& mapNode = node.AsMap();
+ TCheckPermissionResponse result;
+ static_cast<TCheckPermissionResult&>(result) = parseSingleResult(mapNode);
+ if (auto columns = mapNode.FindPtr("columns")) {
+ result.Columns.reserve(columns->AsList().size());
+ for (const auto& columnNode : columns->AsList()) {
+ result.Columns.push_back(parseSingleResult(columnNode.AsMap()));
+ }
+ }
+ return result;
+}
+
+TCheckPermissionResponse CheckPermission(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TString& user,
+ EPermission permission,
+ const TYPath& path,
+ const TCheckPermissionOptions& options)
+{
+ THttpHeader header("GET", "check_permission");
+ header.MergeParameters(SerializeParamsForCheckPermission(user, permission, context.Config->Prefix, path, options));
+ auto response = RetryRequestWithPolicy(retryPolicy, context, header);
+ return ParseCheckPermissionResponse(NodeFromYsonString(response.Response));
+}
+
+TVector<TTabletInfo> GetTabletInfos(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TYPath& path,
+ const TVector<int>& tabletIndexes,
+ const TGetTabletInfosOptions& options)
+{
+ THttpHeader header("POST", "api/v4/get_tablet_infos", false);
+ header.MergeParameters(SerializeParamsForGetTabletInfos(context.Config->Prefix, path, tabletIndexes, options));
+ auto response = RetryRequestWithPolicy(retryPolicy, context, header);
+ TVector<TTabletInfo> result;
+ Deserialize(result, *NodeFromYsonString(response.Response).AsMap().FindPtr("tablets"));
+ return result;
+}
+
+TVector<TTableColumnarStatistics> GetTableColumnarStatistics(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTableColumnarStatisticsOptions& options)
+{
+ THttpHeader header("GET", "get_table_columnar_statistics");
+ header.MergeParameters(SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto requestResult = RetryRequestWithPolicy(retryPolicy, context, header, {}, config);
+ auto response = NodeFromYsonString(requestResult.Response);
+ TVector<TTableColumnarStatistics> result;
+ Deserialize(result, response);
+ return result;
+}
+
+TMultiTablePartitions GetTablePartitions(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTablePartitionsOptions& options)
+{
+ THttpHeader header("GET", "partition_tables");
+ header.MergeParameters(SerializeParamsForGetTablePartitions(transactionId, paths, options));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto requestResult = RetryRequestWithPolicy(retryPolicy, context, header, {}, config);
+ auto response = NodeFromYsonString(requestResult.Response);
+ TMultiTablePartitions result;
+ Deserialize(result, response);
+ return result;
+}
+
+TRichYPath CanonizeYPath(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TRichYPath& path)
+{
+ return CanonizeYPaths(retryPolicy, context, {path}).front();
+}
+
+TVector<TRichYPath> CanonizeYPaths(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TVector<TRichYPath>& paths)
+{
+ TRawBatchRequest batch(context.Config);
+ TVector<NThreading::TFuture<TRichYPath>> futures;
+ futures.reserve(paths.size());
+ for (int i = 0; i < static_cast<int>(paths.size()); ++i) {
+ futures.push_back(batch.CanonizeYPath(paths[i]));
+ }
+ ExecuteBatch(retryPolicy, context, batch, TExecuteBatchOptions{});
+ TVector<TRichYPath> result;
+ result.reserve(futures.size());
+ for (auto& future : futures) {
+ result.push_back(future.ExtractValueSync());
+ }
+ return result;
+}
+
+void AlterTable(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TAlterTableOptions& options)
+{
+ THttpHeader header("POST", "alter_table");
+ header.AddMutationId();
+ header.MergeParameters(SerializeParamsForAlterTable(transactionId, context.Config->Prefix, path, options));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+void AlterTableReplica(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TReplicaId& replicaId,
+ const TAlterTableReplicaOptions& options)
+{
+ THttpHeader header("POST", "alter_table_replica");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+void DeleteRows(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TYPath& path,
+ const TNode::TListType& keys,
+ const TDeleteRowsOptions& options)
+{
+ THttpHeader header("PUT", "delete_rows");
+ header.SetInputFormat(TFormat::YsonBinary());
+ header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(context.Config->Prefix, path, options));
+
+ auto body = NodeListToYsonString(keys);
+ TRequestConfig requestConfig;
+ requestConfig.IsHeavy = true;
+ RetryRequestWithPolicy(retryPolicy, context, header, body, requestConfig);
+}
+
+void FreezeTable(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TYPath& path,
+ const TFreezeTableOptions& options)
+{
+ THttpHeader header("POST", "freeze_table");
+ header.MergeParameters(SerializeParamsForFreezeTable(context.Config->Prefix, path, options));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+void UnfreezeTable(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TYPath& path,
+ const TUnfreezeTableOptions& options)
+{
+ THttpHeader header("POST", "unfreeze_table");
+ header.MergeParameters(SerializeParamsForUnfreezeTable(context.Config->Prefix, path, options));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+void AbortTransaction(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId)
+{
+ THttpHeader header("POST", "abort_tx");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+void CommitTransaction(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId)
+{
+ THttpHeader header("POST", "commit_tx");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId));
+ RetryRequestWithPolicy(retryPolicy, context, header);
+}
+
+TTransactionId StartTransaction(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& parentTransactionId,
+ const TStartTransactionOptions& options)
+{
+ THttpHeader header("POST", "start_tx");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, context.Config->TxTimeout, options));
+ return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail::NRawClient
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
new file mode 100644
index 0000000000..05fcbade76
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -0,0 +1,397 @@
+#pragma once
+
+#include "raw_batch_request.h"
+
+#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/interface/client_method_options.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class IRequestRetryPolicy;
+struct TClientContext;
+struct TExecuteBatchOptions;
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail::NRawClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TOperationAttributes ParseOperationAttributes(const TNode& node);
+
+TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node);
+
+////////////////////////////////////////////////////////////////////////////////
+
+//
+// marks `batchRequest' as executed
+void ExecuteBatch(
+ IRequestRetryPolicyPtr retryPolicy,
+ const TClientContext& context,
+ TRawBatchRequest& batchRequest,
+ const TExecuteBatchOptions& options = TExecuteBatchOptions());
+
+//
+// Cypress
+//
+
+TNode Get(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options = TGetOptions());
+
+TNode TryGet(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options);
+
+void Set(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode& value,
+ const TSetOptions& options = TSetOptions());
+
+void MultisetAttributes(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode::TMapType& value,
+ const TMultisetAttributesOptions& options = TMultisetAttributesOptions());
+
+bool Exists(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TExistsOptions& options = TExistsOptions());
+
+TNodeId Create(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const ENodeType& type,
+ const TCreateOptions& options = TCreateOptions());
+
+TNodeId Copy(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options = TCopyOptions());
+
+TNodeId Move(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options = TMoveOptions());
+
+void Remove(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TRemoveOptions& options = TRemoveOptions());
+
+TNode::TListType List(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TListOptions& options = TListOptions());
+
+TNodeId Link(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& targetPath,
+ const TYPath& linkPath,
+ const TLinkOptions& options = TLinkOptions());
+
+TLockId Lock(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ ELockMode mode,
+ const TLockOptions& options = TLockOptions());
+
+void Unlock(
+ IRequestRetryPolicyPtr retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TUnlockOptions& options = TUnlockOptions());
+
+void Concatenate(
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& sourcePaths,
+ const TRichYPath& destinationPath,
+ const TConcatenateOptions& options = TConcatenateOptions());
+
+//
+// Transactions
+//
+
+void PingTx(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId);
+
+//
+// Operations
+//
+
+TOperationAttributes GetOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TGetOperationOptions& options = TGetOperationOptions());
+
+void AbortOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId);
+
+void CompleteOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId);
+
+void SuspendOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options = TSuspendOperationOptions());
+
+void ResumeOperation(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options = TResumeOperationOptions());
+
+TListOperationsResult ListOperations(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TListOperationsOptions& options = TListOperationsOptions());
+
+void UpdateOperationParameters(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options = TUpdateOperationParametersOptions());
+
+//
+// Jobs
+//
+
+TJobAttributes GetJob(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobOptions& options = TGetJobOptions());
+
+TListJobsResult ListJobs(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TListJobsOptions& options = TListJobsOptions());
+
+::TIntrusivePtr<IFileReader> GetJobInput(
+ const TClientContext& context,
+ const TJobId& jobId,
+ const TGetJobInputOptions& options = TGetJobInputOptions());
+
+::TIntrusivePtr<IFileReader> GetJobFailContext(
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobFailContextOptions& options = TGetJobFailContextOptions());
+
+TString GetJobStderrWithRetries(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& /* options */ = TGetJobStderrOptions());
+
+::TIntrusivePtr<IFileReader> GetJobStderr(
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& options = TGetJobStderrOptions());
+
+//
+// File cache
+//
+
+TMaybe<TYPath> GetFileFromCache(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TGetFileFromCacheOptions& options = TGetFileFromCacheOptions());
+
+TYPath PutFileToCache(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& filePath,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TPutFileToCacheOptions& options = TPutFileToCacheOptions());
+
+//
+// SkyShare
+//
+
+TNode::TListType SkyShareTable(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options);
+
+//
+// Misc
+//
+
+TCheckPermissionResponse CheckPermission(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TString& user,
+ EPermission permission,
+ const TYPath& path,
+ const TCheckPermissionOptions& options = TCheckPermissionOptions());
+
+TVector<TTabletInfo> GetTabletInfos(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TYPath& path,
+ const TVector<int>& tabletIndexes,
+ const TGetTabletInfosOptions& options);
+
+TVector<TTableColumnarStatistics> GetTableColumnarStatistics(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTableColumnarStatisticsOptions& options);
+
+TMultiTablePartitions GetTablePartitions(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTablePartitionsOptions& options);
+
+TRichYPath CanonizeYPath(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TRichYPath& path);
+
+TVector<TRichYPath> CanonizeYPaths(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TVector<TRichYPath>& paths);
+
+//
+// Tables
+//
+
+void AlterTable(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TAlterTableOptions& options);
+
+void AlterTableReplica(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TReplicaId& replicaId,
+ const TAlterTableReplicaOptions& options);
+
+void DeleteRows(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TYPath& path,
+ const TNode::TListType& keys,
+ const TDeleteRowsOptions& options);
+
+void FreezeTable(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TYPath& path,
+ const TFreezeTableOptions& options);
+
+void UnfreezeTable(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TYPath& path,
+ const TUnfreezeTableOptions& options);
+
+
+// Transactions
+void AbortTransaction(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId);
+
+void CommitTransaction(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId);
+
+TTransactionId StartTransaction(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& parentId,
+ const TStartTransactionOptions& options);
+
+////////////////////////////////////////////////////////////////////////////////
+
+template<typename TSrc, typename TBatchAdder>
+auto BatchTransform(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TSrc& src,
+ TBatchAdder batchAdder,
+ const TExecuteBatchOptions& executeBatchOptions = {})
+{
+ TRawBatchRequest batch(context.Config);
+ using TFuture = decltype(batchAdder(batch, *std::begin(src)));
+ TVector<TFuture> futures;
+ for (const auto& el : src) {
+ futures.push_back(batchAdder(batch, el));
+ }
+ ExecuteBatch(retryPolicy, context, batch, executeBatchOptions);
+ using TDst = decltype(futures[0].ExtractValueSync());
+ TVector<TDst> result;
+ result.reserve(std::size(src));
+ for (auto& future : futures) {
+ result.push_back(future.ExtractValueSync());
+ }
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail::NRawClient
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
new file mode 100644
index 0000000000..1936266d0d
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
@@ -0,0 +1,873 @@
+#include "rpc_parameters_serialization.h"
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/client_method_options.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+#include <yt/cpp/mapreduce/interface/serialize.h>
+
+#include <library/cpp/yson/node/node.h>
+#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/yson/node/node_builder.h>
+
+#include <util/generic/guid.h>
+#include <util/string/cast.h>
+
+namespace NYT::NDetail::NRawClient {
+
+using ::ToString;
+
+////////////////////////////////////////////////////////////////////
+
+static void SetTransactionIdParam(TNode* node, const TTransactionId& transactionId)
+{
+ if (transactionId != TTransactionId()) {
+ (*node)["transaction_id"] = GetGuidAsString(transactionId);
+ }
+}
+
+static void SetOperationIdParam(TNode* node, const TOperationId& operationId)
+{
+ (*node)["operation_id"] = GetGuidAsString(operationId);
+}
+
+static void SetPathParam(TNode* node, const TString& pathPrefix, const TYPath& path)
+{
+ (*node)["path"] = AddPathPrefix(path, pathPrefix);
+}
+
+static TNode SerializeAttributeFilter(const TAttributeFilter& attributeFilter)
+{
+ TNode result = TNode::CreateList();
+ for (const auto& attribute : attributeFilter.Attributes_) {
+ result.Add(attribute);
+ }
+ return result;
+}
+
+static TNode SerializeAttributeFilter(const TOperationAttributeFilter& attributeFilter)
+{
+ TNode result = TNode::CreateList();
+ for (const auto& attribute : attributeFilter.Attributes_) {
+ result.Add(ToString(attribute));
+ }
+ return result;
+}
+
+template <typename TOptions>
+static void SetFirstLastTabletIndex(TNode* node, const TOptions& options)
+{
+ if (options.FirstTabletIndex_) {
+ (*node)["first_tablet_index"] = *options.FirstTabletIndex_;
+ }
+ if (options.LastTabletIndex_) {
+ (*node)["last_tablet_index"] = *options.LastTabletIndex_;
+ }
+}
+
+static TString GetDefaultTransactionTitle()
+{
+ const auto processState = TProcessState::Get();
+ TStringStream res;
+
+ res << "User transaction. Created by: " << processState->UserName << " on " << processState->FqdnHostName
+ << " client: " << processState->ClientVersion << " pid: " << processState->Pid;
+ if (!processState->CommandLine.empty()) {
+ res << " program: " << processState->CommandLine[0];
+ } else {
+ res << " command line is unknown probably NYT::Initialize was never called";
+ }
+
+#ifndef NDEBUG
+ res << " build: debug";
+#endif
+
+ return res.Str();
+}
+
+template <typename T>
+void SerializeMasterReadOptions(TNode* node, const TMasterReadOptions<T>& options)
+{
+ if (options.ReadFrom_) {
+ (*node)["read_from"] = ToString(*options.ReadFrom_);
+ }
+}
+
+////////////////////////////////////////////////////////////////////
+
+TNode SerializeParamsForCreate(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ ENodeType type,
+ const TCreateOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ result["recursive"] = options.Recursive_;
+ result["type"] = ToString(type);
+ result["ignore_existing"] = options.IgnoreExisting_;
+ result["force"] = options.Force_;
+ if (options.Attributes_) {
+ result["attributes"] = *options.Attributes_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForRemove(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TRemoveOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ result["recursive"] = options.Recursive_;
+ result["force"] = options.Force_;
+ return result;
+}
+
+TNode SerializeParamsForExists(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TExistsOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ SerializeMasterReadOptions(&result, options);
+ return result;
+}
+
+TNode SerializeParamsForGet(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TGetOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ SerializeMasterReadOptions(&result, options);
+ if (options.AttributeFilter_) {
+ result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_);
+ }
+ if (options.MaxSize_) {
+ result["max_size"] = *options.MaxSize_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForSet(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TSetOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ result["recursive"] = options.Recursive_;
+ if (options.Force_) {
+ result["force"] = *options.Force_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForMultisetAttributes(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ [[maybe_unused]] const TMultisetAttributesOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ return result;
+}
+
+TNode SerializeParamsForList(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TListOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ SerializeMasterReadOptions(&result, options);
+ if (options.MaxSize_) {
+ result["max_size"] = *options.MaxSize_;
+ }
+ if (options.AttributeFilter_) {
+ result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_);
+ }
+ return result;
+}
+
+TNode SerializeParamsForCopy(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ result["source_path"] = AddPathPrefix(sourcePath, pathPrefix);
+ result["destination_path"] = AddPathPrefix(destinationPath, pathPrefix);
+ result["recursive"] = options.Recursive_;
+ result["force"] = options.Force_;
+ result["preserve_account"] = options.PreserveAccount_;
+ if (options.PreserveExpirationTime_) {
+ result["preserve_expiration_time"] = *options.PreserveExpirationTime_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForMove(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ result["source_path"] = AddPathPrefix(sourcePath, pathPrefix);
+ result["destination_path"] = AddPathPrefix(destinationPath, pathPrefix);
+ result["recursive"] = options.Recursive_;
+ result["force"] = options.Force_;
+ result["preserve_account"] = options.PreserveAccount_;
+ if (options.PreserveExpirationTime_) {
+ result["preserve_expiration_time"] = *options.PreserveExpirationTime_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForLink(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& targetPath,
+ const TYPath& linkPath,
+ const TLinkOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ result["target_path"] = AddPathPrefix(targetPath, pathPrefix);
+ result["link_path"] = AddPathPrefix(linkPath, pathPrefix);
+ result["recursive"] = options.Recursive_;
+ result["ignore_existing"] = options.IgnoreExisting_;
+ result["force"] = options.Force_;
+ if (options.Attributes_) {
+ result["attributes"] = *options.Attributes_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForLock(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ ELockMode mode,
+ const TLockOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ result["mode"] = ToString(mode);
+ result["waitable"] = options.Waitable_;
+ if (options.AttributeKey_) {
+ result["attribute_key"] = *options.AttributeKey_;
+ }
+ if (options.ChildKey_) {
+ result["child_key"] = *options.ChildKey_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForUnlock(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TUnlockOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ Y_UNUSED(options);
+ return result;
+}
+
+TNode SerializeParamsForConcatenate(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TVector<TRichYPath>& sourcePaths,
+ const TRichYPath& destinationPath,
+ const TConcatenateOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ {
+ auto actualDestination = destinationPath;
+ actualDestination.Path(AddPathPrefix(actualDestination.Path_, pathPrefix));
+ if (options.Append_) {
+ actualDestination.Append(*options.Append_);
+ }
+ result["destination_path"] = PathToNode(actualDestination);
+ }
+ auto& sourcePathsNode = result["source_paths"];
+ for (const auto& path : sourcePaths) {
+ auto actualSource = path;
+ actualSource.Path(AddPathPrefix(actualSource.Path_, pathPrefix));
+ sourcePathsNode.Add(PathToNode(actualSource));
+ }
+ return result;
+}
+
+TNode SerializeParamsForPingTx(
+ const TTransactionId& transactionId)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ return result;
+}
+
+TNode SerializeParamsForListOperations(
+ const TListOperationsOptions& options)
+{
+ TNode result = TNode::CreateMap();
+ if (options.FromTime_) {
+ result["from_time"] = ToString(*options.FromTime_);
+ }
+ if (options.ToTime_) {
+ result["to_time"] = ToString(*options.ToTime_);
+ }
+ if (options.CursorTime_) {
+ result["cursor_time"] = ToString(*options.CursorTime_);
+ }
+ if (options.CursorDirection_) {
+ result["cursor_direction"] = ToString(*options.CursorDirection_);
+ }
+ if (options.Pool_) {
+ result["pool"] = *options.Pool_;
+ }
+ if (options.Filter_) {
+ result["filter"] = *options.Filter_;
+ }
+ if (options.User_) {
+ result["user"] = *options.User_;
+ }
+ if (options.State_) {
+ result["state"] = *options.State_;
+ }
+ if (options.Type_) {
+ result["type"] = ToString(*options.Type_);
+ }
+ if (options.WithFailedJobs_) {
+ result["with_failed_jobs"] = *options.WithFailedJobs_;
+ }
+ if (options.IncludeCounters_) {
+ result["include_counters"] = *options.IncludeCounters_;
+ }
+ if (options.IncludeArchive_) {
+ result["include_archive"] = *options.IncludeArchive_;
+ }
+ if (options.Limit_) {
+ result["limit"] = *options.Limit_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForGetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+ if (options.AttributeFilter_) {
+ result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_);
+ }
+ return result;
+}
+
+TNode SerializeParamsForAbortOperation(const TOperationId& operationId)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+ return result;
+}
+
+TNode SerializeParamsForCompleteOperation(const TOperationId& operationId)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+ return result;
+}
+
+TNode SerializeParamsForSuspendOperation(
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+ if (options.AbortRunningJobs_) {
+ result["abort_running_jobs"] = *options.AbortRunningJobs_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForResumeOperation(
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+ Y_UNUSED(options);
+ return result;
+}
+
+TNode SerializeParamsForUpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+ TNode& parameters = result["parameters"];
+ if (options.Pool_) {
+ parameters["pool"] = *options.Pool_;
+ }
+ if (options.Weight_) {
+ parameters["weight"] = *options.Weight_;
+ }
+ if (!options.Owners_.empty()) {
+ parameters["owners"] = TNode::CreateList();
+ for (const auto& owner : options.Owners_) {
+ parameters["owners"].Add(owner);
+ }
+ }
+ if (options.SchedulingOptionsPerPoolTree_) {
+ parameters["scheduling_options_per_pool_tree"] = TNode::CreateMap();
+ for (const auto& entry : options.SchedulingOptionsPerPoolTree_->Options_) {
+ auto schedulingOptionsNode = TNode::CreateMap();
+ const auto& schedulingOptions = entry.second;
+ if (schedulingOptions.Pool_) {
+ schedulingOptionsNode["pool"] = *schedulingOptions.Pool_;
+ }
+ if (schedulingOptions.Weight_) {
+ schedulingOptionsNode["weight"] = *schedulingOptions.Weight_;
+ }
+ if (schedulingOptions.ResourceLimits_) {
+ auto resourceLimitsNode = TNode::CreateMap();
+ const auto& resourceLimits = *schedulingOptions.ResourceLimits_;
+ if (resourceLimits.UserSlots_) {
+ resourceLimitsNode["user_slots"] = *resourceLimits.UserSlots_;
+ }
+ if (resourceLimits.Memory_) {
+ resourceLimitsNode["memory"] = *resourceLimits.Memory_;
+ }
+ if (resourceLimits.Cpu_) {
+ resourceLimitsNode["cpu"] = *resourceLimits.Cpu_;
+ }
+ if (resourceLimits.Network_) {
+ resourceLimitsNode["network"] = *resourceLimits.Network_;
+ }
+ schedulingOptionsNode["resource_limits"] = std::move(resourceLimitsNode);
+ }
+ parameters["scheduling_options_per_pool_tree"][entry.first] = std::move(schedulingOptionsNode);
+ }
+ }
+ return result;
+}
+
+TNode SerializeParamsForGetJob(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobOptions& /* options */)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+ result["job_id"] = GetGuidAsString(jobId);
+ return result;
+}
+
+TNode SerializeParamsForListJobs(
+ const TOperationId& operationId,
+ const TListJobsOptions& options)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+
+ if (options.Type_) {
+ result["type"] = ToString(*options.Type_);
+ }
+ if (options.State_) {
+ result["state"] = ToString(*options.State_);
+ }
+ if (options.Address_) {
+ result["address"] = *options.Address_;
+ }
+ if (options.WithStderr_) {
+ result["with_stderr"] = *options.WithStderr_;
+ }
+ if (options.WithSpec_) {
+ result["with_spec"] = *options.WithSpec_;
+ }
+ if (options.WithFailContext_) {
+ result["with_fail_context"] = *options.WithFailContext_;
+ }
+
+ if (options.SortField_) {
+ result["sort_field"] = ToString(*options.SortField_);
+ }
+ if (options.SortOrder_) {
+ result["sort_order"] = ToString(*options.SortOrder_);
+ }
+
+ if (options.Offset_) {
+ result["offset"] = *options.Offset_;
+ }
+ if (options.Limit_) {
+ result["limit"] = *options.Limit_;
+ }
+
+ if (options.IncludeCypress_) {
+ result["include_cypress"] = *options.IncludeCypress_;
+ }
+ if (options.IncludeArchive_) {
+ result["include_archive"] = *options.IncludeArchive_;
+ }
+ if (options.IncludeControllerAgent_) {
+ result["include_controller_agent"] = *options.IncludeControllerAgent_;
+ }
+ return result;
+}
+
+TNode SerializeParametersForInsertRows(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TInsertRowsOptions& options)
+{
+ TNode result;
+ SetPathParam(&result, pathPrefix, path);
+ if (options.Aggregate_) {
+ result["aggregate"] = *options.Aggregate_;
+ }
+ if (options.Update_) {
+ result["update"] = *options.Update_;
+ }
+ if (options.Atomicity_) {
+ result["atomicity"] = ToString(*options.Atomicity_);
+ }
+ if (options.Durability_) {
+ result["durability"] = ToString(*options.Durability_);
+ }
+ if (options.RequireSyncReplica_) {
+ result["require_sync_replica"] = *options.RequireSyncReplica_;
+ }
+ return result;
+}
+
+TNode SerializeParametersForDeleteRows(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TDeleteRowsOptions& options)
+{
+ TNode result;
+ SetPathParam(&result, pathPrefix, path);
+ if (options.Atomicity_) {
+ result["atomicity"] = ToString(*options.Atomicity_);
+ }
+ if (options.Durability_) {
+ result["durability"] = ToString(*options.Durability_);
+ }
+ if (options.RequireSyncReplica_) {
+ result["require_sync_replica"] = *options.RequireSyncReplica_;
+ }
+ return result;
+}
+
+TNode SerializeParametersForTrimRows(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TTrimRowsOptions& /* options*/)
+{
+ TNode result;
+ SetPathParam(&result, pathPrefix, path);
+ return result;
+}
+
+TNode SerializeParamsForParseYPath(const TRichYPath& path)
+{
+ TNode result;
+ result["path"] = PathToNode(path);
+ return result;
+}
+
+TNode SerializeParamsForEnableTableReplica(
+ const TReplicaId& replicaId)
+{
+ TNode result;
+ result["replica_id"] = GetGuidAsString(replicaId);
+ return result;
+}
+
+TNode SerializeParamsForDisableTableReplica(
+ const TReplicaId& replicaId)
+{
+ TNode result;
+ result["replica_id"] = GetGuidAsString(replicaId);
+ return result;
+}
+
+TNode SerializeParamsForAlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options)
+{
+ TNode result;
+ result["replica_id"] = GetGuidAsString(replicaId);
+ if (options.Enabled_) {
+ result["enabled"] = *options.Enabled_;
+ }
+ if (options.Mode_) {
+ result["mode"] = ToString(*options.Mode_);
+ }
+ return result;
+}
+
+TNode SerializeParamsForFreezeTable(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TFreezeTableOptions& options)
+{
+ TNode result;
+ SetPathParam(&result, pathPrefix, path);
+ SetFirstLastTabletIndex(&result, options);
+ return result;
+}
+
+TNode SerializeParamsForUnfreezeTable(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TUnfreezeTableOptions& options)
+{
+ TNode result;
+ SetPathParam(&result, pathPrefix, path);
+ SetFirstLastTabletIndex(&result, options);
+ return result;
+}
+
+TNode SerializeParamsForAlterTable(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TAlterTableOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, path);
+ if (options.Dynamic_) {
+ result["dynamic"] = *options.Dynamic_;
+ }
+ if (options.Schema_) {
+ TNode schema;
+ {
+ TNodeBuilder builder(&schema);
+ Serialize(*options.Schema_, &builder);
+ }
+ result["schema"] = schema;
+ }
+ if (options.UpstreamReplicaId_) {
+ result["upstream_replica_id"] = GetGuidAsString(*options.UpstreamReplicaId_);
+ }
+ return result;
+}
+
+TNode SerializeParamsForGetTableColumnarStatistics(
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTableColumnarStatisticsOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ for (const auto& path : paths) {
+ result["paths"].Add(PathToNode(path));
+ }
+ if (options.FetcherMode_) {
+ result["fetcher_mode"] = ToString(*options.FetcherMode_);
+ }
+ return result;
+}
+
+TNode SerializeParamsForGetTablePartitions(
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTablePartitionsOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ for (const auto& path : paths) {
+ result["paths"].Add(PathToNode(path));
+ }
+ result["partition_mode"] = ToString(options.PartitionMode_);
+ result["data_weight_per_partition"] = options.DataWeightPerPartition_;
+ if (options.MaxPartitionCount_) {
+ result["max_partition_count"] = *options.MaxPartitionCount_;
+ }
+ result["adjust_data_weight_per_partition"] = options.AdjustDataWeightPerPartition_;
+ return result;
+}
+
+TNode SerializeParamsForGetFileFromCache(
+ const TTransactionId& transactionId,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TGetFileFromCacheOptions&)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ result["md5"] = md5Signature;
+ result["cache_path"] = cachePath;
+ return result;
+}
+
+TNode SerializeParamsForPutFileToCache(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& filePath,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TPutFileToCacheOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ SetPathParam(&result, pathPrefix, filePath);
+ result["md5"] = md5Signature;
+ result["cache_path"] = cachePath;
+ if (options.PreserveExpirationTimeout_) {
+ result["preserve_expiration_timeout"] = *options.PreserveExpirationTimeout_;
+ }
+ return result;
+}
+
+TNode SerializeParamsForSkyShareTable(
+ const TString& serverName,
+ const TString& pathPrefix,
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options)
+{
+ TNode result;
+
+ if (tablePaths.size() == 1) {
+ SetPathParam(&result, pathPrefix, tablePaths[0]);
+ } else {
+ auto pathList = TNode::CreateList();
+ for (const auto& p : tablePaths) {
+ pathList.Add(AddPathPrefix(p, pathPrefix));
+ }
+ result["paths"] = pathList;
+ }
+ result["cluster"] = serverName;
+
+ if (options.KeyColumns_) {
+ auto keyColumnsList = TNode::CreateList();
+ for (const auto& s : options.KeyColumns_->Parts_) {
+ if (s.empty()) {
+ continue;
+ }
+ keyColumnsList.Add(s);
+ }
+ result["key_columns"] = keyColumnsList;
+ }
+
+ if (options.EnableFastbone_) {
+ result["enable_fastbone"] = *options.EnableFastbone_;
+ }
+
+ return result;
+}
+
+TNode SerializeParamsForCheckPermission(
+ const TString& user,
+ EPermission permission,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TCheckPermissionOptions& options)
+{
+ TNode result;
+ SetPathParam(&result, pathPrefix, path);
+ result["path"] = path;
+ result["user"] = user;
+ result["permission"] = ToString(permission);
+ if (!options.Columns_.empty()) {
+ result["columns"] = TNode::CreateList();
+ result["columns"].AsList().assign(options.Columns_.begin(), options.Columns_.end());
+ }
+ return result;
+}
+
+TNode SerializeParamsForGetTabletInfos(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TVector<int>& tabletIndexes,
+ const TGetTabletInfosOptions& options)
+{
+ Y_UNUSED(options);
+ TNode result;
+ SetPathParam(&result, pathPrefix, path);
+ result["tablet_indexes"] = TNode::CreateList();
+ result["tablet_indexes"].AsList().assign(tabletIndexes.begin(), tabletIndexes.end());
+ return result;
+}
+
+TNode SerializeParamsForAbortTransaction(const TTransactionId& transactionId)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ return result;
+}
+
+TNode SerializeParamsForCommitTransaction(const TTransactionId& transactionId)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ return result;
+}
+
+TNode SerializeParamsForStartTransaction(
+ const TTransactionId& parentTransactionId,
+ TDuration txTimeout,
+ const TStartTransactionOptions& options)
+{
+ TNode result;
+
+ SetTransactionIdParam(&result, parentTransactionId);
+ result["timeout"] = static_cast<i64>((options.Timeout_.GetOrElse(txTimeout).MilliSeconds()));
+ if (options.Deadline_) {
+ result["deadline"] = ToString(options.Deadline_);
+ }
+
+ if (options.PingAncestors_) {
+ result["ping_ancestor_transactions"] = true;
+ }
+
+ if (options.Attributes_ && !options.Attributes_->IsMap()) {
+ ythrow TApiUsageError() << "Attributes must be a Map node";
+ }
+
+ auto attributes = options.Attributes_.GetOrElse(TNode::CreateMap());
+ if (options.Title_) {
+ attributes["title"] = *options.Title_;
+ } else if (!attributes.HasKey("title")) {
+ attributes["title"] = GetDefaultTransactionTitle();
+ }
+ result["attributes"] = attributes;
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail::NRawClient
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
new file mode 100644
index 0000000000..a60e3ea369
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
@@ -0,0 +1,231 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/cpp/mapreduce/interface/client_method_options.h>
+
+namespace NYT::NDetail::NRawClient {
+
+////////////////////////////////////////////////////////////////////
+
+TNode SerializeParamsForCreate(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ ENodeType type,
+ const TCreateOptions& options);
+
+TNode SerializeParamsForRemove(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TRemoveOptions& options);
+
+TNode SerializeParamsForExists(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TExistsOptions& options);
+
+TNode SerializeParamsForGet(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TGetOptions& options);
+
+TNode SerializeParamsForSet(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TSetOptions& options);
+
+TNode SerializeParamsForMultisetAttributes(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TMultisetAttributesOptions& options);
+
+TNode SerializeParamsForList(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TListOptions& options);
+
+TNode SerializeParamsForCopy(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options);
+
+TNode SerializeParamsForMove(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options);
+
+TNode SerializeParamsForLink(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& targetPath,
+ const TYPath& linkPath,
+ const TLinkOptions& options);
+
+TNode SerializeParamsForLock(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ ELockMode mode,
+ const TLockOptions& options);
+
+TNode SerializeParamsForUnlock(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TUnlockOptions& options);
+
+TNode SerializeParamsForConcatenate(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TVector<TRichYPath>& sourcePaths,
+ const TRichYPath& destinationPath,
+ const TConcatenateOptions& options);
+
+TNode SerializeParamsForPingTx(
+ const TTransactionId& transactionId);
+
+TNode SerializeParamsForGetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options);
+
+TNode SerializeParamsForAbortOperation(
+ const TOperationId& operationId);
+
+TNode SerializeParamsForCompleteOperation(
+ const TOperationId& operationId);
+
+TNode SerializeParamsForSuspendOperation(
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options);
+
+TNode SerializeParamsForResumeOperation(
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options);
+
+TNode SerializeParamsForListOperations(
+ const TListOperationsOptions& options);
+
+TNode SerializeParamsForUpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options);
+
+TNode SerializeParamsForGetJob(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobOptions& options);
+
+TNode SerializeParamsForListJobs(
+ const TOperationId& operationId,
+ const TListJobsOptions& options);
+
+TNode SerializeParametersForInsertRows(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TInsertRowsOptions& options);
+
+TNode SerializeParametersForDeleteRows(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TDeleteRowsOptions& options);
+
+TNode SerializeParametersForTrimRows(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TTrimRowsOptions& options);
+
+TNode SerializeParamsForParseYPath(
+ const TRichYPath& path);
+
+TNode SerializeParamsForEnableTableReplica(
+ const TReplicaId& replicaId);
+
+TNode SerializeParamsForDisableTableReplica(
+ const TReplicaId& replicaId);
+
+TNode SerializeParamsForAlterTableReplica(
+ const TReplicaId& replicaId,
+ const TAlterTableReplicaOptions& options);
+
+TNode SerializeParamsForFreezeTable(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TFreezeTableOptions& options);
+
+TNode SerializeParamsForUnfreezeTable(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TUnfreezeTableOptions& options);
+
+TNode SerializeParamsForAlterTable(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TAlterTableOptions& options);
+
+TNode SerializeParamsForGetTableColumnarStatistics(
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTableColumnarStatisticsOptions& options);
+
+TNode SerializeParamsForGetTablePartitions(
+ const TTransactionId& transactionId,
+ const TVector<TRichYPath>& paths,
+ const TGetTablePartitionsOptions& options);
+
+TNode SerializeParamsForGetFileFromCache(
+ const TTransactionId& transactionId,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TGetFileFromCacheOptions&);
+
+TNode SerializeParamsForPutFileToCache(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TYPath& filePath,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TPutFileToCacheOptions& options);
+
+TNode SerializeParamsForSkyShareTable(
+ const TString& serverName,
+ const TString& pathPrefix,
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options);
+
+TNode SerializeParamsForCheckPermission(
+ const TString& user,
+ EPermission permission,
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TCheckPermissionOptions& options);
+
+TNode SerializeParamsForGetTabletInfos(
+ const TString& pathPrefix,
+ const TYPath& path,
+ const TVector<int>& tabletIndexes,
+ const TGetTabletInfosOptions& options);
+
+TNode SerializeParamsForAbortTransaction(
+ const TTransactionId& transactionId);
+
+TNode SerializeParamsForCommitTransaction(
+ const TTransactionId& transactionId);
+
+TNode SerializeParamsForStartTransaction(
+ const TTransactionId& parentTransactionId,
+ TDuration txTimeout,
+ const TStartTransactionOptions& options);
+
+////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail::NRawClient
diff --git a/yt/cpp/mapreduce/raw_client/ya.make b/yt/cpp/mapreduce/raw_client/ya.make
new file mode 100644
index 0000000000..0d03aae80c
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/ya.make
@@ -0,0 +1,19 @@
+LIBRARY()
+
+INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+
+SRCS(
+ raw_batch_request.cpp
+ raw_requests.cpp
+ rpc_parameters_serialization.cpp
+)
+
+PEERDIR(
+ yt/cpp/mapreduce/common
+ yt/cpp/mapreduce/http
+ yt/cpp/mapreduce/interface
+ yt/cpp/mapreduce/interface/logging
+ library/cpp/yson/node
+)
+
+END()