path: root/yt/cpp/mapreduce/http_client/raw_batch_request.cpp
diff options
authorhiddenpath <hiddenpath@yandex-team.com>2025-01-22 08:47:22 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2025-01-22 09:04:11 +0300
commit044fc00c5520ec73b6146427ce9f1cf80ec6a95f (patch)
tree6d8b56e510374542ad49e5588c25d95701d7cf02 /yt/cpp/mapreduce/http_client/raw_batch_request.cpp
parent8f9ae59afa6108d373d287e973a7597c0a89143e (diff)
YT-23616: Rename raw_client to http_client
Diffstat (limited to 'yt/cpp/mapreduce/http_client/raw_batch_request.cpp')
1 files changed, 736 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http_client/raw_batch_request.cpp b/yt/cpp/mapreduce/http_client/raw_batch_request.cpp
new file mode 100644
index 0000000000..08fee25b79
--- /dev/null
+++ b/yt/cpp/mapreduce/http_client/raw_batch_request.cpp
@@ -0,0 +1,736 @@
+#include "raw_batch_request.h"
+#include "raw_requests.h"
+#include "rpc_parameters_serialization.h"
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/serialize.h>
+#include <library/cpp/yson/node/node.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+#include <util/generic/guid.h>
+#include <util/generic/scope.h>
+#include <util/string/builder.h>
+#include <exception>
+namespace NYT::NDetail::NRawClient {
+using NThreading::TFuture;
+using NThreading::TPromise;
+using NThreading::NewPromise;
+// Builds a one-line description of a batch subrequest: the command name,
+// a space, and the subrequest parameters rendered as YSON.
+static TString RequestInfo(const TNode& request)
+{
+    const TString& command = request["command"].AsString();
+    const TString parameters = NodeToYsonString(request["parameters"]);
+    return ::TStringBuilder() << command << ' ' << parameters;
+}
+// Checks that the subrequest produced no response node.
+// Fails through Y_ENSURE when a node is present, reporting the node's type.
+static void EnsureNothing(const TMaybe<TNode>& node)
+{
+    const bool responsePresent = node.Defined();
+    // NOTE(review): this relies on Y_ENSURE building its message only when the check fails;
+    // otherwise node->GetType() would be evaluated on an empty TMaybe. Confirm.
+    Y_ENSURE(!responsePresent, "Internal error: expected to have no response, but got response of type " << node->GetType());
+}
+// Checks that the subrequest produced a response node of any type.
+// Fails through Y_ENSURE when the response is absent.
+static void EnsureSomething(const TMaybe<TNode>& node)
+{
+    const bool responsePresent = node.Defined();
+    Y_ENSURE(responsePresent, "Internal error: expected to have response of any type, but got no response.");
+}
+// Checks that `node` has exactly the expected type.
+// Fails through Y_ENSURE with a message naming both the expected and the actual type.
+static void EnsureType(const TNode& node, TNode::EType type)
+{
+    const bool typeMatches = (node.GetType() == type);
+    Y_ENSURE(typeMatches, "Internal error: unexpected response type. "
+        << "Expected: " << type << ", actual: " << node.GetType());
+}
+// Optional-aware variant: the response must be present and must have the expected type.
+// An absent response fails through Y_ENSURE; a present one is delegated to the
+// EnsureType(const TNode&, TNode::EType) overload.
+static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
+{
+    const bool responsePresent = node.Defined();
+    Y_ENSURE(responsePresent, "Internal error: expected to have response of type " << type << ", but got no response.");
+    EnsureType(node.GetRef(), type);
+}
+// Common base of all response parsers in this file.
+//
+// Owns a promise of TReturnType that is created on construction. Derived classes
+// fulfil it from SetResponse(); SetException() forwards a failure into the same promise.
+// NOTE(review): the access specifiers are not visible in this rendering of the file;
+// `Result_` is placed under `protected:` because derived parsers use it directly.
+template <typename TReturnType>
+class TResponseParserBase
+    : public THttpRawBatchRequest::IResponseItemParser
+{
+public:
+    // Future type handed back to the code that queued the subrequest.
+    using TFutureResult = TFuture<TReturnType>;
+
+public:
+    TResponseParserBase()
+        : Result_(NewPromise<TReturnType>())
+    { }
+
+    // Completes the promise with the given exception.
+    void SetException(std::exception_ptr e) override
+    {
+        Result_.SetException(std::move(e));
+    }
+
+    // Returns the future bound to this parser's promise.
+    TFutureResult GetFuture()
+    {
+        return Result_.GetFuture();
+    }
+
+protected:
+    TPromise<TReturnType> Result_;
+};
+// Parser for subrequests whose response is an arbitrary node:
+// the response must be present and is moved into the promise unchanged.
+class TGetResponseParser
+    : public TResponseParserBase<TNode>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureSomething(node);
+        TNode& payload = *node;
+        Result_.SetValue(std::move(payload));
+    }
+};
+// Parser for subrequests that return nothing:
+// verifies that no response node arrived and completes the void promise.
+class TVoidResponseParser
+    : public TResponseParserBase<void>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        // Any payload here is treated as an internal error by EnsureNothing().
+        EnsureNothing(node);
+        Result_.SetValue();
+    }
+};
+// Parser for subrequests whose response is a list node:
+// the underlying list is moved into the promise.
+class TListResponseParser
+    : public TResponseParserBase<TNode::TListType>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::List);
+        auto& items = node->AsList();
+        Result_.SetValue(std::move(items));
+    }
+};
+// Parser for subrequests whose response is a boolean node:
+// the promise is fulfilled with the boolean value of the response.
+class TExistsResponseParser
+    : public TResponseParserBase<bool>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::Bool);
+        // A bool is trivially copyable, so wrapping it in std::move() only adds noise.
+        Result_.SetValue(node->AsBool());
+    }
+};
+// Parser for subrequests whose response is a string node holding a GUID:
+// the string is converted with GetGuid() and the result is stored in the promise.
+class TGuidResponseParser
+    : public TResponseParserBase<TGUID>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::String);
+        const auto& guidText = node->AsString();
+        Result_.SetValue(GetGuid(guidText));
+    }
+};
+// Parser for the "parse_ypath" subrequest.
+//
+// Remembers the node form of the original rich path and a path prefix. When the
+// string response arrives, every attribute of the original path is written onto
+// the response node (overwriting attributes with the same name), the node is
+// deserialized into a TRichYPath, and the prefix is applied to its Path_.
+// NOTE(review): the access specifiers are not visible in this rendering of the file;
+// the two data members are assumed to be private.
+class TCanonizeYPathResponseParser
+    : public TResponseParserBase<TRichYPath>
+{
+public:
+    explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original)
+        : OriginalNode_(PathToNode(original))
+        , PathPrefix_(std::move(pathPrefix))
+    { }
+
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::String);
+
+        for (const auto& [name, value] : OriginalNode_.GetAttributes().AsMap()) {
+            node->Attributes()[name] = value;
+        }
+
+        TRichYPath result;
+        Deserialize(result, *node);
+        result.Path_ = AddPathPrefix(result.Path_, PathPrefix_);
+        // `result` is not used afterwards, so move it into the promise instead of copying.
+        Result_.SetValue(std::move(result));
+    }
+
+private:
+    TNode OriginalNode_;
+    TString PathPrefix_;
+};
+// Parser for subrequests whose response is a map node describing an operation:
+// the map is converted with ParseOperationAttributes().
+class TGetOperationResponseParser
+    : public TResponseParserBase<TOperationAttributes>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::Map);
+        const TNode& attributesNode = *node;
+        Result_.SetValue(ParseOperationAttributes(attributesNode));
+    }
+};
+// Parser for subrequests whose response is a list node of columnar statistics:
+// the list is deserialized into a vector of TTableColumnarStatistics.
+class TTableColumnarStatisticsParser
+    : public TResponseParserBase<TVector<TTableColumnarStatistics>>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::List);
+
+        TVector<TTableColumnarStatistics> parsed;
+        Deserialize(parsed, *node);
+        Result_.SetValue(std::move(parsed));
+    }
+};
+// Parser for subrequests whose response is a map node of table partitions:
+// the map is deserialized into TMultiTablePartitions.
+class TTablePartitionsParser
+    : public TResponseParserBase<TMultiTablePartitions>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::Map);
+
+        TMultiTablePartitions parsed;
+        Deserialize(parsed, *node);
+        Result_.SetValue(std::move(parsed));
+    }
+};
+// Parser for subrequests whose response is a string node with a cache path.
+// An empty string is reported as "no value"; any other string is returned as the path.
+class TGetFileFromCacheParser
+    : public TResponseParserBase<TMaybe<TYPath>>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::String);
+
+        const auto& cachedPath = node->AsString();
+        if (!cachedPath.empty()) {
+            Result_.SetValue(cachedPath);
+            return;
+        }
+        Result_.SetValue(Nothing());
+    }
+};
+// Parser for subrequests whose response is a string node holding a path:
+// the string is stored in the promise as is.
+class TYPathParser
+    : public TResponseParserBase<TYPath>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::String);
+        const auto& path = node->AsString();
+        Result_.SetValue(path);
+    }
+};
+// Parser for subrequests whose response is a map node with a permission check result:
+// the map is converted with ParseCheckPermissionResponse().
+class TCheckPermissionParser
+    : public TResponseParserBase<TCheckPermissionResponse>
+{
+public:
+    void SetResponse(TMaybe<TNode> node) override
+    {
+        EnsureType(node, TNode::Map);
+        const TNode& responseNode = *node;
+        Result_.SetValue(ParseCheckPermissionResponse(responseNode));
+    }
+};
+// Creates a batch item for a fresh subrequest: takes ownership of the request node
+// and of the parser; NextTry is value-initialized.
+THttpRawBatchRequest::TBatchItem::TBatchItem(
+    TNode parameters,
+    ::TIntrusivePtr<IResponseItemParser> responseParser)
+    : Parameters(std::move(parameters))
+    , ResponseParser(std::move(responseParser))
+    , NextTry()
+{ }
+// Creates a retry copy of an existing batch item: same request node and same parser,
+// but with NextTry set to the given instant.
+THttpRawBatchRequest::TBatchItem::TBatchItem(
+    const TBatchItem& batchItem,
+    TInstant nextTry)
+    : Parameters(batchItem.Parameters)
+    , ResponseParser(batchItem.ResponseParser)
+    , NextTry(nextTry)
+{ }
+// Stores the client context and the retry policy for later use by ExecuteBatch().
+// The policy may be null; ExecuteBatch() then creates a default one from the context config.
+THttpRawBatchRequest::THttpRawBatchRequest(
+    const TClientContext& context,
+    IRequestRetryPolicyPtr retryPolicy)
+    : Context_(context)
+    , RequestRetryPolicy_(std::move(retryPolicy))
+{ }
+THttpRawBatchRequest::~THttpRawBatchRequest() = default; // Defaulted destructor, defined out of line in this translation unit.
+// Sends all queued subrequests to the server through the "execute_batch" command.
+//
+// The queue is drained in parts: each iteration packs up to BatchPartMaxSize_ subrequests
+// (default: concurrency * 5, with concurrency defaulting to 50) into one POST request,
+// waits until the latest NextTry among them if one is set, performs the request with
+// retries, and passes the reply to ParseResponse(). ParseResponse() removes the
+// answered items and may append retry copies, so the loop runs until the queue is empty.
+//
+// Throws yexception if the batch was already executed. If the HTTP request itself fails
+// with std::exception, the exception is set on every remaining subrequest and rethrown.
+void THttpRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
+{
+    if (IsExecuted()) {
+        ythrow yexception() << "Cannot execute batch request since it is already executed";
+    }
+    // The flag must be raised only on scope exit: while the batch is running, retried
+    // subrequests are re-queued through AddRequest(), which refuses to work once
+    // Executed_ is set.
+    // NOTE(review): the opening line of this deferred block is missing from this rendering
+    // of the file (only the trailing `};` survived); Y_DEFER from util/generic/scope.h,
+    // which the file includes, is assumed.
+    Y_DEFER {
+        MarkExecuted();
+    };
+
+    const auto concurrency = options.Concurrency_.GetOrElse(50);
+    const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
+
+    if (!RequestRetryPolicy_) {
+        RequestRetryPolicy_ = CreateDefaultRequestRetryPolicy(Context_.Config);
+    }
+
+    while (BatchSize()) {
+        auto parameters = TNode::CreateMap();
+        TInstant nextTry;
+        FillParameterList(batchPartMaxSize, &parameters["requests"], &nextTry);
+        if (nextTry) {
+            // At least one subrequest in this part is a retry that must not be sent yet.
+            SleepUntil(nextTry);
+        }
+        parameters["concurrency"] = concurrency;
+        auto body = NodeToYsonString(parameters);
+        THttpHeader header("POST", "execute_batch");
+        header.AddMutationId();
+
+        TResponseInfo result;
+        try {
+            result = RequestWithRetry<TResponseInfo>(
+                RequestRetryPolicy_,
+                [this, &header, &body] (TMutationId& mutationId) {
+                    auto response = RequestWithoutRetry(Context_, mutationId, header, body);
+                    return TResponseInfo{
+                        .RequestId = response->GetRequestId(),
+                        .Response = response->GetResponse(),
+                        .HttpCode = response->GetStatusCode(),
+                    };
+                });
+        } catch (const std::exception&) {
+            // Propagate the failure to every pending future before rethrowing.
+            SetErrorResult(std::current_exception());
+            throw;
+        }
+        ParseResponse(std::move(result), RequestRetryPolicy_.Get());
+    }
+}
+// Reports whether this batch request has been marked as executed.
+bool THttpRawBatchRequest::IsExecuted() const
+{
+    return Executed_;
+}
+// Marks this batch request as executed; AddRequest() rejects new subrequests afterwards.
+void THttpRawBatchRequest::MarkExecuted()
+{
+    Executed_ = true;
+}
+// Convenience overload: queues a subrequest whose response is handled by a freshly
+// created TResponseParser and returns the future of that parser.
+template <typename TResponseParser>
+typename TResponseParser::TFutureResult THttpRawBatchRequest::AddRequest(
+    const TString& command,
+    TNode parameters,
+    TMaybe<TNode> input)
+{
+    // `parameters` and `input` are by-value sinks that are not used afterwards,
+    // so hand them over instead of copying them a second time.
+    return AddRequest(
+        command,
+        std::move(parameters),
+        std::move(input),
+        MakeIntrusive<TResponseParser>());
+}
+// Queues a subrequest handled by the given parser.
+//
+// Builds the request node with the keys "command", "parameters" and, when `input` is
+// present, "input"; appends it to the batch together with the parser; returns the
+// parser's future. Fails through Y_ENSURE if the batch was already executed.
+template <typename TResponseParser>
+typename TResponseParser::TFutureResult THttpRawBatchRequest::AddRequest(
+    const TString& command,
+    TNode parameters,
+    TMaybe<TNode> input,
+    ::TIntrusivePtr<TResponseParser> parser)
+{
+    Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
+
+    TNode request;
+    request["command"] = command;
+    request["parameters"] = std::move(parameters);
+    if (input.Defined()) {
+        request["input"] = std::move(input.GetRef());
+    }
+
+    BatchItemList_.emplace_back(std::move(request), parser);
+    auto future = parser->GetFuture();
+    return future;
+}
+// Appends an already constructed batch item (used for retry copies) to the queue.
+// Fails through Y_ENSURE if the batch was already executed.
+void THttpRawBatchRequest::AddRequest(TBatchItem batchItem)
+{
+    Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
+    // `batchItem` is a by-value sink: move it into the list instead of copying it again.
+    BatchItemList_.push_back(std::move(batchItem));
+}
+// Queues a "create" subrequest. The string response is parsed as a GUID and
+// delivered through the returned future.
+TFuture<TNodeId> THttpRawBatchRequest::Create(
+    const TTransactionId& transaction,
+    const TYPath& path,
+    ENodeType type,
+    const TCreateOptions& options)
+{
+    auto params = SerializeParamsForCreate(transaction, Context_.Config->Prefix, path, type, options);
+    return AddRequest<TGuidResponseParser>("create", std::move(params), Nothing());
+}
+// Queues a "remove" subrequest. No response payload is expected.
+TFuture<void> THttpRawBatchRequest::Remove(
+    const TTransactionId& transaction,
+    const TYPath& path,
+    const TRemoveOptions& options)
+{
+    auto params = SerializeParamsForRemove(transaction, Context_.Config->Prefix, path, options);
+    return AddRequest<TVoidResponseParser>("remove", std::move(params), Nothing());
+}
+// Queues an "exists" subrequest. The boolean response is delivered through the returned future.
+TFuture<bool> THttpRawBatchRequest::Exists(
+    const TTransactionId& transaction,
+    const TYPath& path,
+    const TExistsOptions& options)
+{
+    auto params = SerializeParamsForExists(transaction, Context_.Config->Prefix, path, options);
+    return AddRequest<TExistsResponseParser>("exists", std::move(params), Nothing());
+}
+// Queues a "get" subrequest. The response node is delivered unchanged through the returned future.
+TFuture<TNode> THttpRawBatchRequest::Get(
+    const TTransactionId& transaction,
+    const TYPath& path,
+    const TGetOptions& options)
+{
+    auto params = SerializeParamsForGet(transaction, Context_.Config->Prefix, path, options);
+    return AddRequest<TGetResponseParser>("get", std::move(params), Nothing());
+}
+// Queues a "set" subrequest; `node` is attached as the subrequest input.
+// No response payload is expected.
+TFuture<void> THttpRawBatchRequest::Set(
+    const TTransactionId& transaction,
+    const TYPath& path,
+    const TNode& node,
+    const TSetOptions& options)
+{
+    auto params = SerializeParamsForSet(transaction, Context_.Config->Prefix, path, options);
+    return AddRequest<TVoidResponseParser>("set", std::move(params), node);
+}
+// Queues a "list" subrequest. The list response is delivered through the returned future.
+TFuture<TNode::TListType> THttpRawBatchRequest::List(
+    const TTransactionId& transaction,
+    const TYPath& path,
+    const TListOptions& options)
+{
+    auto params = SerializeParamsForList(transaction, Context_.Config->Prefix, path, options);
+    return AddRequest<TListResponseParser>("list", std::move(params), Nothing());
+}
+// Queues a "copy" subrequest. The string response is parsed as a GUID and
+// delivered through the returned future.
+TFuture<TNodeId> THttpRawBatchRequest::Copy(
+    const TTransactionId& transaction,
+    const TYPath& sourcePath,
+    const TYPath& destinationPath,
+    const TCopyOptions& options)
+{
+    auto params = SerializeParamsForCopy(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options);
+    return AddRequest<TGuidResponseParser>("copy", std::move(params), Nothing());
+}
+// Queues a "move" subrequest. The string response is parsed as a GUID and
+// delivered through the returned future.
+TFuture<TNodeId> THttpRawBatchRequest::Move(
+    const TTransactionId& transaction,
+    const TYPath& sourcePath,
+    const TYPath& destinationPath,
+    const TMoveOptions& options)
+{
+    auto params = SerializeParamsForMove(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options);
+    return AddRequest<TGuidResponseParser>("move", std::move(params), Nothing());
+}
+// Queues a "link" subrequest. The string response is parsed as a GUID and
+// delivered through the returned future.
+TFuture<TNodeId> THttpRawBatchRequest::Link(
+    const TTransactionId& transaction,
+    const TYPath& targetPath,
+    const TYPath& linkPath,
+    const TLinkOptions& options)
+{
+    auto params = SerializeParamsForLink(transaction, Context_.Config->Prefix, targetPath, linkPath, options);
+    return AddRequest<TGuidResponseParser>("link", std::move(params), Nothing());
+}
+// Queues a "lock" subrequest. The string response is parsed as a GUID and
+// delivered through the returned future.
+TFuture<TLockId> THttpRawBatchRequest::Lock(
+    const TTransactionId& transaction,
+    const TYPath& path,
+    ELockMode mode,
+    const TLockOptions& options)
+{
+    auto params = SerializeParamsForLock(transaction, Context_.Config->Prefix, path, mode, options);
+    return AddRequest<TGuidResponseParser>("lock", std::move(params), Nothing());
+}
+// Queues an "unlock" subrequest. No response payload is expected.
+TFuture<void> THttpRawBatchRequest::Unlock(
+    const TTransactionId& transaction,
+    const TYPath& path,
+    const TUnlockOptions& options)
+{
+    auto params = SerializeParamsForUnlock(transaction, Context_.Config->Prefix, path, options);
+    return AddRequest<TVoidResponseParser>("unlock", std::move(params), Nothing());
+}
+// Queues a "get_file_from_cache" subrequest. The future yields the returned path,
+// or an empty TMaybe when the server answers with an empty string.
+TFuture<TMaybe<TYPath>> THttpRawBatchRequest::GetFileFromCache(
+    const TTransactionId& transactionId,
+    const TString& md5Signature,
+    const TYPath& cachePath,
+    const TGetFileFromCacheOptions& options)
+{
+    auto params = SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options);
+    return AddRequest<TGetFileFromCacheParser>("get_file_from_cache", std::move(params), Nothing());
+}
+// Queues a "put_file_to_cache" subrequest. The string response is delivered
+// as a path through the returned future.
+TFuture<TYPath> THttpRawBatchRequest::PutFileToCache(
+    const TTransactionId& transactionId,
+    const TYPath& filePath,
+    const TString& md5Signature,
+    const TYPath& cachePath,
+    const TPutFileToCacheOptions& options)
+{
+    auto params = SerializeParamsForPutFileToCache(
+        transactionId,
+        Context_.Config->Prefix,
+        filePath,
+        md5Signature,
+        cachePath,
+        options);
+    return AddRequest<TYPathParser>("put_file_to_cache", std::move(params), Nothing());
+}
+// Queues a "check_permission" subrequest. The map response is converted with
+// ParseCheckPermissionResponse() and delivered through the returned future.
+TFuture<TCheckPermissionResponse> THttpRawBatchRequest::CheckPermission(
+    const TString& user,
+    EPermission permission,
+    const TYPath& path,
+    const TCheckPermissionOptions& options)
+{
+    auto params = SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options);
+    return AddRequest<TCheckPermissionParser>("check_permission", std::move(params), Nothing());
+}
+// Queues a "get_operation" subrequest. The map response is converted with
+// ParseOperationAttributes() and delivered through the returned future.
+TFuture<TOperationAttributes> THttpRawBatchRequest::GetOperation(
+    const TOperationId& operationId,
+    const TGetOperationOptions& options)
+{
+    auto params = SerializeParamsForGetOperation(operationId, options);
+    return AddRequest<TGetOperationResponseParser>("get_operation", std::move(params), Nothing());
+}
+// Queues an "abort_op" subrequest. No response payload is expected.
+TFuture<void> THttpRawBatchRequest::AbortOperation(const TOperationId& operationId)
+{
+    auto params = SerializeParamsForAbortOperation(operationId);
+    return AddRequest<TVoidResponseParser>("abort_op", std::move(params), Nothing());
+}
+// Queues a "complete_op" subrequest. No response payload is expected.
+TFuture<void> THttpRawBatchRequest::CompleteOperation(const TOperationId& operationId)
+{
+    auto params = SerializeParamsForCompleteOperation(operationId);
+    return AddRequest<TVoidResponseParser>("complete_op", std::move(params), Nothing());
+}
+// Queues a "suspend_operation" subrequest. No response payload is expected.
+TFuture<void> THttpRawBatchRequest::SuspendOperation(
+    const TOperationId& operationId,
+    const TSuspendOperationOptions& options)
+{
+    auto params = SerializeParamsForSuspendOperation(operationId, options);
+    return AddRequest<TVoidResponseParser>("suspend_operation", std::move(params), Nothing());
+}
+// Queues a "resume_operation" subrequest. No response payload is expected.
+TFuture<void> THttpRawBatchRequest::ResumeOperation(
+    const TOperationId& operationId,
+    const TResumeOperationOptions& options)
+{
+    auto params = SerializeParamsForResumeOperation(operationId, options);
+    return AddRequest<TVoidResponseParser>("resume_operation", std::move(params), Nothing());
+}
+// Queues an "update_op_parameters" subrequest. No response payload is expected.
+TFuture<void> THttpRawBatchRequest::UpdateOperationParameters(
+    const TOperationId& operationId,
+    const TUpdateOperationParametersOptions& options)
+{
+    auto params = SerializeParamsForUpdateOperationParameters(operationId, options);
+    return AddRequest<TVoidResponseParser>("update_op_parameters", std::move(params), Nothing());
+}
+// Canonizes a rich path.
+//
+// The configured prefix is applied locally unless the path starts with '<'. If the
+// resulting path contains any of the characters "<>{}[]", a "parse_ypath" subrequest
+// is queued and its parser produces the final value; otherwise the locally prefixed
+// path is returned as an already fulfilled future and nothing is queued.
+TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path)
+{
+    TRichYPath result = path;
+    // Out of the symbols in the canonization branch below, only '<' can appear in the beginning of a valid rich YPath.
+    if (!result.Path_.StartsWith("<")) {
+        result.Path_ = AddPathPrefix(result.Path_, Context_.Config->Prefix);
+    }
+
+    const bool needsServerParsing = result.Path_.find_first_of("<>{}[]") != TString::npos;
+    if (!needsServerParsing) {
+        return NThreading::MakeFuture(result);
+    }
+
+    return AddRequest<TCanonizeYPathResponseParser>(
+        "parse_ypath",
+        SerializeParamsForParseYPath(result),
+        Nothing(),
+        MakeIntrusive<TCanonizeYPathResponseParser>(Context_.Config->Prefix, result));
+}
+// Queues a "get_table_columnar_statistics" subrequest. The list response is
+// deserialized into a vector of TTableColumnarStatistics.
+TFuture<TVector<TTableColumnarStatistics>> THttpRawBatchRequest::GetTableColumnarStatistics(
+    const TTransactionId& transaction,
+    const TVector<TRichYPath>& paths,
+    const TGetTableColumnarStatisticsOptions& options)
+{
+    auto params = SerializeParamsForGetTableColumnarStatistics(transaction, paths, options);
+    return AddRequest<TTableColumnarStatisticsParser>(
+        "get_table_columnar_statistics",
+        std::move(params),
+        Nothing());
+}
+// Queues a "partition_tables" subrequest. The map response is deserialized
+// into TMultiTablePartitions.
+TFuture<TMultiTablePartitions> THttpRawBatchRequest::GetTablePartitions(
+    const TTransactionId& transaction,
+    const TVector<TRichYPath>& paths,
+    const TGetTablePartitionsOptions& options)
+{
+    auto params = SerializeParamsForGetTablePartitions(transaction, paths, options);
+    return AddRequest<TTablePartitionsParser>("partition_tables", std::move(params), Nothing());
+}
+// Prepares the next part of the batch.
+//
+// Writes into `*result` a list node holding the request nodes of the first
+// min(maxSize, queue size) queued items, and stores in `*nextTry` the latest NextTry
+// among those items (a default-constructed TInstant if none is later than that).
+// Both output pointers must be non-null; the queue itself is not modified.
+void THttpRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const
+{
+    Y_ABORT_UNLESS(result);
+    Y_ABORT_UNLESS(nextTry);
+
+    *nextTry = TInstant();
+    maxSize = Min(maxSize, BatchItemList_.size());
+
+    *result = TNode::CreateList();
+    for (size_t index = 0; index < maxSize; ++index) {
+        const auto& item = BatchItemList_[index];
+        YT_LOG_DEBUG("ExecuteBatch preparing: %v",
+            RequestInfo(item.Parameters));
+
+        result->Add(item.Parameters);
+        if (*nextTry < item.NextTry) {
+            *nextTry = item.NextTry;
+        }
+    }
+}
+// Parses the raw YSON body of an "execute_batch" reply into a node and delegates
+// to the TNode overload of ParseResponse(), passing the request id along.
+void THttpRawBatchRequest::ParseResponse(
+    const TResponseInfo& requestResult,
+    const IRequestRetryPolicyPtr& retryPolicy,
+    TInstant now)
+{
+    const TString& requestId = requestResult.RequestId;
+    TNode parsedBody = NodeFromYsonString(requestResult.Response);
+    ParseResponse(parsedBody, requestId, retryPolicy, now);
+}
+// Distributes the entries of an "execute_batch" reply to the queued subrequests.
+//
+// `node` must be a list with at most as many entries as there are queued items;
+// entry i answers queued item i. For each entry:
+//   - an "output" key: its value is passed to the item's parser;
+//   - neither "output" nor "error": the parser receives an empty response;
+//   - an "error" key: a TErrorResponse is built from it (with code 400 and the batch
+//     request id). If the error is retriable and the retry policy grants an interval,
+//     a copy of the item is re-queued with NextTry = now + interval; otherwise the
+//     error is set on the item's parser.
+// A std::exception thrown while handling an entry is set on that item's parser.
+// Finally the answered items are removed from the front of the queue.
+void THttpRawBatchRequest::ParseResponse(
+    TNode node,
+    const TString& requestId,
+    const IRequestRetryPolicyPtr& retryPolicy,
+    TInstant now)
+{
+    EnsureType(node, TNode::List);
+    auto& responseList = node.AsList();
+    const auto size = responseList.size();
+    Y_ENSURE(size <= BatchItemList_.size(),
+        "Size of server response exceeds size of batch request;"
+        " size of batch: " << BatchItemList_.size() <<
+        " size of server response: " << size << '.');
+
+    for (size_t i = 0; i != size; ++i) {
+        try {
+            EnsureType(responseList[i], TNode::Map);
+            auto& responseNode = responseList[i].AsMap();
+            const auto outputIt = responseNode.find("output");
+            if (outputIt != responseNode.end()) {
+                BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second));
+            } else {
+                const auto errorIt = responseNode.find("error");
+                if (errorIt == responseNode.end()) {
+                    BatchItemList_[i].ResponseParser->SetResponse(Nothing());
+                } else {
+                    TErrorResponse error(400, requestId);
+                    error.SetError(TYtError(errorIt->second));
+                    // NOTE(review): the lines opening the two logging calls below are missing
+                    // from this rendering of the file; YT_LOG_INFO / YT_LOG_ERROR are assumed,
+                    // by analogy with YT_LOG_DEBUG used in FillParameterList().
+                    if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) {
+                        YT_LOG_INFO(
+                            "Batch subrequest (%s) failed, will retry, error: %s",
+                            RequestInfo(BatchItemList_[i].Parameters),
+                            error.what());
+                        // The retry copy shares the parser, so the original future is
+                        // fulfilled by whichever attempt finally completes.
+                        AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
+                    } else {
+                        YT_LOG_ERROR(
+                            "Batch subrequest (%s) failed, error: %s",
+                            RequestInfo(BatchItemList_[i].Parameters),
+                            error.what());
+                        BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error));
+                    }
+                }
+            }
+        } catch (const std::exception&) {
+            // We don't expect other exceptions, so we don't catch (...)
+            BatchItemList_[i].ResponseParser->SetException(std::current_exception());
+        }
+    }
+    BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size);
+}
+// Sets the given exception on the parser of every item still in the queue.
+void THttpRawBatchRequest::SetErrorResult(std::exception_ptr e) const
+{
+    for (const auto& pendingItem : BatchItemList_) {
+        const auto& parser = pendingItem.ResponseParser;
+        parser->SetException(e);
+    }
+}
+// Returns the number of subrequests currently waiting in the queue.
+size_t THttpRawBatchRequest::BatchSize() const
+{
+    const size_t pendingCount = BatchItemList_.size();
+    return pendingCount;
+}
+} // namespace NYT::NDetail::NRawClient