aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
committermax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
downloadydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/raw_client/raw_batch_request.cpp')
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.cpp687
1 files changed, 687 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
new file mode 100644
index 0000000000..be81f5a21a
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
@@ -0,0 +1,687 @@
+#include "raw_batch_request.h"
+
+#include "raw_requests.h"
+#include "rpc_parameters_serialization.h"
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/serialize.h>
+
+#include <library/cpp/yson/node/node.h>
+
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+#include <util/generic/guid.h>
+#include <util/string/builder.h>
+
+#include <exception>
+
+namespace NYT::NDetail::NRawClient {
+
+using NThreading::TFuture;
+using NThreading::TPromise;
+using NThreading::NewPromise;
+
+////////////////////////////////////////////////////////////////////
+
+static TString RequestInfo(const TNode& request)
+{
+ return ::TStringBuilder()
+ << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]);
+}
+
+static void EnsureNothing(const TMaybe<TNode>& node)
+{
+ Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType());
+}
+
+static void EnsureSomething(const TMaybe<TNode>& node)
+{
+ Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response.");
+}
+
+static void EnsureType(const TNode& node, TNode::EType type)
+{
+ Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. "
+ << "Expected: " << type << ", actual: " << node.GetType());
+}
+
+static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
+{
+ Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response.");
+ EnsureType(*node, type);
+}
+
+////////////////////////////////////////////////////////////////////
+
+template <typename TReturnType>
+class TResponseParserBase
+ : public TRawBatchRequest::IResponseItemParser
+{
+public:
+ using TFutureResult = TFuture<TReturnType>;
+
+public:
+ TResponseParserBase()
+ : Result(NewPromise<TReturnType>())
+ { }
+
+ void SetException(std::exception_ptr e) override
+ {
+ Result.SetException(std::move(e));
+ }
+
+ TFuture<TReturnType> GetFuture()
+ {
+ return Result.GetFuture();
+ }
+
+protected:
+ TPromise<TReturnType> Result;
+};
+
+////////////////////////////////////////////////////////////////////
+
+
+class TGetResponseParser
+ : public TResponseParserBase<TNode>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureSomething(node);
+ Result.SetValue(std::move(*node));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TVoidResponseParser
+ : public TResponseParserBase<void>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureNothing(node);
+ Result.SetValue();
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TListResponseParser
+ : public TResponseParserBase<TNode::TListType>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::List);
+ Result.SetValue(std::move(node->AsList()));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TExistsResponseParser
+ : public TResponseParserBase<bool>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Bool);
+ Result.SetValue(std::move(node->AsBool()));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TGuidResponseParser
+ : public TResponseParserBase<TGUID>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::String);
+ Result.SetValue(GetGuid(node->AsString()));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TCanonizeYPathResponseParser
+ : public TResponseParserBase<TRichYPath>
+{
+public:
+ explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original)
+ : OriginalNode_(PathToNode(original))
+ , PathPrefix_(std::move(pathPrefix))
+ { }
+
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::String);
+
+ for (const auto& item : OriginalNode_.GetAttributes().AsMap()) {
+ node->Attributes()[item.first] = item.second;
+ }
+ TRichYPath result;
+ Deserialize(result, *node);
+ result.Path_ = AddPathPrefix(result.Path_, PathPrefix_);
+ Result.SetValue(result);
+ }
+
+private:
+ TNode OriginalNode_;
+ TString PathPrefix_;
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TGetOperationResponseParser
+ : public TResponseParserBase<TOperationAttributes>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Map);
+ Result.SetValue(ParseOperationAttributes(*node));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TTableColumnarStatisticsParser
+ : public TResponseParserBase<TVector<TTableColumnarStatistics>>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Map);
+ TVector<TTableColumnarStatistics> statistics;
+ Deserialize(statistics, *node);
+ Result.SetValue(std::move(statistics));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TTablePartitionsParser
+ : public TResponseParserBase<TMultiTablePartitions>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Map);
+ TMultiTablePartitions partitions;
+ Deserialize(partitions, *node);
+ Result.SetValue(std::move(partitions));
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TGetFileFromCacheParser
+ : public TResponseParserBase<TMaybe<TYPath>>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::String);
+ if (node->AsString().empty()) {
+ Result.SetValue(Nothing());
+ } else {
+ Result.SetValue(node->AsString());
+ }
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TYPathParser
+ : public TResponseParserBase<TYPath>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::String);
+ Result.SetValue(node->AsString());
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TCheckPermissionParser
+ : public TResponseParserBase<TCheckPermissionResponse>
+{
+public:
+ void SetResponse(TMaybe<TNode> node) override
+ {
+ EnsureType(node, TNode::Map);
+ Result.SetValue(ParseCheckPermissionResponse(*node));
+ }
+};
+
+////////////////////////////////////////////////////////////////////
+
+TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser)
+ : Parameters(std::move(parameters))
+ , ResponseParser(std::move(responseParser))
+ , NextTry()
+{ }
+
+TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry)
+ : Parameters(batchItem.Parameters)
+ , ResponseParser(batchItem.ResponseParser)
+ , NextTry(nextTry)
+{ }
+
+////////////////////////////////////////////////////////////////////
+
+TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config)
+ : Config_(config)
+{ }
+
+TRawBatchRequest::~TRawBatchRequest() = default;
+
+bool TRawBatchRequest::IsExecuted() const
+{
+ return Executed_;
+}
+
+void TRawBatchRequest::MarkExecuted()
+{
+ Executed_ = true;
+}
+
+template <typename TResponseParser>
+typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
+ const TString& command,
+ TNode parameters,
+ TMaybe<TNode> input)
+{
+ return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>());
+}
+
+template <typename TResponseParser>
+typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
+ const TString& command,
+ TNode parameters,
+ TMaybe<TNode> input,
+ ::TIntrusivePtr<TResponseParser> parser)
+{
+ Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
+ TNode request;
+ request["command"] = command;
+ request["parameters"] = std::move(parameters);
+ if (input) {
+ request["input"] = std::move(*input);
+ }
+ BatchItemList_.emplace_back(std::move(request), parser);
+ return parser->GetFuture();
+}
+
+void TRawBatchRequest::AddRequest(TBatchItem batchItem)
+{
+ Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
+ BatchItemList_.push_back(batchItem);
+}
+
+TFuture<TNodeId> TRawBatchRequest::Create(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ ENodeType type,
+ const TCreateOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "create",
+ SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::Remove(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TRemoveOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "remove",
+ SerializeParamsForRemove(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<bool> TRawBatchRequest::Exists(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TExistsOptions& options)
+{
+ return AddRequest<TExistsResponseParser>(
+ "exists",
+ SerializeParamsForExists(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<TNode> TRawBatchRequest::Get(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TGetOptions& options)
+{
+ return AddRequest<TGetResponseParser>(
+ "get",
+ SerializeParamsForGet(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::Set(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TNode& node,
+ const TSetOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "set",
+ SerializeParamsForSet(transaction, Config_->Prefix, path, options),
+ node);
+}
+
+TFuture<TNode::TListType> TRawBatchRequest::List(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TListOptions& options)
+{
+ return AddRequest<TListResponseParser>(
+ "list",
+ SerializeParamsForList(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<TNodeId> TRawBatchRequest::Copy(
+ const TTransactionId& transaction,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "copy",
+ SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options),
+ Nothing());
+}
+
+TFuture<TNodeId> TRawBatchRequest::Move(
+ const TTransactionId& transaction,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "move",
+ SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options),
+ Nothing());
+}
+
+TFuture<TNodeId> TRawBatchRequest::Link(
+ const TTransactionId& transaction,
+ const TYPath& targetPath,
+ const TYPath& linkPath,
+ const TLinkOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "link",
+ SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options),
+ Nothing());
+}
+
+TFuture<TLockId> TRawBatchRequest::Lock(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ ELockMode mode,
+ const TLockOptions& options)
+{
+ return AddRequest<TGuidResponseParser>(
+ "lock",
+ SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::Unlock(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TUnlockOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "unlock",
+ SerializeParamsForUnlock(transaction, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<TMaybe<TYPath>> TRawBatchRequest::GetFileFromCache(
+ const TTransactionId& transactionId,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TGetFileFromCacheOptions& options)
+{
+ return AddRequest<TGetFileFromCacheParser>(
+ "get_file_from_cache",
+ SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options),
+ Nothing());
+}
+
+TFuture<TYPath> TRawBatchRequest::PutFileToCache(
+ const TTransactionId& transactionId,
+ const TYPath& filePath,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TPutFileToCacheOptions& options)
+{
+ return AddRequest<TYPathParser>(
+ "put_file_to_cache",
+ SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options),
+ Nothing());
+}
+
+TFuture<TCheckPermissionResponse> TRawBatchRequest::CheckPermission(
+ const TString& user,
+ EPermission permission,
+ const TYPath& path,
+ const TCheckPermissionOptions& options)
+{
+ return AddRequest<TCheckPermissionParser>(
+ "check_permission",
+ SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options),
+ Nothing());
+}
+
+TFuture<TOperationAttributes> TRawBatchRequest::GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options)
+{
+ return AddRequest<TGetOperationResponseParser>(
+ "get_operation",
+ SerializeParamsForGetOperation(operationId, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::AbortOperation(const TOperationId& operationId)
+{
+ return AddRequest<TVoidResponseParser>(
+ "abort_op",
+ SerializeParamsForAbortOperation(operationId),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::CompleteOperation(const TOperationId& operationId)
+{
+ return AddRequest<TVoidResponseParser>(
+ "complete_op",
+ SerializeParamsForCompleteOperation(operationId),
+ Nothing());
+}
+TFuture<void> TRawBatchRequest::SuspendOperation(
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "suspend_operation",
+ SerializeParamsForSuspendOperation(operationId, options),
+ Nothing());
+}
+TFuture<void> TRawBatchRequest::ResumeOperation(
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "resume_operation",
+ SerializeParamsForResumeOperation(operationId, options),
+ Nothing());
+}
+
+TFuture<void> TRawBatchRequest::UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options)
+{
+ return AddRequest<TVoidResponseParser>(
+ "update_op_parameters",
+ SerializeParamsForUpdateOperationParameters(operationId, options),
+ Nothing());
+}
+
+TFuture<TRichYPath> TRawBatchRequest::CanonizeYPath(const TRichYPath& path)
+{
+ if (path.Path_.find_first_of("<>{}[]") != TString::npos) {
+ return AddRequest<TCanonizeYPathResponseParser>(
+ "parse_ypath",
+ SerializeParamsForParseYPath(path),
+ Nothing(),
+ MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, path));
+ } else {
+ TRichYPath result = path;
+ result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix);
+ return NThreading::MakeFuture(result);
+ }
+}
+
+TFuture<TVector<TTableColumnarStatistics>> TRawBatchRequest::GetTableColumnarStatistics(
+ const TTransactionId& transaction,
+ const TVector<TRichYPath>& paths,
+ const TGetTableColumnarStatisticsOptions& options)
+{
+ return AddRequest<TTableColumnarStatisticsParser>(
+ "get_table_columnar_statistics",
+ SerializeParamsForGetTableColumnarStatistics(transaction, paths, options),
+ Nothing());
+}
+
+TFuture<TMultiTablePartitions> TRawBatchRequest::GetTablePartitions(
+ const TTransactionId& transaction,
+ const TVector<TRichYPath>& paths,
+ const TGetTablePartitionsOptions& options)
+{
+ return AddRequest<TTablePartitionsParser>(
+ "partition_tables",
+ SerializeParamsForGetTablePartitions(transaction, paths, options),
+ Nothing());
+}
+
+void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const
+{
+ Y_VERIFY(result);
+ Y_VERIFY(nextTry);
+
+ *nextTry = TInstant();
+ maxSize = Min(maxSize, BatchItemList_.size());
+ *result = TNode::CreateList();
+ for (size_t i = 0; i < maxSize; ++i) {
+ YT_LOG_DEBUG("ExecuteBatch preparing: %v",
+ RequestInfo(BatchItemList_[i].Parameters));
+
+ result->Add(BatchItemList_[i].Parameters);
+ if (BatchItemList_[i].NextTry > *nextTry) {
+ *nextTry = BatchItemList_[i].NextTry;
+ }
+ }
+}
+
+void TRawBatchRequest::ParseResponse(
+ const TResponseInfo& requestResult,
+ const IRequestRetryPolicyPtr& retryPolicy,
+ TRawBatchRequest* retryBatch,
+ TInstant now)
+{
+ TNode node = NodeFromYsonString(requestResult.Response);
+ return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now);
+}
+
+void TRawBatchRequest::ParseResponse(
+ TNode node,
+ const TString& requestId,
+ const IRequestRetryPolicyPtr& retryPolicy,
+ TRawBatchRequest* retryBatch,
+ TInstant now)
+{
+ Y_VERIFY(retryBatch);
+
+ EnsureType(node, TNode::List);
+ auto& responseList = node.AsList();
+ const auto size = responseList.size();
+ Y_ENSURE(size <= BatchItemList_.size(),
+ "Size of server response exceeds size of batch request;"
+ " size of batch: " << BatchItemList_.size() <<
+ " size of server response: " << size << '.');
+
+ for (size_t i = 0; i != size; ++i) {
+ try {
+ EnsureType(responseList[i], TNode::Map);
+ auto& responseNode = responseList[i].AsMap();
+ const auto outputIt = responseNode.find("output");
+ if (outputIt != responseNode.end()) {
+ BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second));
+ } else {
+ const auto errorIt = responseNode.find("error");
+ if (errorIt == responseNode.end()) {
+ BatchItemList_[i].ResponseParser->SetResponse(Nothing());
+ } else {
+ TErrorResponse error(400, requestId);
+ error.SetError(TYtError(errorIt->second));
+ if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) {
+ YT_LOG_INFO(
+ "Batch subrequest (%s) failed, will retry, error: %s",
+ RequestInfo(BatchItemList_[i].Parameters),
+ error.what());
+ retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
+ } else {
+ YT_LOG_ERROR(
+ "Batch subrequest (%s) failed, error: %s",
+ RequestInfo(BatchItemList_[i].Parameters),
+ error.what());
+ BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error));
+ }
+ }
+ }
+ } catch (const std::exception& e) {
+ // We don't expect other exceptions, so we don't catch (...)
+ BatchItemList_[i].ResponseParser->SetException(std::current_exception());
+ }
+ }
+ BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size);
+}
+
+void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const
+{
+ for (const auto& batchItem : BatchItemList_) {
+ batchItem.ResponseParser->SetException(e);
+ }
+}
+
+size_t TRawBatchRequest::BatchSize() const
+{
+ return BatchItemList_.size();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail::NRawClient