summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
diff options
context:
space:
mode:
authormax42 <[email protected]>2023-06-30 11:13:34 +0300
committermax42 <[email protected]>2023-06-30 11:13:34 +0300
commit3e1899838408bbad47622007aa382bc8a2b01f87 (patch)
tree0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
parent5463eb3f5e72a86f858a3d27c886470a724ede34 (diff)
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/raw_client/raw_batch_request.cpp')
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.cpp687
1 files changed, 0 insertions, 687 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
deleted file mode 100644
index be81f5a21a9..00000000000
--- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
+++ /dev/null
@@ -1,687 +0,0 @@
-#include "raw_batch_request.h"
-
-#include "raw_requests.h"
-#include "rpc_parameters_serialization.h"
-
-#include <yt/cpp/mapreduce/common/helpers.h>
-#include <yt/cpp/mapreduce/common/retry_lib.h>
-
-#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
-
-#include <yt/cpp/mapreduce/interface/client.h>
-#include <yt/cpp/mapreduce/interface/errors.h>
-#include <yt/cpp/mapreduce/interface/serialize.h>
-
-#include <library/cpp/yson/node/node.h>
-
-#include <yt/cpp/mapreduce/http/context.h>
-#include <yt/cpp/mapreduce/http/retry_request.h>
-
-#include <util/generic/guid.h>
-#include <util/string/builder.h>
-
-#include <exception>
-
-namespace NYT::NDetail::NRawClient {
-
-using NThreading::TFuture;
-using NThreading::TPromise;
-using NThreading::NewPromise;
-
-////////////////////////////////////////////////////////////////////
-
-static TString RequestInfo(const TNode& request)
-{
- return ::TStringBuilder()
- << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]);
-}
-
-static void EnsureNothing(const TMaybe<TNode>& node)
-{
- Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType());
-}
-
-static void EnsureSomething(const TMaybe<TNode>& node)
-{
- Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response.");
-}
-
-static void EnsureType(const TNode& node, TNode::EType type)
-{
- Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. "
- << "Expected: " << type << ", actual: " << node.GetType());
-}
-
-static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
-{
- Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response.");
- EnsureType(*node, type);
-}
-
-////////////////////////////////////////////////////////////////////
-
-template <typename TReturnType>
-class TResponseParserBase
- : public TRawBatchRequest::IResponseItemParser
-{
-public:
- using TFutureResult = TFuture<TReturnType>;
-
-public:
- TResponseParserBase()
- : Result(NewPromise<TReturnType>())
- { }
-
- void SetException(std::exception_ptr e) override
- {
- Result.SetException(std::move(e));
- }
-
- TFuture<TReturnType> GetFuture()
- {
- return Result.GetFuture();
- }
-
-protected:
- TPromise<TReturnType> Result;
-};
-
-////////////////////////////////////////////////////////////////////
-
-
-class TGetResponseParser
- : public TResponseParserBase<TNode>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureSomething(node);
- Result.SetValue(std::move(*node));
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TVoidResponseParser
- : public TResponseParserBase<void>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureNothing(node);
- Result.SetValue();
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TListResponseParser
- : public TResponseParserBase<TNode::TListType>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::List);
- Result.SetValue(std::move(node->AsList()));
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TExistsResponseParser
- : public TResponseParserBase<bool>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Bool);
- Result.SetValue(std::move(node->AsBool()));
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TGuidResponseParser
- : public TResponseParserBase<TGUID>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::String);
- Result.SetValue(GetGuid(node->AsString()));
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TCanonizeYPathResponseParser
- : public TResponseParserBase<TRichYPath>
-{
-public:
- explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original)
- : OriginalNode_(PathToNode(original))
- , PathPrefix_(std::move(pathPrefix))
- { }
-
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::String);
-
- for (const auto& item : OriginalNode_.GetAttributes().AsMap()) {
- node->Attributes()[item.first] = item.second;
- }
- TRichYPath result;
- Deserialize(result, *node);
- result.Path_ = AddPathPrefix(result.Path_, PathPrefix_);
- Result.SetValue(result);
- }
-
-private:
- TNode OriginalNode_;
- TString PathPrefix_;
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TGetOperationResponseParser
- : public TResponseParserBase<TOperationAttributes>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Map);
- Result.SetValue(ParseOperationAttributes(*node));
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TTableColumnarStatisticsParser
- : public TResponseParserBase<TVector<TTableColumnarStatistics>>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Map);
- TVector<TTableColumnarStatistics> statistics;
- Deserialize(statistics, *node);
- Result.SetValue(std::move(statistics));
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TTablePartitionsParser
- : public TResponseParserBase<TMultiTablePartitions>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Map);
- TMultiTablePartitions partitions;
- Deserialize(partitions, *node);
- Result.SetValue(std::move(partitions));
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TGetFileFromCacheParser
- : public TResponseParserBase<TMaybe<TYPath>>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::String);
- if (node->AsString().empty()) {
- Result.SetValue(Nothing());
- } else {
- Result.SetValue(node->AsString());
- }
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TYPathParser
- : public TResponseParserBase<TYPath>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::String);
- Result.SetValue(node->AsString());
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-class TCheckPermissionParser
- : public TResponseParserBase<TCheckPermissionResponse>
-{
-public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Map);
- Result.SetValue(ParseCheckPermissionResponse(*node));
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser)
- : Parameters(std::move(parameters))
- , ResponseParser(std::move(responseParser))
- , NextTry()
-{ }
-
-TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry)
- : Parameters(batchItem.Parameters)
- , ResponseParser(batchItem.ResponseParser)
- , NextTry(nextTry)
-{ }
-
-////////////////////////////////////////////////////////////////////
-
-TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config)
- : Config_(config)
-{ }
-
-TRawBatchRequest::~TRawBatchRequest() = default;
-
-bool TRawBatchRequest::IsExecuted() const
-{
- return Executed_;
-}
-
-void TRawBatchRequest::MarkExecuted()
-{
- Executed_ = true;
-}
-
-template <typename TResponseParser>
-typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
- const TString& command,
- TNode parameters,
- TMaybe<TNode> input)
-{
- return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>());
-}
-
-template <typename TResponseParser>
-typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
- const TString& command,
- TNode parameters,
- TMaybe<TNode> input,
- ::TIntrusivePtr<TResponseParser> parser)
-{
- Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
- TNode request;
- request["command"] = command;
- request["parameters"] = std::move(parameters);
- if (input) {
- request["input"] = std::move(*input);
- }
- BatchItemList_.emplace_back(std::move(request), parser);
- return parser->GetFuture();
-}
-
-void TRawBatchRequest::AddRequest(TBatchItem batchItem)
-{
- Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
- BatchItemList_.push_back(batchItem);
-}
-
-TFuture<TNodeId> TRawBatchRequest::Create(
- const TTransactionId& transaction,
- const TYPath& path,
- ENodeType type,
- const TCreateOptions& options)
-{
- return AddRequest<TGuidResponseParser>(
- "create",
- SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options),
- Nothing());
-}
-
-TFuture<void> TRawBatchRequest::Remove(
- const TTransactionId& transaction,
- const TYPath& path,
- const TRemoveOptions& options)
-{
- return AddRequest<TVoidResponseParser>(
- "remove",
- SerializeParamsForRemove(transaction, Config_->Prefix, path, options),
- Nothing());
-}
-
-TFuture<bool> TRawBatchRequest::Exists(
- const TTransactionId& transaction,
- const TYPath& path,
- const TExistsOptions& options)
-{
- return AddRequest<TExistsResponseParser>(
- "exists",
- SerializeParamsForExists(transaction, Config_->Prefix, path, options),
- Nothing());
-}
-
-TFuture<TNode> TRawBatchRequest::Get(
- const TTransactionId& transaction,
- const TYPath& path,
- const TGetOptions& options)
-{
- return AddRequest<TGetResponseParser>(
- "get",
- SerializeParamsForGet(transaction, Config_->Prefix, path, options),
- Nothing());
-}
-
-TFuture<void> TRawBatchRequest::Set(
- const TTransactionId& transaction,
- const TYPath& path,
- const TNode& node,
- const TSetOptions& options)
-{
- return AddRequest<TVoidResponseParser>(
- "set",
- SerializeParamsForSet(transaction, Config_->Prefix, path, options),
- node);
-}
-
-TFuture<TNode::TListType> TRawBatchRequest::List(
- const TTransactionId& transaction,
- const TYPath& path,
- const TListOptions& options)
-{
- return AddRequest<TListResponseParser>(
- "list",
- SerializeParamsForList(transaction, Config_->Prefix, path, options),
- Nothing());
-}
-
-TFuture<TNodeId> TRawBatchRequest::Copy(
- const TTransactionId& transaction,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TCopyOptions& options)
-{
- return AddRequest<TGuidResponseParser>(
- "copy",
- SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options),
- Nothing());
-}
-
-TFuture<TNodeId> TRawBatchRequest::Move(
- const TTransactionId& transaction,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TMoveOptions& options)
-{
- return AddRequest<TGuidResponseParser>(
- "move",
- SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options),
- Nothing());
-}
-
-TFuture<TNodeId> TRawBatchRequest::Link(
- const TTransactionId& transaction,
- const TYPath& targetPath,
- const TYPath& linkPath,
- const TLinkOptions& options)
-{
- return AddRequest<TGuidResponseParser>(
- "link",
- SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options),
- Nothing());
-}
-
-TFuture<TLockId> TRawBatchRequest::Lock(
- const TTransactionId& transaction,
- const TYPath& path,
- ELockMode mode,
- const TLockOptions& options)
-{
- return AddRequest<TGuidResponseParser>(
- "lock",
- SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options),
- Nothing());
-}
-
-TFuture<void> TRawBatchRequest::Unlock(
- const TTransactionId& transaction,
- const TYPath& path,
- const TUnlockOptions& options)
-{
- return AddRequest<TVoidResponseParser>(
- "unlock",
- SerializeParamsForUnlock(transaction, Config_->Prefix, path, options),
- Nothing());
-}
-
-TFuture<TMaybe<TYPath>> TRawBatchRequest::GetFileFromCache(
- const TTransactionId& transactionId,
- const TString& md5Signature,
- const TYPath& cachePath,
- const TGetFileFromCacheOptions& options)
-{
- return AddRequest<TGetFileFromCacheParser>(
- "get_file_from_cache",
- SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options),
- Nothing());
-}
-
-TFuture<TYPath> TRawBatchRequest::PutFileToCache(
- const TTransactionId& transactionId,
- const TYPath& filePath,
- const TString& md5Signature,
- const TYPath& cachePath,
- const TPutFileToCacheOptions& options)
-{
- return AddRequest<TYPathParser>(
- "put_file_to_cache",
- SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options),
- Nothing());
-}
-
-TFuture<TCheckPermissionResponse> TRawBatchRequest::CheckPermission(
- const TString& user,
- EPermission permission,
- const TYPath& path,
- const TCheckPermissionOptions& options)
-{
- return AddRequest<TCheckPermissionParser>(
- "check_permission",
- SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options),
- Nothing());
-}
-
-TFuture<TOperationAttributes> TRawBatchRequest::GetOperation(
- const TOperationId& operationId,
- const TGetOperationOptions& options)
-{
- return AddRequest<TGetOperationResponseParser>(
- "get_operation",
- SerializeParamsForGetOperation(operationId, options),
- Nothing());
-}
-
-TFuture<void> TRawBatchRequest::AbortOperation(const TOperationId& operationId)
-{
- return AddRequest<TVoidResponseParser>(
- "abort_op",
- SerializeParamsForAbortOperation(operationId),
- Nothing());
-}
-
-TFuture<void> TRawBatchRequest::CompleteOperation(const TOperationId& operationId)
-{
- return AddRequest<TVoidResponseParser>(
- "complete_op",
- SerializeParamsForCompleteOperation(operationId),
- Nothing());
-}
-TFuture<void> TRawBatchRequest::SuspendOperation(
- const TOperationId& operationId,
- const TSuspendOperationOptions& options)
-{
- return AddRequest<TVoidResponseParser>(
- "suspend_operation",
- SerializeParamsForSuspendOperation(operationId, options),
- Nothing());
-}
-TFuture<void> TRawBatchRequest::ResumeOperation(
- const TOperationId& operationId,
- const TResumeOperationOptions& options)
-{
- return AddRequest<TVoidResponseParser>(
- "resume_operation",
- SerializeParamsForResumeOperation(operationId, options),
- Nothing());
-}
-
-TFuture<void> TRawBatchRequest::UpdateOperationParameters(
- const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options)
-{
- return AddRequest<TVoidResponseParser>(
- "update_op_parameters",
- SerializeParamsForUpdateOperationParameters(operationId, options),
- Nothing());
-}
-
-TFuture<TRichYPath> TRawBatchRequest::CanonizeYPath(const TRichYPath& path)
-{
- if (path.Path_.find_first_of("<>{}[]") != TString::npos) {
- return AddRequest<TCanonizeYPathResponseParser>(
- "parse_ypath",
- SerializeParamsForParseYPath(path),
- Nothing(),
- MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, path));
- } else {
- TRichYPath result = path;
- result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix);
- return NThreading::MakeFuture(result);
- }
-}
-
-TFuture<TVector<TTableColumnarStatistics>> TRawBatchRequest::GetTableColumnarStatistics(
- const TTransactionId& transaction,
- const TVector<TRichYPath>& paths,
- const TGetTableColumnarStatisticsOptions& options)
-{
- return AddRequest<TTableColumnarStatisticsParser>(
- "get_table_columnar_statistics",
- SerializeParamsForGetTableColumnarStatistics(transaction, paths, options),
- Nothing());
-}
-
-TFuture<TMultiTablePartitions> TRawBatchRequest::GetTablePartitions(
- const TTransactionId& transaction,
- const TVector<TRichYPath>& paths,
- const TGetTablePartitionsOptions& options)
-{
- return AddRequest<TTablePartitionsParser>(
- "partition_tables",
- SerializeParamsForGetTablePartitions(transaction, paths, options),
- Nothing());
-}
-
-void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const
-{
- Y_VERIFY(result);
- Y_VERIFY(nextTry);
-
- *nextTry = TInstant();
- maxSize = Min(maxSize, BatchItemList_.size());
- *result = TNode::CreateList();
- for (size_t i = 0; i < maxSize; ++i) {
- YT_LOG_DEBUG("ExecuteBatch preparing: %v",
- RequestInfo(BatchItemList_[i].Parameters));
-
- result->Add(BatchItemList_[i].Parameters);
- if (BatchItemList_[i].NextTry > *nextTry) {
- *nextTry = BatchItemList_[i].NextTry;
- }
- }
-}
-
-void TRawBatchRequest::ParseResponse(
- const TResponseInfo& requestResult,
- const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
- TInstant now)
-{
- TNode node = NodeFromYsonString(requestResult.Response);
- return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now);
-}
-
-void TRawBatchRequest::ParseResponse(
- TNode node,
- const TString& requestId,
- const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
- TInstant now)
-{
- Y_VERIFY(retryBatch);
-
- EnsureType(node, TNode::List);
- auto& responseList = node.AsList();
- const auto size = responseList.size();
- Y_ENSURE(size <= BatchItemList_.size(),
- "Size of server response exceeds size of batch request;"
- " size of batch: " << BatchItemList_.size() <<
- " size of server response: " << size << '.');
-
- for (size_t i = 0; i != size; ++i) {
- try {
- EnsureType(responseList[i], TNode::Map);
- auto& responseNode = responseList[i].AsMap();
- const auto outputIt = responseNode.find("output");
- if (outputIt != responseNode.end()) {
- BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second));
- } else {
- const auto errorIt = responseNode.find("error");
- if (errorIt == responseNode.end()) {
- BatchItemList_[i].ResponseParser->SetResponse(Nothing());
- } else {
- TErrorResponse error(400, requestId);
- error.SetError(TYtError(errorIt->second));
- if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) {
- YT_LOG_INFO(
- "Batch subrequest (%s) failed, will retry, error: %s",
- RequestInfo(BatchItemList_[i].Parameters),
- error.what());
- retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
- } else {
- YT_LOG_ERROR(
- "Batch subrequest (%s) failed, error: %s",
- RequestInfo(BatchItemList_[i].Parameters),
- error.what());
- BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error));
- }
- }
- }
- } catch (const std::exception& e) {
- // We don't expect other exceptions, so we don't catch (...)
- BatchItemList_[i].ResponseParser->SetException(std::current_exception());
- }
- }
- BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size);
-}
-
-void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const
-{
- for (const auto& batchItem : BatchItemList_) {
- batchItem.ResponseParser->SetException(e);
- }
-}
-
-size_t TRawBatchRequest::BatchSize() const
-{
- return BatchItemList_.size();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NDetail::NRawClient