diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/raw_client/raw_batch_request.cpp')
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | 687 |
1 files changed, 687 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp new file mode 100644 index 0000000000..be81f5a21a --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp @@ -0,0 +1,687 @@ +#include "raw_batch_request.h" + +#include "raw_requests.h" +#include "rpc_parameters_serialization.h" + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/cpp/mapreduce/interface/client.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/serialize.h> + +#include <library/cpp/yson/node/node.h> + +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <util/generic/guid.h> +#include <util/string/builder.h> + +#include <exception> + +namespace NYT::NDetail::NRawClient { + +using NThreading::TFuture; +using NThreading::TPromise; +using NThreading::NewPromise; + +//////////////////////////////////////////////////////////////////// + +static TString RequestInfo(const TNode& request) +{ + return ::TStringBuilder() + << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]); +} + +static void EnsureNothing(const TMaybe<TNode>& node) +{ + Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType()); +} + +static void EnsureSomething(const TMaybe<TNode>& node) +{ + Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response."); +} + +static void EnsureType(const TNode& node, TNode::EType type) +{ + Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. " + << "Expected: " << type << ", actual: " << node.GetType()); +} + +static void EnsureType(const TMaybe<TNode>& node, TNode::EType type) +{ + Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response."); + EnsureType(*node, type); +} + +//////////////////////////////////////////////////////////////////// + +template <typename TReturnType> +class TResponseParserBase + : public TRawBatchRequest::IResponseItemParser +{ +public: + using TFutureResult = TFuture<TReturnType>; + +public: + TResponseParserBase() + : Result(NewPromise<TReturnType>()) + { } + + void SetException(std::exception_ptr e) override + { + Result.SetException(std::move(e)); + } + + TFuture<TReturnType> GetFuture() + { + return Result.GetFuture(); + } + +protected: + TPromise<TReturnType> Result; +}; + +//////////////////////////////////////////////////////////////////// + + +class TGetResponseParser + : public TResponseParserBase<TNode> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureSomething(node); + Result.SetValue(std::move(*node)); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TVoidResponseParser + : public TResponseParserBase<void> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureNothing(node); + Result.SetValue(); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TListResponseParser + : public TResponseParserBase<TNode::TListType> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::List); + Result.SetValue(std::move(node->AsList())); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TExistsResponseParser + : public TResponseParserBase<bool> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Bool); + Result.SetValue(std::move(node->AsBool())); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TGuidResponseParser + : public TResponseParserBase<TGUID> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::String); + Result.SetValue(GetGuid(node->AsString())); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TCanonizeYPathResponseParser + : public TResponseParserBase<TRichYPath> +{ +public: + explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original) + : OriginalNode_(PathToNode(original)) + , PathPrefix_(std::move(pathPrefix)) + { } + + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::String); + + for (const auto& item : OriginalNode_.GetAttributes().AsMap()) { + node->Attributes()[item.first] = item.second; + } + TRichYPath result; + Deserialize(result, *node); + result.Path_ = AddPathPrefix(result.Path_, PathPrefix_); + Result.SetValue(result); + } + +private: + TNode OriginalNode_; + TString PathPrefix_; +}; + +//////////////////////////////////////////////////////////////////// + +class TGetOperationResponseParser + : public TResponseParserBase<TOperationAttributes> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Map); + Result.SetValue(ParseOperationAttributes(*node)); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TTableColumnarStatisticsParser + : public TResponseParserBase<TVector<TTableColumnarStatistics>> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Map); + TVector<TTableColumnarStatistics> statistics; + Deserialize(statistics, *node); + Result.SetValue(std::move(statistics)); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TTablePartitionsParser + : public TResponseParserBase<TMultiTablePartitions> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Map); + TMultiTablePartitions partitions; + Deserialize(partitions, *node); + Result.SetValue(std::move(partitions)); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TGetFileFromCacheParser + : public TResponseParserBase<TMaybe<TYPath>> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::String); + if (node->AsString().empty()) { + Result.SetValue(Nothing()); + } else { + Result.SetValue(node->AsString()); + } + } +}; + +//////////////////////////////////////////////////////////////////// + +class TYPathParser + : public TResponseParserBase<TYPath> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::String); + Result.SetValue(node->AsString()); + } +}; + +//////////////////////////////////////////////////////////////////// + +class TCheckPermissionParser + : public TResponseParserBase<TCheckPermissionResponse> +{ +public: + void SetResponse(TMaybe<TNode> node) override + { + EnsureType(node, TNode::Map); + Result.SetValue(ParseCheckPermissionResponse(*node)); + } +}; + +//////////////////////////////////////////////////////////////////// + +TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser) + : Parameters(std::move(parameters)) + , ResponseParser(std::move(responseParser)) + , NextTry() +{ } + +TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry) + : Parameters(batchItem.Parameters) + , ResponseParser(batchItem.ResponseParser) + , NextTry(nextTry) +{ } + +//////////////////////////////////////////////////////////////////// + +TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config) + : Config_(config) +{ } + +TRawBatchRequest::~TRawBatchRequest() = default; + +bool TRawBatchRequest::IsExecuted() const +{ + return Executed_; +} + +void TRawBatchRequest::MarkExecuted() +{ + Executed_ = true; +} + +template <typename TResponseParser> +typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest( + const TString& command, + TNode parameters, + TMaybe<TNode> input) +{ + return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>()); +} + +template <typename TResponseParser> +typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest( + const TString& command, + TNode parameters, + TMaybe<TNode> input, + ::TIntrusivePtr<TResponseParser> parser) +{ + Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); + TNode request; + request["command"] = command; + request["parameters"] = std::move(parameters); + if (input) { + request["input"] = std::move(*input); + } + BatchItemList_.emplace_back(std::move(request), parser); + return parser->GetFuture(); +} + +void TRawBatchRequest::AddRequest(TBatchItem batchItem) +{ + Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); + BatchItemList_.push_back(batchItem); +} + +TFuture<TNodeId> TRawBatchRequest::Create( + const TTransactionId& transaction, + const TYPath& path, + ENodeType type, + const TCreateOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "create", + SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::Remove( + const TTransactionId& transaction, + const TYPath& path, + const TRemoveOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "remove", + SerializeParamsForRemove(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<bool> TRawBatchRequest::Exists( + const TTransactionId& transaction, + const TYPath& path, + const TExistsOptions& options) +{ + return AddRequest<TExistsResponseParser>( + "exists", + SerializeParamsForExists(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<TNode> TRawBatchRequest::Get( + const TTransactionId& transaction, + const TYPath& path, + const TGetOptions& options) +{ + return AddRequest<TGetResponseParser>( + "get", + SerializeParamsForGet(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::Set( + const TTransactionId& transaction, + const TYPath& path, + const TNode& node, + const TSetOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "set", + SerializeParamsForSet(transaction, Config_->Prefix, path, options), + node); +} + +TFuture<TNode::TListType> TRawBatchRequest::List( + const TTransactionId& transaction, + const TYPath& path, + const TListOptions& options) +{ + return AddRequest<TListResponseParser>( + "list", + SerializeParamsForList(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<TNodeId> TRawBatchRequest::Copy( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "copy", + SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options), + Nothing()); +} + +TFuture<TNodeId> TRawBatchRequest::Move( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "move", + SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options), + Nothing()); +} + +TFuture<TNodeId> TRawBatchRequest::Link( + const TTransactionId& transaction, + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "link", + SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options), + Nothing()); +} + +TFuture<TLockId> TRawBatchRequest::Lock( + const TTransactionId& transaction, + const TYPath& path, + ELockMode mode, + const TLockOptions& options) +{ + return AddRequest<TGuidResponseParser>( + "lock", + SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::Unlock( + const TTransactionId& transaction, + const TYPath& path, + const TUnlockOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "unlock", + SerializeParamsForUnlock(transaction, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<TMaybe<TYPath>> TRawBatchRequest::GetFileFromCache( + const TTransactionId& transactionId, + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions& options) +{ + return AddRequest<TGetFileFromCacheParser>( + "get_file_from_cache", + SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options), + Nothing()); +} + +TFuture<TYPath> TRawBatchRequest::PutFileToCache( + const TTransactionId& transactionId, + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options) +{ + return AddRequest<TYPathParser>( + "put_file_to_cache", + SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options), + Nothing()); +} + +TFuture<TCheckPermissionResponse> TRawBatchRequest::CheckPermission( + const TString& user, + EPermission permission, + const TYPath& path, + const TCheckPermissionOptions& options) +{ + return AddRequest<TCheckPermissionParser>( + "check_permission", + SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options), + Nothing()); +} + +TFuture<TOperationAttributes> TRawBatchRequest::GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options) +{ + return AddRequest<TGetOperationResponseParser>( + "get_operation", + SerializeParamsForGetOperation(operationId, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::AbortOperation(const TOperationId& operationId) +{ + return AddRequest<TVoidResponseParser>( + "abort_op", + SerializeParamsForAbortOperation(operationId), + Nothing()); +} + +TFuture<void> TRawBatchRequest::CompleteOperation(const TOperationId& operationId) +{ + return AddRequest<TVoidResponseParser>( + "complete_op", + SerializeParamsForCompleteOperation(operationId), + Nothing()); +} +TFuture<void> TRawBatchRequest::SuspendOperation( + const TOperationId& operationId, + const TSuspendOperationOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "suspend_operation", + SerializeParamsForSuspendOperation(operationId, options), + Nothing()); +} +TFuture<void> TRawBatchRequest::ResumeOperation( + const TOperationId& operationId, + const TResumeOperationOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "resume_operation", + SerializeParamsForResumeOperation(operationId, options), + Nothing()); +} + +TFuture<void> TRawBatchRequest::UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options) +{ + return AddRequest<TVoidResponseParser>( + "update_op_parameters", + SerializeParamsForUpdateOperationParameters(operationId, options), + Nothing()); +} + +TFuture<TRichYPath> TRawBatchRequest::CanonizeYPath(const TRichYPath& path) +{ + if (path.Path_.find_first_of("<>{}[]") != TString::npos) { + return AddRequest<TCanonizeYPathResponseParser>( + "parse_ypath", + SerializeParamsForParseYPath(path), + Nothing(), + MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, path)); + } else { + TRichYPath result = path; + result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix); + return NThreading::MakeFuture(result); + } +} + +TFuture<TVector<TTableColumnarStatistics>> TRawBatchRequest::GetTableColumnarStatistics( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options) +{ + return AddRequest<TTableColumnarStatisticsParser>( + "get_table_columnar_statistics", + SerializeParamsForGetTableColumnarStatistics(transaction, paths, options), + Nothing()); +} + +TFuture<TMultiTablePartitions> TRawBatchRequest::GetTablePartitions( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options) +{ + return AddRequest<TTablePartitionsParser>( + "partition_tables", + SerializeParamsForGetTablePartitions(transaction, paths, options), + Nothing()); +} + +void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const +{ + Y_VERIFY(result); + Y_VERIFY(nextTry); + + *nextTry = TInstant(); + maxSize = Min(maxSize, BatchItemList_.size()); + *result = TNode::CreateList(); + for (size_t i = 0; i < maxSize; ++i) { + YT_LOG_DEBUG("ExecuteBatch preparing: %v", + RequestInfo(BatchItemList_[i].Parameters)); + + result->Add(BatchItemList_[i].Parameters); + if (BatchItemList_[i].NextTry > *nextTry) { + *nextTry = BatchItemList_[i].NextTry; + } + } +} + +void TRawBatchRequest::ParseResponse( + const TResponseInfo& requestResult, + const IRequestRetryPolicyPtr& retryPolicy, + TRawBatchRequest* retryBatch, + TInstant now) +{ + TNode node = NodeFromYsonString(requestResult.Response); + return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now); +} + +void TRawBatchRequest::ParseResponse( + TNode node, + const TString& requestId, + const IRequestRetryPolicyPtr& retryPolicy, + TRawBatchRequest* retryBatch, + TInstant now) +{ + Y_VERIFY(retryBatch); + + EnsureType(node, TNode::List); + auto& responseList = node.AsList(); + const auto size = responseList.size(); + Y_ENSURE(size <= BatchItemList_.size(), + "Size of server response exceeds size of batch request;" + " size of batch: " << BatchItemList_.size() << + " size of server response: " << size << '.'); + + for (size_t i = 0; i != size; ++i) { + try { + EnsureType(responseList[i], TNode::Map); + auto& responseNode = responseList[i].AsMap(); + const auto outputIt = responseNode.find("output"); + if (outputIt != responseNode.end()) { + BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second)); + } else { + const auto errorIt = responseNode.find("error"); + if (errorIt == responseNode.end()) { + BatchItemList_[i].ResponseParser->SetResponse(Nothing()); + } else { + TErrorResponse error(400, requestId); + error.SetError(TYtError(errorIt->second)); + if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) { + YT_LOG_INFO( + "Batch subrequest (%s) failed, will retry, error: %s", + RequestInfo(BatchItemList_[i].Parameters), + error.what()); + retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval)); + } else { + YT_LOG_ERROR( + "Batch subrequest (%s) failed, error: %s", + RequestInfo(BatchItemList_[i].Parameters), + error.what()); + BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error)); + } + } + } + } catch (const std::exception& e) { + // We don't expect other exceptions, so we don't catch (...) + BatchItemList_[i].ResponseParser->SetException(std::current_exception()); + } + } + BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size); +} + +void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const +{ + for (const auto& batchItem : BatchItemList_) { + batchItem.ResponseParser->SetException(e); + } +} + +size_t TRawBatchRequest::BatchSize() const +{ + return BatchItemList_.size(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail::NRawClient |