diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/raw_client | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/raw_client')
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | 687 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.h | 190 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 1027 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 397 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp | 873 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h | 231 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/ya.make | 19 |
7 files changed, 3424 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp new file mode 100644 index 0000000000..be81f5a21a --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp @@ -0,0 +1,687 @@ +#include "raw_batch_request.h" + +#include "raw_requests.h" +#include "rpc_parameters_serialization.h" + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/cpp/mapreduce/interface/client.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/serialize.h> + +#include <library/cpp/yson/node/node.h> + +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <util/generic/guid.h> +#include <util/string/builder.h> + +#include <exception> + +namespace NYT::NDetail::NRawClient { + +using NThreading::TFuture; +using NThreading::TPromise; +using NThreading::NewPromise; + +//////////////////////////////////////////////////////////////////// + +static TString RequestInfo(const TNode& request) +{ + return ::TStringBuilder() + << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]); +} + +static void EnsureNothing(const TMaybe<TNode>& node) +{ + Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType()); +} + +static void EnsureSomething(const TMaybe<TNode>& node) +{ + Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response."); +} + +static void EnsureType(const TNode& node, TNode::EType type) +{ + Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. " + << "Expected: " << type << ", actual: " << node.GetType()); +} + +static void EnsureType(const TMaybe<TNode>& node, TNode::EType type) +{ + Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response."); + EnsureType(*node, type); +} + +//////////////////////////////////////////////////////////////////// + +template <typename TReturnType> +class TResponseParserBase + : public TRawBatchRequest::IResponseItemParser +{ +public: + using TFutureResult = TFuture<TReturnType>; + +public: + TResponseParserBase() + : Result(NewPromise<TReturnType>()) + { } + + void SetException(std::exception_ptr e) override + { + Result.SetException(std::move(e)); + } + + TFuture<TReturnType> GetFuture() + { + return Result.GetFuture(); + } + +protected: + TPromise<TReturnType> Result; +}; + +//////////////////////////////////////////////////////////////////// + + +class TGetResponseParser + : public TResponseParserBase<TNode> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureSomething(node); + Result.SetValue(std::move(*node)); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TVoidResponseParser + : public TResponseParserBase<void> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureNothing(node); + Result.SetValue(); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TListResponseParser + : public TResponseParserBase<TNode::TListType> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::List); + Result.SetValue(std::move(node->AsList())); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TExistsResponseParser + : public TResponseParserBase<bool> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Bool); + Result.SetValue(std::move(node->AsBool())); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TGuidResponseParser + : public TResponseParserBase<TGUID> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::String); + Result.SetValue(GetGuid(node->AsString())); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TCanonizeYPathResponseParser + : public TResponseParserBase<TRichYPath> +{ +public: + explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original) + : OriginalNode_(PathToNode(original)) + , PathPrefix_(std::move(pathPrefix)) + { } + + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::String); + + for (const auto& item : OriginalNode_.GetAttributes().AsMap()) { + node->Attributes()[item.first] = item.second; + } + TRichYPath result; + Deserialize(result, *node); + result.Path_ = AddPathPrefix(result.Path_, PathPrefix_); + Result.SetValue(result); + } + +private: + TNode OriginalNode_; + TString PathPrefix_; +}; + +//////////////////////////////////////////////////////////////////// + +class TGetOperationResponseParser + : public TResponseParserBase<TOperationAttributes> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Map); + Result.SetValue(ParseOperationAttributes(*node)); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TTableColumnarStatisticsParser + : public TResponseParserBase<TVector<TTableColumnarStatistics>> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Map); + TVector<TTableColumnarStatistics> statistics; + Deserialize(statistics, *node); + Result.SetValue(std::move(statistics)); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TTablePartitionsParser + : public TResponseParserBase<TMultiTablePartitions> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Map); + TMultiTablePartitions partitions; + Deserialize(partitions, *node); + Result.SetValue(std::move(partitions)); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TGetFileFromCacheParser + : public TResponseParserBase<TMaybe<TYPath>> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::String); + if (node->AsString().empty()) { + Result.SetValue(Nothing()); + } else { + Result.SetValue(node->AsString()); + } + } +}; + +//////////////////////////////////////////////////////////////////// + +class TYPathParser + : public TResponseParserBase<TYPath> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::String); + Result.SetValue(node->AsString()); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TCheckPermissionParser + : public TResponseParserBase<TCheckPermissionResponse> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Map); + Result.SetValue(ParseCheckPermissionResponse(*node)); + } +}; + +//////////////////////////////////////////////////////////////////// + +TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser) + : Parameters(std::move(parameters)) + , ResponseParser(std::move(responseParser)) + , NextTry() +{ } + +TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry) + : Parameters(batchItem.Parameters) + , ResponseParser(batchItem.ResponseParser) + , NextTry(nextTry) +{ } + +//////////////////////////////////////////////////////////////////// + +TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config) + : Config_(config) +{ } + +TRawBatchRequest::~TRawBatchRequest() = default; + +bool TRawBatchRequest::IsExecuted() const +{ + return Executed_; +} + +void TRawBatchRequest::MarkExecuted() +{ + Executed_ = true; +} + +template <typename TResponseParser> +typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest( + const TString& command, + TNode parameters, + TMaybe<TNode> input) +{ + return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>()); +} + +template <typename TResponseParser> +typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest( + const TString& command, + TNode parameters, + TMaybe<TNode> input, + ::TIntrusivePtr<TResponseParser> parser) +{ + Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); + TNode request; + request["command"] = command; + request["parameters"] = std::move(parameters); + if (input) { + request["input"] = std::move(*input); + } + BatchItemList_.emplace_back(std::move(request), parser); + return parser->GetFuture(); +} + +void TRawBatchRequest::AddRequest(TBatchItem batchItem) +{ + Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); + BatchItemList_.push_back(batchItem); +} + +TFuture<TNodeId> TRawBatchRequest::Create( + const TTransactionId& transaction, + const TYPath& path, + ENodeType type, + const TCreateOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "create", + SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::Remove( + const TTransactionId& transaction, + const TYPath& path, + const TRemoveOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "remove", + SerializeParamsForRemove(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<bool> TRawBatchRequest::Exists( + const TTransactionId& transaction, + const TYPath& path, + const TExistsOptions& options) +{ + return AddRequest<TExistsResponseParser>( + "exists", + SerializeParamsForExists(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<TNode> TRawBatchRequest::Get( + const TTransactionId& transaction, + const TYPath& path, + const TGetOptions& options) +{ + return AddRequest<TGetResponseParser>( + "get", + SerializeParamsForGet(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::Set( + const TTransactionId& transaction, + const TYPath& path, + const TNode& node, + const TSetOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "set", + SerializeParamsForSet(transaction, Config_->Prefix, path, options), + node); +} + +TFuture<TNode::TListType> TRawBatchRequest::List( + const TTransactionId& transaction, + const TYPath& path, + const TListOptions& options) +{ + return AddRequest<TListResponseParser>( + "list", + SerializeParamsForList(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<TNodeId> TRawBatchRequest::Copy( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "copy", + SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options), + Nothing()); +} + +TFuture<TNodeId> TRawBatchRequest::Move( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "move", + SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options), + Nothing()); +} + +TFuture<TNodeId> TRawBatchRequest::Link( + const TTransactionId& transaction, + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "link", + SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options), + Nothing()); +} + +TFuture<TLockId> TRawBatchRequest::Lock( + const TTransactionId& transaction, + const TYPath& path, + ELockMode mode, + const TLockOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "lock", + SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::Unlock( + const TTransactionId& transaction, + const TYPath& path, + const TUnlockOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "unlock", + SerializeParamsForUnlock(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<TMaybe<TYPath>> TRawBatchRequest::GetFileFromCache( + const TTransactionId& transactionId, + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions& options) +{ + return AddRequest<TGetFileFromCacheParser>( + "get_file_from_cache", + SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options), + Nothing()); +} + +TFuture<TYPath> TRawBatchRequest::PutFileToCache( + const TTransactionId& transactionId, + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options) +{ + return AddRequest<TYPathParser>( + "put_file_to_cache", + SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options), + Nothing()); +} + +TFuture<TCheckPermissionResponse> TRawBatchRequest::CheckPermission( + const TString& user, + EPermission permission, + const TYPath& path, + const TCheckPermissionOptions& options) +{ + return AddRequest<TCheckPermissionParser>( + "check_permission", + SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<TOperationAttributes> TRawBatchRequest::GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options) +{ + return AddRequest<TGetOperationResponseParser>( + "get_operation", + SerializeParamsForGetOperation(operationId, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::AbortOperation(const TOperationId& operationId) +{ + return AddRequest<TVoidResponseParser>( + "abort_op", + SerializeParamsForAbortOperation(operationId), + Nothing()); +} + +TFuture<void> TRawBatchRequest::CompleteOperation(const TOperationId& operationId) +{ + return AddRequest<TVoidResponseParser>( + "complete_op", + SerializeParamsForCompleteOperation(operationId), + Nothing()); +} +TFuture<void> TRawBatchRequest::SuspendOperation( + const TOperationId& operationId, + const TSuspendOperationOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "suspend_operation", + SerializeParamsForSuspendOperation(operationId, options), + Nothing()); +} +TFuture<void> TRawBatchRequest::ResumeOperation( + const TOperationId& operationId, + const TResumeOperationOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "resume_operation", + SerializeParamsForResumeOperation(operationId, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "update_op_parameters", + SerializeParamsForUpdateOperationParameters(operationId, options), + Nothing()); +} + +TFuture<TRichYPath> TRawBatchRequest::CanonizeYPath(const TRichYPath& path) +{ + if (path.Path_.find_first_of("<>{}[]") != TString::npos) { + return AddRequest<TCanonizeYPathResponseParser>( + "parse_ypath", + SerializeParamsForParseYPath(path), + Nothing(), + MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, path)); + } else { + TRichYPath result = path; + result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix); + return NThreading::MakeFuture(result); + } +} + +TFuture<TVector<TTableColumnarStatistics>> TRawBatchRequest::GetTableColumnarStatistics( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options) +{ + return AddRequest<TTableColumnarStatisticsParser>( + "get_table_columnar_statistics", + SerializeParamsForGetTableColumnarStatistics(transaction, paths, options), + Nothing()); +} + +TFuture<TMultiTablePartitions> TRawBatchRequest::GetTablePartitions( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options) +{ + return AddRequest<TTablePartitionsParser>( + "partition_tables", + SerializeParamsForGetTablePartitions(transaction, paths, options), + Nothing()); +} + +void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const +{ + Y_VERIFY(result); + Y_VERIFY(nextTry); + + *nextTry = TInstant(); + maxSize = Min(maxSize, BatchItemList_.size()); + *result = TNode::CreateList(); + for (size_t i = 0; i < maxSize; ++i) { + YT_LOG_DEBUG("ExecuteBatch preparing: %v", + RequestInfo(BatchItemList_[i].Parameters)); + + result->Add(BatchItemList_[i].Parameters); + if (BatchItemList_[i].NextTry > *nextTry) { + *nextTry = BatchItemList_[i].NextTry; + } + } +} + +void TRawBatchRequest::ParseResponse( + const TResponseInfo& requestResult, + const IRequestRetryPolicyPtr& retryPolicy, + TRawBatchRequest* retryBatch, + TInstant now) +{ + TNode node = NodeFromYsonString(requestResult.Response); + return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now); +} + +void TRawBatchRequest::ParseResponse( + TNode node, + const TString& requestId, + const IRequestRetryPolicyPtr& retryPolicy, + TRawBatchRequest* retryBatch, + TInstant now) +{ + Y_VERIFY(retryBatch); + + EnsureType(node, TNode::List); + auto& responseList = node.AsList(); + const auto size = responseList.size(); + Y_ENSURE(size <= BatchItemList_.size(), + "Size of server response exceeds size of batch request;" + " size of batch: " << BatchItemList_.size() << + " size of server response: " << size << '.'); + + for (size_t i = 0; i != size; ++i) { + try { + EnsureType(responseList[i], TNode::Map); + auto& responseNode = responseList[i].AsMap(); + const auto outputIt = responseNode.find("output"); + if (outputIt != responseNode.end()) { + BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second)); + } else { + const auto errorIt = responseNode.find("error"); + if (errorIt == responseNode.end()) { + BatchItemList_[i].ResponseParser->SetResponse(Nothing()); + } else { + TErrorResponse error(400, requestId); + error.SetError(TYtError(errorIt->second)); + if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) { + YT_LOG_INFO( + "Batch subrequest (%s) failed, will retry, error: %s", + RequestInfo(BatchItemList_[i].Parameters), + error.what()); + retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval)); + } else { + YT_LOG_ERROR( + "Batch subrequest (%s) failed, error: %s", + RequestInfo(BatchItemList_[i].Parameters), + error.what()); + BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error)); + } + } + } + } catch (const std::exception& e) { + // We don't expect other exceptions, so we don't catch (...) + BatchItemList_[i].ResponseParser->SetException(std::current_exception()); + } + } + BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size); +} + +void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const +{ + for (const auto& batchItem : BatchItemList_) { + batchItem.ResponseParser->SetException(e); + } +} + +size_t TRawBatchRequest::BatchSize() const +{ + return BatchItemList_.size(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail::NRawClient diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.h b/yt/cpp/mapreduce/raw_client/raw_batch_request.h new file mode 100644 index 0000000000..7ed5bebf5e --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.h @@ -0,0 +1,190 @@ +#pragma once + +#include <yt/cpp/mapreduce/common/fwd.h> + +#include <yt/cpp/mapreduce/interface/batch_request.h> +#include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/interface/node.h> +#include <yt/cpp/mapreduce/interface/retry_policy.h> + +#include <yt/cpp/mapreduce/http/requests.h> + +#include <library/cpp/threading/future/future.h> + +#include <util/generic/ptr.h> +#include <util/generic/deque.h> + +#include <exception> + +namespace NYT::NDetail { + struct TResponseInfo; +} + +namespace NYT::NDetail::NRawClient { + +//////////////////////////////////////////////////////////////////////////////// + +class TRawBatchRequest + : public TThrRefBase +{ +public: + struct IResponseItemParser + : public TThrRefBase + { + ~IResponseItemParser() = default; + + virtual void SetResponse(TMaybe<TNode> node) = 0; + virtual void SetException(std::exception_ptr e) = 0; + }; + +public: + TRawBatchRequest(const TConfigPtr& config); + ~TRawBatchRequest(); + + bool IsExecuted() const; + void MarkExecuted(); + + void FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const; + + size_t BatchSize() const; + + void ParseResponse( + const TResponseInfo& requestResult, + const IRequestRetryPolicyPtr& retryPolicy, + TRawBatchRequest* retryBatch, + TInstant now = TInstant::Now()); + void ParseResponse( + TNode response, + const TString& requestId, + const IRequestRetryPolicyPtr& retryPolicy, + TRawBatchRequest* retryBatch, + TInstant now = TInstant::Now()); + void SetErrorResult(std::exception_ptr e) const; + + ::NThreading::TFuture<TNodeId> Create( + const TTransactionId& transaction, + const TYPath& path, + ENodeType type, + const TCreateOptions& options); + ::NThreading::TFuture<void> Remove( + const TTransactionId& transaction, + const TYPath& path, + const TRemoveOptions& options); + ::NThreading::TFuture<bool> Exists( + const TTransactionId& transaction, + const TYPath& path, + const TExistsOptions& options); + ::NThreading::TFuture<TNode> Get( + const TTransactionId& transaction, + const TYPath& path, + const TGetOptions& options); + ::NThreading::TFuture<void> Set( + const TTransactionId& transaction, + const TYPath& path, + const TNode& value, + const TSetOptions& options); + ::NThreading::TFuture<TNode::TListType> List( + const TTransactionId& transaction, + const TYPath& path, + const TListOptions& options); + ::NThreading::TFuture<TNodeId> Copy( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options); + ::NThreading::TFuture<TNodeId> Move( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options); + ::NThreading::TFuture<TNodeId> Link( + const TTransactionId& transaction, + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options); + ::NThreading::TFuture<TLockId> Lock( + const TTransactionId& transaction, + const TYPath& path, + ELockMode mode, + const TLockOptions& options); + ::NThreading::TFuture<void> Unlock( + const TTransactionId& transaction, + const TYPath& path, + const TUnlockOptions& options); + ::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache( + const TTransactionId& transactionId, + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions& options); + ::NThreading::TFuture<TYPath> PutFileToCache( + const TTransactionId& transactionId, + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options); + ::NThreading::TFuture<TCheckPermissionResponse> CheckPermission( + const TString& user, + EPermission permission, + const TYPath& path, + const TCheckPermissionOptions& options); + ::NThreading::TFuture<TOperationAttributes> GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options); + ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId); + ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId); + ::NThreading::TFuture<void> SuspendOperation( + const TOperationId& operationId, + const TSuspendOperationOptions& options); + ::NThreading::TFuture<void> ResumeOperation( + const TOperationId& operationId, + const TResumeOperationOptions& options); + ::NThreading::TFuture<void> UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options); + ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path); + ::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options); + ::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options); + +private: + struct TBatchItem { + TNode Parameters; + ::TIntrusivePtr<IResponseItemParser> ResponseParser; + TInstant NextTry; + + TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser); + + TBatchItem(const TBatchItem& batchItem, TInstant nextTry); + }; + +private: + template <typename TResponseParser> + typename TResponseParser::TFutureResult AddRequest( + const TString& command, + TNode parameters, + TMaybe<TNode> input); + + template <typename TResponseParser> + typename TResponseParser::TFutureResult AddRequest( + const TString& command, + TNode parameters, + TMaybe<TNode> input, + ::TIntrusivePtr<TResponseParser> parser); + + void AddRequest(TBatchItem batchItem); + +private: + TConfigPtr Config_; + + TDeque<TBatchItem> BatchItemList_; + bool Executed_ = false; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail::NRawClient diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp new file mode 100644 index 0000000000..26120759fd --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -0,0 +1,1027 @@ +#include "raw_requests.h" + +#include "raw_batch_request.h" +#include "rpc_parameters_serialization.h" + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <yt/cpp/mapreduce/http/fwd.h> +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/http/helpers.h> +#include <yt/cpp/mapreduce/http/http_client.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/client.h> +#include <yt/cpp/mapreduce/interface/operation.h> +#include <yt/cpp/mapreduce/interface/serialize.h> +#include <yt/cpp/mapreduce/interface/tvm.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <util/generic/guid.h> +#include <util/generic/scope.h> + +namespace NYT::NDetail::NRawClient { + +/////////////////////////////////////////////////////////////////////////////// + +void ExecuteBatch( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + TRawBatchRequest& batchRequest, + const TExecuteBatchOptions& options) +{ + if (batchRequest.IsExecuted()) { + ythrow yexception() << "Cannot execute batch request since it is already executed"; + } + Y_DEFER { + batchRequest.MarkExecuted(); + }; + + const auto concurrency = options.Concurrency_.GetOrElse(50); + const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); + + if (!retryPolicy) { + retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); + } + + while (batchRequest.BatchSize()) { + TRawBatchRequest retryBatch(context.Config); + + while (batchRequest.BatchSize()) { + auto parameters = TNode::CreateMap(); + TInstant nextTry; + batchRequest.FillParameterList(batchPartMaxSize, ¶meters["requests"], &nextTry); + if (nextTry) { + SleepUntil(nextTry); + } + parameters["concurrency"] = concurrency; + auto body = NodeToYsonString(parameters); + THttpHeader header("POST", "execute_batch"); + header.AddMutationId(); + NDetail::TResponseInfo result; + try { + result = RetryRequestWithPolicy(retryPolicy, context, header, body); + } catch (const std::exception& e) { + batchRequest.SetErrorResult(std::current_exception()); + retryBatch.SetErrorResult(std::current_exception()); + throw; + } + batchRequest.ParseResponse(std::move(result), retryPolicy.Get(), &retryBatch); + } + + batchRequest = std::move(retryBatch); + } +} + +TNode Get( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options) +{ + THttpHeader header("GET", "get"); + header.MergeParameters(SerializeParamsForGet(transactionId, context.Config->Prefix, path, options)); + return NodeFromYsonString(RetryRequestWithPolicy(retryPolicy, context, header).Response); +} + +TNode TryGet( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options) +{ + try { + return Get(retryPolicy, context, transactionId, path, options); + } catch (const TErrorResponse& error) { + if (!error.IsResolveError()) { + throw; + } + return TNode(); + } +} + +void Set( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TNode& value, + const TSetOptions& options) +{ + THttpHeader header("PUT", "set"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForSet(transactionId, context.Config->Prefix, path, options)); + auto body = NodeToYsonString(value); + RetryRequestWithPolicy(retryPolicy, context, header, body); +} + +void MultisetAttributes( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TNode::TMapType& value, + const TMultisetAttributesOptions& options) +{ + THttpHeader header("PUT", "api/v4/multiset_attributes", false); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForMultisetAttributes(transactionId, context.Config->Prefix, path, options)); + + auto body = NodeToYsonString(value); + RetryRequestWithPolicy(retryPolicy, context, header, body); +} + +bool Exists( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TExistsOptions& options) +{ + THttpHeader header("GET", "exists"); + header.MergeParameters(SerializeParamsForExists(transactionId, context.Config->Prefix, path, options)); + return ParseBoolFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); +} + +TNodeId Create( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const ENodeType& type, + const TCreateOptions& options) +{ + THttpHeader header("POST", "create"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForCreate(transactionId, context.Config->Prefix, path, type, options)); + return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); +} + +TNodeId Copy( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options) +{ + THttpHeader header("POST", "copy"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options)); + return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); +} + +TNodeId Move( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options) +{ + THttpHeader header("POST", "move"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options)); + return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); +} + +void Remove( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TRemoveOptions& options) +{ + THttpHeader header("POST", "remove"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForRemove(transactionId, context.Config->Prefix, path, options)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +TNode::TListType List( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TListOptions& options) +{ + THttpHeader header("GET", "list"); + + TYPath updatedPath = AddPathPrefix(path, context.Config->Prefix); + // Translate "//" to "/" + // Translate "//some/constom/prefix/from/config/" to "//some/constom/prefix/from/config" + if (path.empty() && updatedPath.EndsWith('/')) { + updatedPath.pop_back(); + } + header.MergeParameters(SerializeParamsForList(transactionId, context.Config->Prefix, updatedPath, options)); + auto result = RetryRequestWithPolicy(retryPolicy, context, header); + return NodeFromYsonString(result.Response).AsList(); +} + +TNodeId Link( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options) +{ + THttpHeader header("POST", "link"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForLink(transactionId, context.Config->Prefix, targetPath, linkPath, options)); + return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); +} + +TLockId Lock( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + ELockMode mode, + const TLockOptions& options) +{ + THttpHeader header("POST", "lock"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForLock(transactionId, context.Config->Prefix, path, mode, options)); + return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); +} + +void Unlock( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TUnlockOptions& options) +{ + THttpHeader header("POST", "unlock"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForUnlock(transactionId, context.Config->Prefix, path, options)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +void Concatenate( + const TClientContext& context, + const TTransactionId& transactionId, + const TVector<TRichYPath>& sourcePaths, + const TRichYPath& destinationPath, + const TConcatenateOptions& options) +{ + THttpHeader header("POST", "concatenate"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForConcatenate(transactionId, context.Config->Prefix, sourcePaths, destinationPath, options)); + RequestWithoutRetry(context, header); +} + +void PingTx( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId) +{ + THttpHeader header("POST", "ping_tx"); + header.MergeParameters(SerializeParamsForPingTx(transactionId)); + TRequestConfig requestConfig; + requestConfig.HttpConfig = NHttpClient::THttpConfig{ + .SocketTimeout = context.Config->PingTimeout + }; + RetryRequestWithPolicy(retryPolicy, context, header, {}, requestConfig); +} + +TOperationAttributes ParseOperationAttributes(const TNode& node) +{ + const auto& mapNode = node.AsMap(); + TOperationAttributes result; + + if (auto idNode = mapNode.FindPtr("id")) { + result.Id = GetGuid(idNode->AsString()); + } + + if (auto typeNode = mapNode.FindPtr("type")) { + result.Type = FromString<EOperationType>(typeNode->AsString()); + } else if (auto operationTypeNode = mapNode.FindPtr("operation_type")) { + // COMPAT(levysotsky): "operation_type" is a deprecated synonim for "type". + // This branch should be removed when all clusters are updated. + result.Type = FromString<EOperationType>(operationTypeNode->AsString()); + } + + if (auto stateNode = mapNode.FindPtr("state")) { + result.State = stateNode->AsString(); + // We don't use FromString here, because OS_IN_PROGRESS unites many states: "initializing", "running", etc. + if (*result.State == "completed") { + result.BriefState = EOperationBriefState::Completed; + } else if (*result.State == "aborted") { + result.BriefState = EOperationBriefState::Aborted; + } else if (*result.State == "failed") { + result.BriefState = EOperationBriefState::Failed; + } else { + result.BriefState = EOperationBriefState::InProgress; + } + } + if (auto authenticatedUserNode = mapNode.FindPtr("authenticated_user")) { + result.AuthenticatedUser = authenticatedUserNode->AsString(); + } + if (auto startTimeNode = mapNode.FindPtr("start_time")) { + result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString()); + } + if (auto finishTimeNode = mapNode.FindPtr("finish_time")) { + result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString()); + } + auto briefProgressNode = mapNode.FindPtr("brief_progress"); + if (briefProgressNode && briefProgressNode->HasKey("jobs")) { + result.BriefProgress.ConstructInPlace(); + static auto load = [] (const TNode& item) { + // Backward compatibility with old YT versions + return item.IsInt64() ? item.AsInt64() : item["total"].AsInt64(); + }; + const auto& jobs = (*briefProgressNode)["jobs"]; + result.BriefProgress->Aborted = load(jobs["aborted"]); + result.BriefProgress->Completed = load(jobs["completed"]); + result.BriefProgress->Running = jobs["running"].AsInt64(); + result.BriefProgress->Total = jobs["total"].AsInt64(); + result.BriefProgress->Failed = jobs["failed"].AsInt64(); + result.BriefProgress->Lost = jobs["lost"].AsInt64(); + result.BriefProgress->Pending = jobs["pending"].AsInt64(); + } + if (auto briefSpecNode = mapNode.FindPtr("brief_spec")) { + result.BriefSpec = *briefSpecNode; + } + if (auto specNode = mapNode.FindPtr("spec")) { + result.Spec = *specNode; + } + if (auto fullSpecNode = mapNode.FindPtr("full_spec")) { + result.FullSpec = *fullSpecNode; + } + if (auto unrecognizedSpecNode = mapNode.FindPtr("unrecognized_spec")) { + result.UnrecognizedSpec = *unrecognizedSpecNode; + } + if (auto suspendedNode = mapNode.FindPtr("suspended")) { + result.Suspended = suspendedNode->AsBool(); + } + if (auto resultNode = mapNode.FindPtr("result")) { + result.Result.ConstructInPlace(); + auto error = TYtError((*resultNode)["error"]); + if (error.GetCode() != 0) { + result.Result->Error = std::move(error); + } + } + if (auto progressNode = mapNode.FindPtr("progress")) { + const auto& progressMap = progressNode->AsMap(); + TMaybe<TInstant> buildTime; + if (auto buildTimeNode = progressMap.FindPtr("build_time")) { + buildTime = TInstant::ParseIso8601(buildTimeNode->AsString()); + } + TJobStatistics jobStatistics; + if (auto jobStatisticsNode = progressMap.FindPtr("job_statistics")) { + jobStatistics = TJobStatistics(*jobStatisticsNode); + } + TJobCounters jobCounters; + if (auto jobCountersNode = progressMap.FindPtr("total_job_counter")) { + jobCounters = TJobCounters(*jobCountersNode); + } + result.Progress = TOperationProgress{ + .JobStatistics = std::move(jobStatistics), + .JobCounters = std::move(jobCounters), + .BuildTime = buildTime, + }; + } + if (auto eventsNode = mapNode.FindPtr("events")) { + result.Events.ConstructInPlace().reserve(eventsNode->Size()); + for (const auto& eventNode : eventsNode->AsList()) { + result.Events->push_back(TOperationEvent{ + eventNode["state"].AsString(), + TInstant::ParseIso8601(eventNode["time"].AsString()), + }); + } + } + if (auto alertsNode = mapNode.FindPtr("alerts")) { + result.Alerts.ConstructInPlace(); + for (const auto& [alertType, alertError] : alertsNode->AsMap()) { + result.Alerts->emplace(alertType, TYtError(alertError)); + } + } + + return result; +} + +TOperationAttributes GetOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TGetOperationOptions& options) +{ + THttpHeader header("GET", "get_operation"); + header.MergeParameters(SerializeParamsForGetOperation(operationId, options)); + auto result = RetryRequestWithPolicy(retryPolicy, context, header); + return ParseOperationAttributes(NodeFromYsonString(result.Response)); +} + +void AbortOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId) +{ + THttpHeader header("POST", "abort_op"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForAbortOperation(operationId)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +void CompleteOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId) +{ + THttpHeader header("POST", "complete_op"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForCompleteOperation(operationId)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +void SuspendOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TSuspendOperationOptions& options) +{ + THttpHeader header("POST", "suspend_op"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForSuspendOperation(operationId, options)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +void ResumeOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TResumeOperationOptions& options) +{ + THttpHeader header("POST", "resume_op"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForResumeOperation(operationId, options)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +template <typename TKey> +static THashMap<TKey, i64> GetCounts(const TNode& countsNode) +{ + THashMap<TKey, i64> counts; + for (const auto& entry : countsNode.AsMap()) { + counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64()); + } + return counts; +} + +TListOperationsResult ListOperations( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TListOperationsOptions& options) +{ + THttpHeader header("GET", "list_operations"); + header.MergeParameters(SerializeParamsForListOperations(options)); + auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); + auto resultNode = NodeFromYsonString(responseInfo.Response); + + TListOperationsResult result; + for (const auto& operationNode : resultNode["operations"].AsList()) { + result.Operations.push_back(ParseOperationAttributes(operationNode)); + } + + if (resultNode.HasKey("pool_counts")) { + result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]); + } + if (resultNode.HasKey("user_counts")) { + result.UserCounts = GetCounts<TString>(resultNode["user_counts"]); + } + if (resultNode.HasKey("type_counts")) { + result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]); + } + if (resultNode.HasKey("state_counts")) { + result.StateCounts = GetCounts<TString>(resultNode["state_counts"]); + } + if (resultNode.HasKey("failed_jobs_count")) { + result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64(); + } + + result.Incomplete = resultNode["incomplete"].AsBool(); + + return result; +} + +void UpdateOperationParameters( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options) +{ + THttpHeader header("POST", "update_op_parameters"); + header.MergeParameters(SerializeParamsForUpdateOperationParameters(operationId, options)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +TJobAttributes ParseJobAttributes(const TNode& node) +{ + const auto& mapNode = node.AsMap(); + TJobAttributes result; + + // Currently "get_job" returns "job_id" field and "list_jobs" returns "id" field. + auto idNode = mapNode.FindPtr("id"); + if (!idNode) { + idNode = mapNode.FindPtr("job_id"); + } + if (idNode) { + result.Id = GetGuid(idNode->AsString()); + } + + if (auto typeNode = mapNode.FindPtr("type")) { + result.Type = FromString<EJobType>(typeNode->AsString()); + } + if (auto stateNode = mapNode.FindPtr("state")) { + result.State = FromString<EJobState>(stateNode->AsString()); + } + if (auto addressNode = mapNode.FindPtr("address")) { + result.Address = addressNode->AsString(); + } + if (auto taskNameNode = mapNode.FindPtr("task_name")) { + result.TaskName = taskNameNode->AsString(); + } + if (auto startTimeNode = mapNode.FindPtr("start_time")) { + result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString()); + } + if (auto finishTimeNode = mapNode.FindPtr("finish_time")) { + result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString()); + } + if (auto progressNode = mapNode.FindPtr("progress")) { + result.Progress = progressNode->AsDouble(); + } + if (auto stderrSizeNode = mapNode.FindPtr("stderr_size")) { + result.StderrSize = stderrSizeNode->AsUint64(); + } + if (auto errorNode = mapNode.FindPtr("error")) { + result.Error.ConstructInPlace(*errorNode); + } + if (auto briefStatisticsNode = mapNode.FindPtr("brief_statistics")) { + result.BriefStatistics = *briefStatisticsNode; + } + if (auto inputPathsNode = mapNode.FindPtr("input_paths")) { + const auto& inputPathNodesList = inputPathsNode->AsList(); + result.InputPaths.ConstructInPlace(); + result.InputPaths->reserve(inputPathNodesList.size()); + for (const auto& inputPathNode : inputPathNodesList) { + TRichYPath path; + Deserialize(path, inputPathNode); + result.InputPaths->push_back(std::move(path)); + } + } + if (auto coreInfosNode = mapNode.FindPtr("core_infos")) { + const auto& coreInfoNodesList = coreInfosNode->AsList(); + result.CoreInfos.ConstructInPlace(); + result.CoreInfos->reserve(coreInfoNodesList.size()); + for (const auto& coreInfoNode : coreInfoNodesList) { + TCoreInfo coreInfo; + coreInfo.ProcessId = coreInfoNode["process_id"].AsInt64(); + coreInfo.ExecutableName = coreInfoNode["executable_name"].AsString(); + if (coreInfoNode.HasKey("size")) { + coreInfo.Size = coreInfoNode["size"].AsUint64(); + } + if (coreInfoNode.HasKey("error")) { + coreInfo.Error.ConstructInPlace(coreInfoNode["error"]); + } + result.CoreInfos->push_back(std::move(coreInfo)); + } + } + return result; +} + +TJobAttributes GetJob( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobOptions& options) +{ + THttpHeader header("GET", "get_job"); + header.MergeParameters(SerializeParamsForGetJob(operationId, jobId, options)); + auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); + auto resultNode = NodeFromYsonString(responseInfo.Response); + return ParseJobAttributes(resultNode); +} + +TListJobsResult ListJobs( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TListJobsOptions& options) +{ + THttpHeader header("GET", "list_jobs"); + header.MergeParameters(SerializeParamsForListJobs(operationId, options)); + auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); + auto resultNode = NodeFromYsonString(responseInfo.Response); + + TListJobsResult result; + + const auto& jobNodesList = resultNode["jobs"].AsList(); + result.Jobs.reserve(jobNodesList.size()); + for (const auto& jobNode : jobNodesList) { + result.Jobs.push_back(ParseJobAttributes(jobNode)); + } + + if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) { + result.CypressJobCount = resultNode["cypress_job_count"].AsInt64(); + } + if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) { + result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64(); + } + if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) { + result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64(); + } + + return result; +} + +class TResponseReader + : public IFileReader +{ +public: + TResponseReader(const TClientContext& context, THttpHeader header) + { + if (context.ServiceTicketAuth) { + header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(context.Token); + } + + auto hostName = GetProxyForHeavyRequest(context); + auto requestId = CreateGuidAsString(); + + Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); + ResponseStream_ = Response_->GetResponseStream(); + } + +private: + size_t DoRead(void* buf, size_t len) override + { + return ResponseStream_->Read(buf, len); + } + + size_t DoSkip(size_t len) override + { + return ResponseStream_->Skip(len); + } + +private: + THttpRequest Request_; + NHttpClient::IHttpResponsePtr Response_; + IInputStream* ResponseStream_; +}; + +IFileReaderPtr GetJobInput( + const TClientContext& context, + const TJobId& jobId, + const TGetJobInputOptions& /* options */) +{ + THttpHeader header("GET", "get_job_input"); + header.AddParameter("job_id", GetGuidAsString(jobId)); + return new TResponseReader(context, std::move(header)); +} + +IFileReaderPtr GetJobFailContext( + const TClientContext& context, + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobFailContextOptions& /* options */) +{ + THttpHeader header("GET", "get_job_fail_context"); + header.AddOperationId(operationId); + header.AddParameter("job_id", GetGuidAsString(jobId)); + return new TResponseReader(context, std::move(header)); +} + +TString GetJobStderrWithRetries( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& /* options */) +{ + THttpHeader header("GET", "get_job_stderr"); + header.AddOperationId(operationId); + header.AddParameter("job_id", GetGuidAsString(jobId)); + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header, {}, config); + return responseInfo.Response; +} + +IFileReaderPtr GetJobStderr( + const TClientContext& context, + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& /* options */) +{ + THttpHeader header("GET", "get_job_stderr"); + header.AddOperationId(operationId); + header.AddParameter("job_id", GetGuidAsString(jobId)); + return new TResponseReader(context, std::move(header)); +} + +TMaybe<TYPath> GetFileFromCache( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions& options) +{ + THttpHeader header("GET", "get_file_from_cache"); + header.MergeParameters(SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options)); + auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); + auto path = NodeFromYsonString(responseInfo.Response).AsString(); + return path.empty() ? Nothing() : TMaybe<TYPath>(path); +} + +TYPath PutFileToCache( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options) +{ + THttpHeader header("POST", "put_file_to_cache"); + header.MergeParameters(SerializeParamsForPutFileToCache(transactionId, context.Config->Prefix, filePath, md5Signature, cachePath, options)); + auto result = RetryRequestWithPolicy(retryPolicy, context, header); + return NodeFromYsonString(result.Response).AsString(); +} + +TNode::TListType SkyShareTable( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options) +{ + THttpHeader header("POST", "api/v1/share", /*IsApi*/ false); + + auto proxyName = context.ServerName.substr(0, context.ServerName.find('.')); + + auto host = context.Config->SkynetApiHost; + if (host == "") { + host = "skynet." + proxyName + ".yt.yandex.net"; + } + + header.MergeParameters(SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, options)); + TClientContext skyApiHost({ .ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient() }); + TResponseInfo response = {}; + + // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu + // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK). + while (response.HttpCode != 200) { + response = RetryRequestWithPolicy(retryPolicy, skyApiHost, header, ""); + TWaitProxy::Get()->Sleep(TDuration::Seconds(5)); + } + + if (options.KeyColumns_) { + return NodeFromJsonString(response.Response)["torrents"].AsList(); + } else { + TNode torrent; + + torrent["key"] = TNode::CreateList(); + torrent["rbtorrent"] = response.Response; + + return TNode::TListType{ torrent }; + } +} + +TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node) +{ + auto parseSingleResult = [] (const TNode::TMapType& node) { + TCheckPermissionResult result; + result.Action = ::FromString<ESecurityAction>(node.at("action").AsString()); + if (auto objectId = node.FindPtr("object_id")) { + result.ObjectId = GetGuid(objectId->AsString()); + } + if (auto objectName = node.FindPtr("object_name")) { + result.ObjectName = objectName->AsString(); + } + if (auto subjectId = node.FindPtr("subject_id")) { + result.SubjectId = GetGuid(subjectId->AsString()); + } + if (auto subjectName = node.FindPtr("subject_name")) { + result.SubjectName = subjectName->AsString(); + } + return result; + }; + + const auto& mapNode = node.AsMap(); + TCheckPermissionResponse result; + static_cast<TCheckPermissionResult&>(result) = parseSingleResult(mapNode); + if (auto columns = mapNode.FindPtr("columns")) { + result.Columns.reserve(columns->AsList().size()); + for (const auto& columnNode : columns->AsList()) { + result.Columns.push_back(parseSingleResult(columnNode.AsMap())); + } + } + return result; +} + +TCheckPermissionResponse CheckPermission( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TString& user, + EPermission permission, + const TYPath& path, + const TCheckPermissionOptions& options) +{ + THttpHeader header("GET", "check_permission"); + header.MergeParameters(SerializeParamsForCheckPermission(user, permission, context.Config->Prefix, path, options)); + auto response = RetryRequestWithPolicy(retryPolicy, context, header); + return ParseCheckPermissionResponse(NodeFromYsonString(response.Response)); +} + +TVector<TTabletInfo> GetTabletInfos( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TYPath& path, + const TVector<int>& tabletIndexes, + const TGetTabletInfosOptions& options) +{ + THttpHeader header("POST", "api/v4/get_tablet_infos", false); + header.MergeParameters(SerializeParamsForGetTabletInfos(context.Config->Prefix, path, tabletIndexes, options)); + auto response = RetryRequestWithPolicy(retryPolicy, context, header); + TVector<TTabletInfo> result; + Deserialize(result, *NodeFromYsonString(response.Response).AsMap().FindPtr("tablets")); + return result; +} + +TVector<TTableColumnarStatistics> GetTableColumnarStatistics( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options) +{ + THttpHeader header("GET", "get_table_columnar_statistics"); + header.MergeParameters(SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options)); + TRequestConfig config; + config.IsHeavy = true; + auto requestResult = RetryRequestWithPolicy(retryPolicy, context, header, {}, config); + auto response = NodeFromYsonString(requestResult.Response); + TVector<TTableColumnarStatistics> result; + Deserialize(result, response); + return result; +} + +TMultiTablePartitions GetTablePartitions( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options) +{ + THttpHeader header("GET", "partition_tables"); + header.MergeParameters(SerializeParamsForGetTablePartitions(transactionId, paths, options)); + TRequestConfig config; + config.IsHeavy = true; + auto requestResult = RetryRequestWithPolicy(retryPolicy, context, header, {}, config); + auto response = NodeFromYsonString(requestResult.Response); + TMultiTablePartitions result; + Deserialize(result, response); + return result; +} + +TRichYPath CanonizeYPath( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TRichYPath& path) +{ + return CanonizeYPaths(retryPolicy, context, {path}).front(); +} + +TVector<TRichYPath> CanonizeYPaths( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TVector<TRichYPath>& paths) +{ + TRawBatchRequest batch(context.Config); + TVector<NThreading::TFuture<TRichYPath>> futures; + futures.reserve(paths.size()); + for (int i = 0; i < static_cast<int>(paths.size()); ++i) { + futures.push_back(batch.CanonizeYPath(paths[i])); + } + ExecuteBatch(retryPolicy, context, batch, TExecuteBatchOptions{}); + TVector<TRichYPath> result; + result.reserve(futures.size()); + for (auto& future : futures) { + result.push_back(future.ExtractValueSync()); + } + return result; +} + +void AlterTable( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TAlterTableOptions& options) +{ + THttpHeader header("POST", "alter_table"); + header.AddMutationId(); + header.MergeParameters(SerializeParamsForAlterTable(transactionId, context.Config->Prefix, path, options)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +void AlterTableReplica( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TReplicaId& replicaId, + const TAlterTableReplicaOptions& options) +{ + THttpHeader header("POST", "alter_table_replica"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +void DeleteRows( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TYPath& path, + const TNode::TListType& keys, + const TDeleteRowsOptions& options) +{ + THttpHeader header("PUT", "delete_rows"); + header.SetInputFormat(TFormat::YsonBinary()); + header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(context.Config->Prefix, path, options)); + + auto body = NodeListToYsonString(keys); + TRequestConfig requestConfig; + requestConfig.IsHeavy = true; + RetryRequestWithPolicy(retryPolicy, context, header, body, requestConfig); +} + +void FreezeTable( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TYPath& path, + const TFreezeTableOptions& options) +{ + THttpHeader header("POST", "freeze_table"); + header.MergeParameters(SerializeParamsForFreezeTable(context.Config->Prefix, path, options)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +void UnfreezeTable( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TYPath& path, + const TUnfreezeTableOptions& options) +{ + THttpHeader header("POST", "unfreeze_table"); + header.MergeParameters(SerializeParamsForUnfreezeTable(context.Config->Prefix, path, options)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +void AbortTransaction( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId) +{ + THttpHeader header("POST", "abort_tx"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +void CommitTransaction( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId) +{ + THttpHeader header("POST", "commit_tx"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId)); + RetryRequestWithPolicy(retryPolicy, context, header); +} + +TTransactionId StartTransaction( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& parentTransactionId, + const TStartTransactionOptions& options) +{ + THttpHeader header("POST", "start_tx"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, context.Config->TxTimeout, options)); + return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail::NRawClient diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h new file mode 100644 index 0000000000..05fcbade76 --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -0,0 +1,397 @@ +#pragma once + +#include "raw_batch_request.h" + +#include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/operation.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class IRequestRetryPolicy; +struct TClientContext; +struct TExecuteBatchOptions; + +//////////////////////////////////////////////////////////////////////////////// + +namespace NDetail::NRawClient { + +//////////////////////////////////////////////////////////////////////////////// + +TOperationAttributes ParseOperationAttributes(const TNode& node); + +TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node); + +//////////////////////////////////////////////////////////////////////////////// + +// +// marks `batchRequest' as executed +void ExecuteBatch( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + TRawBatchRequest& batchRequest, + const TExecuteBatchOptions& options = TExecuteBatchOptions()); + +// +// Cypress +// + +TNode Get( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options = TGetOptions()); + +TNode TryGet( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TGetOptions& options); + +void Set( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TNode& value, + const TSetOptions& options = TSetOptions()); + +void MultisetAttributes( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TNode::TMapType& value, + const TMultisetAttributesOptions& options = TMultisetAttributesOptions()); + +bool Exists( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TExistsOptions& options = TExistsOptions()); + +TNodeId Create( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const ENodeType& type, + const TCreateOptions& options = TCreateOptions()); + +TNodeId Copy( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options = TCopyOptions()); + +TNodeId Move( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options = TMoveOptions()); + +void Remove( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TRemoveOptions& options = TRemoveOptions()); + +TNode::TListType List( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TListOptions& options = TListOptions()); + +TNodeId Link( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options = TLinkOptions()); + +TLockId Lock( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + ELockMode mode, + const TLockOptions& options = TLockOptions()); + +void Unlock( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TUnlockOptions& options = TUnlockOptions()); + +void Concatenate( + const TClientContext& context, + const TTransactionId& transactionId, + const TVector<TRichYPath>& sourcePaths, + const TRichYPath& destinationPath, + const TConcatenateOptions& options = TConcatenateOptions()); + +// +// Transactions +// + +void PingTx( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId); + +// +// Operations +// + +TOperationAttributes GetOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TGetOperationOptions& options = TGetOperationOptions()); + +void AbortOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId); + +void CompleteOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId); + +void SuspendOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TSuspendOperationOptions& options = TSuspendOperationOptions()); + +void ResumeOperation( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TResumeOperationOptions& options = TResumeOperationOptions()); + +TListOperationsResult ListOperations( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TListOperationsOptions& options = TListOperationsOptions()); + +void UpdateOperationParameters( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options = TUpdateOperationParametersOptions()); + +// +// Jobs +// + +TJobAttributes GetJob( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobOptions& options = TGetJobOptions()); + +TListJobsResult ListJobs( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TListJobsOptions& options = TListJobsOptions()); + +::TIntrusivePtr<IFileReader> GetJobInput( + const TClientContext& context, + const TJobId& jobId, + const TGetJobInputOptions& options = TGetJobInputOptions()); + +::TIntrusivePtr<IFileReader> GetJobFailContext( + const TClientContext& context, + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobFailContextOptions& options = TGetJobFailContextOptions()); + +TString GetJobStderrWithRetries( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& /* options */ = TGetJobStderrOptions()); + +::TIntrusivePtr<IFileReader> GetJobStderr( + const TClientContext& context, + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& options = TGetJobStderrOptions()); + +// +// File cache +// + +TMaybe<TYPath> GetFileFromCache( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions& options = TGetFileFromCacheOptions()); + +TYPath PutFileToCache( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options = TPutFileToCacheOptions()); + +// +// SkyShare +// + +TNode::TListType SkyShareTable( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options); + +// +// Misc +// + +TCheckPermissionResponse CheckPermission( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TString& user, + EPermission permission, + const TYPath& path, + const TCheckPermissionOptions& options = TCheckPermissionOptions()); + +TVector<TTabletInfo> GetTabletInfos( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TYPath& path, + const TVector<int>& tabletIndexes, + const TGetTabletInfosOptions& options); + +TVector<TTableColumnarStatistics> GetTableColumnarStatistics( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options); + +TMultiTablePartitions GetTablePartitions( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options); + +TRichYPath CanonizeYPath( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TRichYPath& path); + +TVector<TRichYPath> CanonizeYPaths( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TVector<TRichYPath>& paths); + +// +// Tables +// + +void AlterTable( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path, + const TAlterTableOptions& options); + +void AlterTableReplica( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TReplicaId& replicaId, + const TAlterTableReplicaOptions& options); + +void DeleteRows( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TYPath& path, + const TNode::TListType& keys, + const TDeleteRowsOptions& options); + +void FreezeTable( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TYPath& path, + const TFreezeTableOptions& options); + +void UnfreezeTable( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TYPath& path, + const TUnfreezeTableOptions& options); + + +// Transactions +void AbortTransaction( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId); + +void CommitTransaction( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId); + +TTransactionId StartTransaction( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& parentId, + const TStartTransactionOptions& options); + +//////////////////////////////////////////////////////////////////////////////// + +template<typename TSrc, typename TBatchAdder> +auto BatchTransform( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TSrc& src, + TBatchAdder batchAdder, + const TExecuteBatchOptions& executeBatchOptions = {}) +{ + TRawBatchRequest batch(context.Config); + using TFuture = decltype(batchAdder(batch, *std::begin(src))); + TVector<TFuture> futures; + for (const auto& el : src) { + futures.push_back(batchAdder(batch, el)); + } + ExecuteBatch(retryPolicy, context, batch, executeBatchOptions); + using TDst = decltype(futures[0].ExtractValueSync()); + TVector<TDst> result; + result.reserve(std::size(src)); + for (auto& future : futures) { + result.push_back(future.ExtractValueSync()); + } + return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail::NRawClient +} // namespace NYT diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp new file mode 100644 index 0000000000..1936266d0d --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -0,0 +1,873 @@ +#include "rpc_parameters_serialization.h" + +#include <yt/cpp/mapreduce/common/helpers.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/operation.h> +#include <yt/cpp/mapreduce/interface/serialize.h> + +#include <library/cpp/yson/node/node.h> +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/yson/node/node_builder.h> + +#include <util/generic/guid.h> +#include <util/string/cast.h> + +namespace NYT::NDetail::NRawClient { + +using ::ToString; + +//////////////////////////////////////////////////////////////////// + +static void SetTransactionIdParam(TNode* node, const TTransactionId& transactionId) +{ + if (transactionId != TTransactionId()) { + (*node)["transaction_id"] = GetGuidAsString(transactionId); + } +} + +static void SetOperationIdParam(TNode* node, const TOperationId& operationId) +{ + (*node)["operation_id"] = GetGuidAsString(operationId); +} + +static void SetPathParam(TNode* node, const TString& pathPrefix, const TYPath& path) +{ + (*node)["path"] = AddPathPrefix(path, pathPrefix); +} + +static TNode SerializeAttributeFilter(const TAttributeFilter& attributeFilter) +{ + TNode result = TNode::CreateList(); + for (const auto& attribute : attributeFilter.Attributes_) { + result.Add(attribute); + } + return result; +} + +static TNode SerializeAttributeFilter(const TOperationAttributeFilter& attributeFilter) +{ + TNode result = TNode::CreateList(); + for (const auto& attribute : attributeFilter.Attributes_) { + result.Add(ToString(attribute)); + } + return result; +} + +template <typename TOptions> +static void SetFirstLastTabletIndex(TNode* node, const TOptions& options) +{ + if (options.FirstTabletIndex_) { + (*node)["first_tablet_index"] = *options.FirstTabletIndex_; + } + if (options.LastTabletIndex_) { + (*node)["last_tablet_index"] = *options.LastTabletIndex_; + } +} + +static TString GetDefaultTransactionTitle() +{ + const auto processState = TProcessState::Get(); + TStringStream res; + + res << "User transaction. Created by: " << processState->UserName << " on " << processState->FqdnHostName + << " client: " << processState->ClientVersion << " pid: " << processState->Pid; + if (!processState->CommandLine.empty()) { + res << " program: " << processState->CommandLine[0]; + } else { + res << " command line is unknown probably NYT::Initialize was never called"; + } + +#ifndef NDEBUG + res << " build: debug"; +#endif + + return res.Str(); +} + +template <typename T> +void SerializeMasterReadOptions(TNode* node, const TMasterReadOptions<T>& options) +{ + if (options.ReadFrom_) { + (*node)["read_from"] = ToString(*options.ReadFrom_); + } +} + +//////////////////////////////////////////////////////////////////// + +TNode SerializeParamsForCreate( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + ENodeType type, + const TCreateOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + result["recursive"] = options.Recursive_; + result["type"] = ToString(type); + result["ignore_existing"] = options.IgnoreExisting_; + result["force"] = options.Force_; + if (options.Attributes_) { + result["attributes"] = *options.Attributes_; + } + return result; +} + +TNode SerializeParamsForRemove( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TRemoveOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + result["recursive"] = options.Recursive_; + result["force"] = options.Force_; + return result; +} + +TNode SerializeParamsForExists( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TExistsOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + SerializeMasterReadOptions(&result, options); + return result; +} + +TNode SerializeParamsForGet( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TGetOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + SerializeMasterReadOptions(&result, options); + if (options.AttributeFilter_) { + result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_); + } + if (options.MaxSize_) { + result["max_size"] = *options.MaxSize_; + } + return result; +} + +TNode SerializeParamsForSet( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TSetOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + result["recursive"] = options.Recursive_; + if (options.Force_) { + result["force"] = *options.Force_; + } + return result; +} + +TNode SerializeParamsForMultisetAttributes( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + [[maybe_unused]] const TMultisetAttributesOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + return result; +} + +TNode SerializeParamsForList( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TListOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + SerializeMasterReadOptions(&result, options); + if (options.MaxSize_) { + result["max_size"] = *options.MaxSize_; + } + if (options.AttributeFilter_) { + result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_); + } + return result; +} + +TNode SerializeParamsForCopy( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + result["source_path"] = AddPathPrefix(sourcePath, pathPrefix); + result["destination_path"] = AddPathPrefix(destinationPath, pathPrefix); + result["recursive"] = options.Recursive_; + result["force"] = options.Force_; + result["preserve_account"] = options.PreserveAccount_; + if (options.PreserveExpirationTime_) { + result["preserve_expiration_time"] = *options.PreserveExpirationTime_; + } + return result; +} + +TNode SerializeParamsForMove( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + result["source_path"] = AddPathPrefix(sourcePath, pathPrefix); + result["destination_path"] = AddPathPrefix(destinationPath, pathPrefix); + result["recursive"] = options.Recursive_; + result["force"] = options.Force_; + result["preserve_account"] = options.PreserveAccount_; + if (options.PreserveExpirationTime_) { + result["preserve_expiration_time"] = *options.PreserveExpirationTime_; + } + return result; +} + +TNode SerializeParamsForLink( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + result["target_path"] = AddPathPrefix(targetPath, pathPrefix); + result["link_path"] = AddPathPrefix(linkPath, pathPrefix); + result["recursive"] = options.Recursive_; + result["ignore_existing"] = options.IgnoreExisting_; + result["force"] = options.Force_; + if (options.Attributes_) { + result["attributes"] = *options.Attributes_; + } + return result; +} + +TNode SerializeParamsForLock( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + ELockMode mode, + const TLockOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + result["mode"] = ToString(mode); + result["waitable"] = options.Waitable_; + if (options.AttributeKey_) { + result["attribute_key"] = *options.AttributeKey_; + } + if (options.ChildKey_) { + result["child_key"] = *options.ChildKey_; + } + return result; +} + +TNode SerializeParamsForUnlock( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TUnlockOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + Y_UNUSED(options); + return result; +} + +TNode SerializeParamsForConcatenate( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TVector<TRichYPath>& sourcePaths, + const TRichYPath& destinationPath, + const TConcatenateOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + { + auto actualDestination = destinationPath; + actualDestination.Path(AddPathPrefix(actualDestination.Path_, pathPrefix)); + if (options.Append_) { + actualDestination.Append(*options.Append_); + } + result["destination_path"] = PathToNode(actualDestination); + } + auto& sourcePathsNode = result["source_paths"]; + for (const auto& path : sourcePaths) { + auto actualSource = path; + actualSource.Path(AddPathPrefix(actualSource.Path_, pathPrefix)); + sourcePathsNode.Add(PathToNode(actualSource)); + } + return result; +} + +TNode SerializeParamsForPingTx( + const TTransactionId& transactionId) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + return result; +} + +TNode SerializeParamsForListOperations( + const TListOperationsOptions& options) +{ + TNode result = TNode::CreateMap(); + if (options.FromTime_) { + result["from_time"] = ToString(*options.FromTime_); + } + if (options.ToTime_) { + result["to_time"] = ToString(*options.ToTime_); + } + if (options.CursorTime_) { + result["cursor_time"] = ToString(*options.CursorTime_); + } + if (options.CursorDirection_) { + result["cursor_direction"] = ToString(*options.CursorDirection_); + } + if (options.Pool_) { + result["pool"] = *options.Pool_; + } + if (options.Filter_) { + result["filter"] = *options.Filter_; + } + if (options.User_) { + result["user"] = *options.User_; + } + if (options.State_) { + result["state"] = *options.State_; + } + if (options.Type_) { + result["type"] = ToString(*options.Type_); + } + if (options.WithFailedJobs_) { + result["with_failed_jobs"] = *options.WithFailedJobs_; + } + if (options.IncludeCounters_) { + result["include_counters"] = *options.IncludeCounters_; + } + if (options.IncludeArchive_) { + result["include_archive"] = *options.IncludeArchive_; + } + if (options.Limit_) { + result["limit"] = *options.Limit_; + } + return result; +} + +TNode SerializeParamsForGetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options) +{ + TNode result; + SetOperationIdParam(&result, operationId); + if (options.AttributeFilter_) { + result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_); + } + return result; +} + +TNode SerializeParamsForAbortOperation(const TOperationId& operationId) +{ + TNode result; + SetOperationIdParam(&result, operationId); + return result; +} + +TNode SerializeParamsForCompleteOperation(const TOperationId& operationId) +{ + TNode result; + SetOperationIdParam(&result, operationId); + return result; +} + +TNode SerializeParamsForSuspendOperation( + const TOperationId& operationId, + const TSuspendOperationOptions& options) +{ + TNode result; + SetOperationIdParam(&result, operationId); + if (options.AbortRunningJobs_) { + result["abort_running_jobs"] = *options.AbortRunningJobs_; + } + return result; +} + +TNode SerializeParamsForResumeOperation( + const TOperationId& operationId, + const TResumeOperationOptions& options) +{ + TNode result; + SetOperationIdParam(&result, operationId); + Y_UNUSED(options); + return result; +} + +TNode SerializeParamsForUpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options) +{ + TNode result; + SetOperationIdParam(&result, operationId); + TNode& parameters = result["parameters"]; + if (options.Pool_) { + parameters["pool"] = *options.Pool_; + } + if (options.Weight_) { + parameters["weight"] = *options.Weight_; + } + if (!options.Owners_.empty()) { + parameters["owners"] = TNode::CreateList(); + for (const auto& owner : options.Owners_) { + parameters["owners"].Add(owner); + } + } + if (options.SchedulingOptionsPerPoolTree_) { + parameters["scheduling_options_per_pool_tree"] = TNode::CreateMap(); + for (const auto& entry : options.SchedulingOptionsPerPoolTree_->Options_) { + auto schedulingOptionsNode = TNode::CreateMap(); + const auto& schedulingOptions = entry.second; + if (schedulingOptions.Pool_) { + schedulingOptionsNode["pool"] = *schedulingOptions.Pool_; + } + if (schedulingOptions.Weight_) { + schedulingOptionsNode["weight"] = *schedulingOptions.Weight_; + } + if (schedulingOptions.ResourceLimits_) { + auto resourceLimitsNode = TNode::CreateMap(); + const auto& resourceLimits = *schedulingOptions.ResourceLimits_; + if (resourceLimits.UserSlots_) { + resourceLimitsNode["user_slots"] = *resourceLimits.UserSlots_; + } + if (resourceLimits.Memory_) { + resourceLimitsNode["memory"] = *resourceLimits.Memory_; + } + if (resourceLimits.Cpu_) { + resourceLimitsNode["cpu"] = *resourceLimits.Cpu_; + } + if (resourceLimits.Network_) { + resourceLimitsNode["network"] = *resourceLimits.Network_; + } + schedulingOptionsNode["resource_limits"] = std::move(resourceLimitsNode); + } + parameters["scheduling_options_per_pool_tree"][entry.first] = std::move(schedulingOptionsNode); + } + } + return result; +} + +TNode SerializeParamsForGetJob( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobOptions& /* options */) +{ + TNode result; + SetOperationIdParam(&result, operationId); + result["job_id"] = GetGuidAsString(jobId); + return result; +} + +TNode SerializeParamsForListJobs( + const TOperationId& operationId, + const TListJobsOptions& options) +{ + TNode result; + SetOperationIdParam(&result, operationId); + + if (options.Type_) { + result["type"] = ToString(*options.Type_); + } + if (options.State_) { + result["state"] = ToString(*options.State_); + } + if (options.Address_) { + result["address"] = *options.Address_; + } + if (options.WithStderr_) { + result["with_stderr"] = *options.WithStderr_; + } + if (options.WithSpec_) { + result["with_spec"] = *options.WithSpec_; + } + if (options.WithFailContext_) { + result["with_fail_context"] = *options.WithFailContext_; + } + + if (options.SortField_) { + result["sort_field"] = ToString(*options.SortField_); + } + if (options.SortOrder_) { + result["sort_order"] = ToString(*options.SortOrder_); + } + + if (options.Offset_) { + result["offset"] = *options.Offset_; + } + if (options.Limit_) { + result["limit"] = *options.Limit_; + } + + if (options.IncludeCypress_) { + result["include_cypress"] = *options.IncludeCypress_; + } + if (options.IncludeArchive_) { + result["include_archive"] = *options.IncludeArchive_; + } + if (options.IncludeControllerAgent_) { + result["include_controller_agent"] = *options.IncludeControllerAgent_; + } + return result; +} + +TNode SerializeParametersForInsertRows( + const TString& pathPrefix, + const TYPath& path, + const TInsertRowsOptions& options) +{ + TNode result; + SetPathParam(&result, pathPrefix, path); + if (options.Aggregate_) { + result["aggregate"] = *options.Aggregate_; + } + if (options.Update_) { + result["update"] = *options.Update_; + } + if (options.Atomicity_) { + result["atomicity"] = ToString(*options.Atomicity_); + } + if (options.Durability_) { + result["durability"] = ToString(*options.Durability_); + } + if (options.RequireSyncReplica_) { + result["require_sync_replica"] = *options.RequireSyncReplica_; + } + return result; +} + +TNode SerializeParametersForDeleteRows( + const TString& pathPrefix, + const TYPath& path, + const TDeleteRowsOptions& options) +{ + TNode result; + SetPathParam(&result, pathPrefix, path); + if (options.Atomicity_) { + result["atomicity"] = ToString(*options.Atomicity_); + } + if (options.Durability_) { + result["durability"] = ToString(*options.Durability_); + } + if (options.RequireSyncReplica_) { + result["require_sync_replica"] = *options.RequireSyncReplica_; + } + return result; +} + +TNode SerializeParametersForTrimRows( + const TString& pathPrefix, + const TYPath& path, + const TTrimRowsOptions& /* options*/) +{ + TNode result; + SetPathParam(&result, pathPrefix, path); + return result; +} + +TNode SerializeParamsForParseYPath(const TRichYPath& path) +{ + TNode result; + result["path"] = PathToNode(path); + return result; +} + +TNode SerializeParamsForEnableTableReplica( + const TReplicaId& replicaId) +{ + TNode result; + result["replica_id"] = GetGuidAsString(replicaId); + return result; +} + +TNode SerializeParamsForDisableTableReplica( + const TReplicaId& replicaId) +{ + TNode result; + result["replica_id"] = GetGuidAsString(replicaId); + return result; +} + +TNode SerializeParamsForAlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options) +{ + TNode result; + result["replica_id"] = GetGuidAsString(replicaId); + if (options.Enabled_) { + result["enabled"] = *options.Enabled_; + } + if (options.Mode_) { + result["mode"] = ToString(*options.Mode_); + } + return result; +} + +TNode SerializeParamsForFreezeTable( + const TString& pathPrefix, + const TYPath& path, + const TFreezeTableOptions& options) +{ + TNode result; + SetPathParam(&result, pathPrefix, path); + SetFirstLastTabletIndex(&result, options); + return result; +} + +TNode SerializeParamsForUnfreezeTable( + const TString& pathPrefix, + const TYPath& path, + const TUnfreezeTableOptions& options) +{ + TNode result; + SetPathParam(&result, pathPrefix, path); + SetFirstLastTabletIndex(&result, options); + return result; +} + +TNode SerializeParamsForAlterTable( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TAlterTableOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, path); + if (options.Dynamic_) { + result["dynamic"] = *options.Dynamic_; + } + if (options.Schema_) { + TNode schema; + { + TNodeBuilder builder(&schema); + Serialize(*options.Schema_, &builder); + } + result["schema"] = schema; + } + if (options.UpstreamReplicaId_) { + result["upstream_replica_id"] = GetGuidAsString(*options.UpstreamReplicaId_); + } + return result; +} + +TNode SerializeParamsForGetTableColumnarStatistics( + const TTransactionId& transactionId, + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + for (const auto& path : paths) { + result["paths"].Add(PathToNode(path)); + } + if (options.FetcherMode_) { + result["fetcher_mode"] = ToString(*options.FetcherMode_); + } + return result; +} + +TNode SerializeParamsForGetTablePartitions( + const TTransactionId& transactionId, + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + for (const auto& path : paths) { + result["paths"].Add(PathToNode(path)); + } + result["partition_mode"] = ToString(options.PartitionMode_); + result["data_weight_per_partition"] = options.DataWeightPerPartition_; + if (options.MaxPartitionCount_) { + result["max_partition_count"] = *options.MaxPartitionCount_; + } + result["adjust_data_weight_per_partition"] = options.AdjustDataWeightPerPartition_; + return result; +} + +TNode SerializeParamsForGetFileFromCache( + const TTransactionId& transactionId, + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions&) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + result["md5"] = md5Signature; + result["cache_path"] = cachePath; + return result; +} + +TNode SerializeParamsForPutFileToCache( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + SetPathParam(&result, pathPrefix, filePath); + result["md5"] = md5Signature; + result["cache_path"] = cachePath; + if (options.PreserveExpirationTimeout_) { + result["preserve_expiration_timeout"] = *options.PreserveExpirationTimeout_; + } + return result; +} + +TNode SerializeParamsForSkyShareTable( + const TString& serverName, + const TString& pathPrefix, + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options) +{ + TNode result; + + if (tablePaths.size() == 1) { + SetPathParam(&result, pathPrefix, tablePaths[0]); + } else { + auto pathList = TNode::CreateList(); + for (const auto& p : tablePaths) { + pathList.Add(AddPathPrefix(p, pathPrefix)); + } + result["paths"] = pathList; + } + result["cluster"] = serverName; + + if (options.KeyColumns_) { + auto keyColumnsList = TNode::CreateList(); + for (const auto& s : options.KeyColumns_->Parts_) { + if (s.empty()) { + continue; + } + keyColumnsList.Add(s); + } + result["key_columns"] = keyColumnsList; + } + + if (options.EnableFastbone_) { + result["enable_fastbone"] = *options.EnableFastbone_; + } + + return result; +} + +TNode SerializeParamsForCheckPermission( + const TString& user, + EPermission permission, + const TString& pathPrefix, + const TYPath& path, + const TCheckPermissionOptions& options) +{ + TNode result; + SetPathParam(&result, pathPrefix, path); + result["path"] = path; + result["user"] = user; + result["permission"] = ToString(permission); + if (!options.Columns_.empty()) { + result["columns"] = TNode::CreateList(); + result["columns"].AsList().assign(options.Columns_.begin(), options.Columns_.end()); + } + return result; +} + +TNode SerializeParamsForGetTabletInfos( + const TString& pathPrefix, + const TYPath& path, + const TVector<int>& tabletIndexes, + const TGetTabletInfosOptions& options) +{ + Y_UNUSED(options); + TNode result; + SetPathParam(&result, pathPrefix, path); + result["tablet_indexes"] = TNode::CreateList(); + result["tablet_indexes"].AsList().assign(tabletIndexes.begin(), tabletIndexes.end()); + return result; +} + +TNode SerializeParamsForAbortTransaction(const TTransactionId& transactionId) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + return result; +} + +TNode SerializeParamsForCommitTransaction(const TTransactionId& transactionId) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + return result; +} + +TNode SerializeParamsForStartTransaction( + const TTransactionId& parentTransactionId, + TDuration txTimeout, + const TStartTransactionOptions& options) +{ + TNode result; + + SetTransactionIdParam(&result, parentTransactionId); + result["timeout"] = static_cast<i64>((options.Timeout_.GetOrElse(txTimeout).MilliSeconds())); + if (options.Deadline_) { + result["deadline"] = ToString(options.Deadline_); + } + + if (options.PingAncestors_) { + result["ping_ancestor_transactions"] = true; + } + + if (options.Attributes_ && !options.Attributes_->IsMap()) { + ythrow TApiUsageError() << "Attributes must be a Map node"; + } + + auto attributes = options.Attributes_.GetOrElse(TNode::CreateMap()); + if (options.Title_) { + attributes["title"] = *options.Title_; + } else if (!attributes.HasKey("title")) { + attributes["title"] = GetDefaultTransactionTitle(); + } + result["attributes"] = attributes; + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail::NRawClient diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h new file mode 100644 index 0000000000..a60e3ea369 --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -0,0 +1,231 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/interface/client_method_options.h> + +namespace NYT::NDetail::NRawClient { + +//////////////////////////////////////////////////////////////////// + +TNode SerializeParamsForCreate( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + ENodeType type, + const TCreateOptions& options); + +TNode SerializeParamsForRemove( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TRemoveOptions& options); + +TNode SerializeParamsForExists( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TExistsOptions& options); + +TNode SerializeParamsForGet( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TGetOptions& options); + +TNode SerializeParamsForSet( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TSetOptions& options); + +TNode SerializeParamsForMultisetAttributes( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TMultisetAttributesOptions& options); + +TNode SerializeParamsForList( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TListOptions& options); + +TNode SerializeParamsForCopy( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options); + +TNode SerializeParamsForMove( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options); + +TNode SerializeParamsForLink( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options); + +TNode SerializeParamsForLock( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + ELockMode mode, + const TLockOptions& options); + +TNode SerializeParamsForUnlock( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TUnlockOptions& options); + +TNode SerializeParamsForConcatenate( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TVector<TRichYPath>& sourcePaths, + const TRichYPath& destinationPath, + const TConcatenateOptions& options); + +TNode SerializeParamsForPingTx( + const TTransactionId& transactionId); + +TNode SerializeParamsForGetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options); + +TNode SerializeParamsForAbortOperation( + const TOperationId& operationId); + +TNode SerializeParamsForCompleteOperation( + const TOperationId& operationId); + +TNode SerializeParamsForSuspendOperation( + const TOperationId& operationId, + const TSuspendOperationOptions& options); + +TNode SerializeParamsForResumeOperation( + const TOperationId& operationId, + const TResumeOperationOptions& options); + +TNode SerializeParamsForListOperations( + const TListOperationsOptions& options); + +TNode SerializeParamsForUpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options); + +TNode SerializeParamsForGetJob( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobOptions& options); + +TNode SerializeParamsForListJobs( + const TOperationId& operationId, + const TListJobsOptions& options); + +TNode SerializeParametersForInsertRows( + const TString& pathPrefix, + const TYPath& path, + const TInsertRowsOptions& options); + +TNode SerializeParametersForDeleteRows( + const TString& pathPrefix, + const TYPath& path, + const TDeleteRowsOptions& options); + +TNode SerializeParametersForTrimRows( + const TString& pathPrefix, + const TYPath& path, + const TTrimRowsOptions& options); + +TNode SerializeParamsForParseYPath( + const TRichYPath& path); + +TNode SerializeParamsForEnableTableReplica( + const TReplicaId& replicaId); + +TNode SerializeParamsForDisableTableReplica( + const TReplicaId& replicaId); + +TNode SerializeParamsForAlterTableReplica( + const TReplicaId& replicaId, + const TAlterTableReplicaOptions& options); + +TNode SerializeParamsForFreezeTable( + const TString& pathPrefix, + const TYPath& path, + const TFreezeTableOptions& options); + +TNode SerializeParamsForUnfreezeTable( + const TString& pathPrefix, + const TYPath& path, + const TUnfreezeTableOptions& options); + +TNode SerializeParamsForAlterTable( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& path, + const TAlterTableOptions& options); + +TNode SerializeParamsForGetTableColumnarStatistics( + const TTransactionId& transactionId, + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options); + +TNode SerializeParamsForGetTablePartitions( + const TTransactionId& transactionId, + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options); + +TNode SerializeParamsForGetFileFromCache( + const TTransactionId& transactionId, + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions&); + +TNode SerializeParamsForPutFileToCache( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options); + +TNode SerializeParamsForSkyShareTable( + const TString& serverName, + const TString& pathPrefix, + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options); + +TNode SerializeParamsForCheckPermission( + const TString& user, + EPermission permission, + const TString& pathPrefix, + const TYPath& path, + const TCheckPermissionOptions& options); + +TNode SerializeParamsForGetTabletInfos( + const TString& pathPrefix, + const TYPath& path, + const TVector<int>& tabletIndexes, + const TGetTabletInfosOptions& options); + +TNode SerializeParamsForAbortTransaction( + const TTransactionId& transactionId); + +TNode SerializeParamsForCommitTransaction( + const TTransactionId& transactionId); + +TNode SerializeParamsForStartTransaction( + const TTransactionId& parentTransactionId, + TDuration txTimeout, + const TStartTransactionOptions& options); + +//////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail::NRawClient diff --git a/yt/cpp/mapreduce/raw_client/ya.make b/yt/cpp/mapreduce/raw_client/ya.make new file mode 100644 index 0000000000..0d03aae80c --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) + +SRCS( + raw_batch_request.cpp + raw_requests.cpp + rpc_parameters_serialization.cpp +) + +PEERDIR( + yt/cpp/mapreduce/common + yt/cpp/mapreduce/http + yt/cpp/mapreduce/interface + yt/cpp/mapreduce/interface/logging + library/cpp/yson/node +) + +END() |