diff options
| author | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
| commit | 3e1899838408bbad47622007aa382bc8a2b01f87 (patch) | |
| tree | 0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | |
| parent | 5463eb3f5e72a86f858a3d27c886470a724ede34 (diff) | |
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing
changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/raw_client/raw_batch_request.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | 687 |
1 files changed, 0 insertions, 687 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp deleted file mode 100644 index be81f5a21a9..00000000000 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp +++ /dev/null @@ -1,687 +0,0 @@ -#include "raw_batch_request.h" - -#include "raw_requests.h" -#include "rpc_parameters_serialization.h" - -#include <yt/cpp/mapreduce/common/helpers.h> -#include <yt/cpp/mapreduce/common/retry_lib.h> - -#include <yt/cpp/mapreduce/interface/logging/yt_log.h> - -#include <yt/cpp/mapreduce/interface/client.h> -#include <yt/cpp/mapreduce/interface/errors.h> -#include <yt/cpp/mapreduce/interface/serialize.h> - -#include <library/cpp/yson/node/node.h> - -#include <yt/cpp/mapreduce/http/context.h> -#include <yt/cpp/mapreduce/http/retry_request.h> - -#include <util/generic/guid.h> -#include <util/string/builder.h> - -#include <exception> - -namespace NYT::NDetail::NRawClient { - -using NThreading::TFuture; -using NThreading::TPromise; -using NThreading::NewPromise; - -//////////////////////////////////////////////////////////////////// - -static TString RequestInfo(const TNode& request) -{ - return ::TStringBuilder() - << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]); -} - -static void EnsureNothing(const TMaybe<TNode>& node) -{ - Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType()); -} - -static void EnsureSomething(const TMaybe<TNode>& node) -{ - Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response."); -} - -static void EnsureType(const TNode& node, TNode::EType type) -{ - Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. " - << "Expected: " << type << ", actual: " << node.GetType()); -} - -static void EnsureType(const TMaybe<TNode>& node, TNode::EType type) -{ - Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response."); - EnsureType(*node, type); -} - -//////////////////////////////////////////////////////////////////// - -template <typename TReturnType> -class TResponseParserBase - : public TRawBatchRequest::IResponseItemParser -{ -public: - using TFutureResult = TFuture<TReturnType>; - -public: - TResponseParserBase() - : Result(NewPromise<TReturnType>()) - { } - - void SetException(std::exception_ptr e) override - { - Result.SetException(std::move(e)); - } - - TFuture<TReturnType> GetFuture() - { - return Result.GetFuture(); - } - -protected: - TPromise<TReturnType> Result; -}; - -//////////////////////////////////////////////////////////////////// - - -class TGetResponseParser - : public TResponseParserBase<TNode> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureSomething(node); - Result.SetValue(std::move(*node)); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TVoidResponseParser - : public TResponseParserBase<void> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureNothing(node); - Result.SetValue(); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TListResponseParser - : public TResponseParserBase<TNode::TListType> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::List); - Result.SetValue(std::move(node->AsList())); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TExistsResponseParser - : public TResponseParserBase<bool> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::Bool); - Result.SetValue(std::move(node->AsBool())); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TGuidResponseParser - : public TResponseParserBase<TGUID> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::String); - Result.SetValue(GetGuid(node->AsString())); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TCanonizeYPathResponseParser - : public TResponseParserBase<TRichYPath> -{ -public: - explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original) - : OriginalNode_(PathToNode(original)) - , PathPrefix_(std::move(pathPrefix)) - { } - - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::String); - - for (const auto& item : OriginalNode_.GetAttributes().AsMap()) { - node->Attributes()[item.first] = item.second; - } - TRichYPath result; - Deserialize(result, *node); - result.Path_ = AddPathPrefix(result.Path_, PathPrefix_); - Result.SetValue(result); - } - -private: - TNode OriginalNode_; - TString PathPrefix_; -}; - -//////////////////////////////////////////////////////////////////// - -class TGetOperationResponseParser - : public TResponseParserBase<TOperationAttributes> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::Map); - Result.SetValue(ParseOperationAttributes(*node)); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TTableColumnarStatisticsParser - : public TResponseParserBase<TVector<TTableColumnarStatistics>> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::Map); - TVector<TTableColumnarStatistics> statistics; - Deserialize(statistics, *node); - Result.SetValue(std::move(statistics)); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TTablePartitionsParser - : public TResponseParserBase<TMultiTablePartitions> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::Map); - TMultiTablePartitions partitions; - Deserialize(partitions, *node); - Result.SetValue(std::move(partitions)); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TGetFileFromCacheParser - : public TResponseParserBase<TMaybe<TYPath>> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::String); - if (node->AsString().empty()) { - Result.SetValue(Nothing()); - } else { - Result.SetValue(node->AsString()); - } - } -}; - -//////////////////////////////////////////////////////////////////// - -class TYPathParser - : public TResponseParserBase<TYPath> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::String); - Result.SetValue(node->AsString()); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TCheckPermissionParser - : public TResponseParserBase<TCheckPermissionResponse> -{ -public: - void SetResponse(TMaybe<TNode> node) override - { - EnsureType(node, TNode::Map); - Result.SetValue(ParseCheckPermissionResponse(*node)); - } -}; - -//////////////////////////////////////////////////////////////////// - -TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser) - : Parameters(std::move(parameters)) - , ResponseParser(std::move(responseParser)) - , NextTry() -{ } - -TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry) - : Parameters(batchItem.Parameters) - , ResponseParser(batchItem.ResponseParser) - , NextTry(nextTry) -{ } - -//////////////////////////////////////////////////////////////////// - -TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config) - : Config_(config) -{ } - -TRawBatchRequest::~TRawBatchRequest() = default; - -bool TRawBatchRequest::IsExecuted() const -{ - return Executed_; -} - -void TRawBatchRequest::MarkExecuted() -{ - Executed_ = true; -} - -template <typename TResponseParser> -typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest( - const TString& command, - TNode parameters, - TMaybe<TNode> input) -{ - return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>()); -} - -template <typename TResponseParser> -typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest( - const TString& command, - TNode parameters, - TMaybe<TNode> input, - ::TIntrusivePtr<TResponseParser> parser) -{ - Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); - TNode request; - request["command"] = command; - request["parameters"] = std::move(parameters); - if (input) { - request["input"] = std::move(*input); - } - BatchItemList_.emplace_back(std::move(request), parser); - return parser->GetFuture(); -} - -void TRawBatchRequest::AddRequest(TBatchItem batchItem) -{ - Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); - BatchItemList_.push_back(batchItem); -} - -TFuture<TNodeId> TRawBatchRequest::Create( - const TTransactionId& transaction, - const TYPath& path, - ENodeType type, - const TCreateOptions& options) -{ - return AddRequest<TGuidResponseParser>( - "create", - SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options), - Nothing()); -} - -TFuture<void> TRawBatchRequest::Remove( - const TTransactionId& transaction, - const TYPath& path, - const TRemoveOptions& options) -{ - return AddRequest<TVoidResponseParser>( - "remove", - SerializeParamsForRemove(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture<bool> TRawBatchRequest::Exists( - const TTransactionId& transaction, - const TYPath& path, - const TExistsOptions& options) -{ - return AddRequest<TExistsResponseParser>( - "exists", - SerializeParamsForExists(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture<TNode> TRawBatchRequest::Get( - const TTransactionId& transaction, - const TYPath& path, - const TGetOptions& options) -{ - return AddRequest<TGetResponseParser>( - "get", - SerializeParamsForGet(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture<void> TRawBatchRequest::Set( - const TTransactionId& transaction, - const TYPath& path, - const TNode& node, - const TSetOptions& options) -{ - return AddRequest<TVoidResponseParser>( - "set", - SerializeParamsForSet(transaction, Config_->Prefix, path, options), - node); -} - -TFuture<TNode::TListType> TRawBatchRequest::List( - const TTransactionId& transaction, - const TYPath& path, - const TListOptions& options) -{ - return AddRequest<TListResponseParser>( - "list", - SerializeParamsForList(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture<TNodeId> TRawBatchRequest::Copy( - const TTransactionId& transaction, - const TYPath& sourcePath, - const TYPath& destinationPath, - const TCopyOptions& options) -{ - return AddRequest<TGuidResponseParser>( - "copy", - SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options), - Nothing()); -} - -TFuture<TNodeId> TRawBatchRequest::Move( - const TTransactionId& transaction, - const TYPath& sourcePath, - const TYPath& destinationPath, - const TMoveOptions& options) -{ - return AddRequest<TGuidResponseParser>( - "move", - SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options), - Nothing()); -} - -TFuture<TNodeId> TRawBatchRequest::Link( - const TTransactionId& transaction, - const TYPath& targetPath, - const TYPath& linkPath, - const TLinkOptions& options) -{ - return AddRequest<TGuidResponseParser>( - "link", - SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options), - Nothing()); -} - -TFuture<TLockId> TRawBatchRequest::Lock( - const TTransactionId& transaction, - const TYPath& path, - ELockMode mode, - const TLockOptions& options) -{ - return AddRequest<TGuidResponseParser>( - "lock", - SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options), - Nothing()); -} - -TFuture<void> TRawBatchRequest::Unlock( - const TTransactionId& transaction, - const TYPath& path, - const TUnlockOptions& options) -{ - return AddRequest<TVoidResponseParser>( - "unlock", - SerializeParamsForUnlock(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture<TMaybe<TYPath>> TRawBatchRequest::GetFileFromCache( - const TTransactionId& transactionId, - const TString& md5Signature, - const TYPath& cachePath, - const TGetFileFromCacheOptions& options) -{ - return AddRequest<TGetFileFromCacheParser>( - "get_file_from_cache", - SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options), - Nothing()); -} - -TFuture<TYPath> TRawBatchRequest::PutFileToCache( - const TTransactionId& transactionId, - const TYPath& filePath, - const TString& md5Signature, - const TYPath& cachePath, - const TPutFileToCacheOptions& options) -{ - return AddRequest<TYPathParser>( - "put_file_to_cache", - SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options), - Nothing()); -} - -TFuture<TCheckPermissionResponse> TRawBatchRequest::CheckPermission( - const TString& user, - EPermission permission, - const TYPath& path, - const TCheckPermissionOptions& options) -{ - return AddRequest<TCheckPermissionParser>( - "check_permission", - SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options), - Nothing()); -} - -TFuture<TOperationAttributes> TRawBatchRequest::GetOperation( - const TOperationId& operationId, - const TGetOperationOptions& options) -{ - return AddRequest<TGetOperationResponseParser>( - "get_operation", - SerializeParamsForGetOperation(operationId, options), - Nothing()); -} - -TFuture<void> TRawBatchRequest::AbortOperation(const TOperationId& operationId) -{ - return AddRequest<TVoidResponseParser>( - "abort_op", - SerializeParamsForAbortOperation(operationId), - Nothing()); -} - -TFuture<void> TRawBatchRequest::CompleteOperation(const TOperationId& operationId) -{ - return AddRequest<TVoidResponseParser>( - "complete_op", - SerializeParamsForCompleteOperation(operationId), - Nothing()); -} -TFuture<void> TRawBatchRequest::SuspendOperation( - const TOperationId& operationId, - const TSuspendOperationOptions& options) -{ - return AddRequest<TVoidResponseParser>( - "suspend_operation", - SerializeParamsForSuspendOperation(operationId, options), - Nothing()); -} -TFuture<void> TRawBatchRequest::ResumeOperation( - const TOperationId& operationId, - const TResumeOperationOptions& options) -{ - return AddRequest<TVoidResponseParser>( - "resume_operation", - SerializeParamsForResumeOperation(operationId, options), - Nothing()); -} - -TFuture<void> TRawBatchRequest::UpdateOperationParameters( - const TOperationId& operationId, - const TUpdateOperationParametersOptions& options) -{ - return AddRequest<TVoidResponseParser>( - "update_op_parameters", - SerializeParamsForUpdateOperationParameters(operationId, options), - Nothing()); -} - -TFuture<TRichYPath> TRawBatchRequest::CanonizeYPath(const TRichYPath& path) -{ - if (path.Path_.find_first_of("<>{}[]") != TString::npos) { - return AddRequest<TCanonizeYPathResponseParser>( - "parse_ypath", - SerializeParamsForParseYPath(path), - Nothing(), - MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, path)); - } else { - TRichYPath result = path; - result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix); - return NThreading::MakeFuture(result); - } -} - -TFuture<TVector<TTableColumnarStatistics>> TRawBatchRequest::GetTableColumnarStatistics( - const TTransactionId& transaction, - const TVector<TRichYPath>& paths, - const TGetTableColumnarStatisticsOptions& options) -{ - return AddRequest<TTableColumnarStatisticsParser>( - "get_table_columnar_statistics", - SerializeParamsForGetTableColumnarStatistics(transaction, paths, options), - Nothing()); -} - -TFuture<TMultiTablePartitions> TRawBatchRequest::GetTablePartitions( - const TTransactionId& transaction, - const TVector<TRichYPath>& paths, - const TGetTablePartitionsOptions& options) -{ - return AddRequest<TTablePartitionsParser>( - "partition_tables", - SerializeParamsForGetTablePartitions(transaction, paths, options), - Nothing()); -} - -void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const -{ - Y_VERIFY(result); - Y_VERIFY(nextTry); - - *nextTry = TInstant(); - maxSize = Min(maxSize, BatchItemList_.size()); - *result = TNode::CreateList(); - for (size_t i = 0; i < maxSize; ++i) { - YT_LOG_DEBUG("ExecuteBatch preparing: %v", - RequestInfo(BatchItemList_[i].Parameters)); - - result->Add(BatchItemList_[i].Parameters); - if (BatchItemList_[i].NextTry > *nextTry) { - *nextTry = BatchItemList_[i].NextTry; - } - } -} - -void TRawBatchRequest::ParseResponse( - const TResponseInfo& requestResult, - const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, - TInstant now) -{ - TNode node = NodeFromYsonString(requestResult.Response); - return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now); -} - -void TRawBatchRequest::ParseResponse( - TNode node, - const TString& requestId, - const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, - TInstant now) -{ - Y_VERIFY(retryBatch); - - EnsureType(node, TNode::List); - auto& responseList = node.AsList(); - const auto size = responseList.size(); - Y_ENSURE(size <= BatchItemList_.size(), - "Size of server response exceeds size of batch request;" - " size of batch: " << BatchItemList_.size() << - " size of server response: " << size << '.'); - - for (size_t i = 0; i != size; ++i) { - try { - EnsureType(responseList[i], TNode::Map); - auto& responseNode = responseList[i].AsMap(); - const auto outputIt = responseNode.find("output"); - if (outputIt != responseNode.end()) { - BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second)); - } else { - const auto errorIt = responseNode.find("error"); - if (errorIt == responseNode.end()) { - BatchItemList_[i].ResponseParser->SetResponse(Nothing()); - } else { - TErrorResponse error(400, requestId); - error.SetError(TYtError(errorIt->second)); - if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) { - YT_LOG_INFO( - "Batch subrequest (%s) failed, will retry, error: %s", - RequestInfo(BatchItemList_[i].Parameters), - error.what()); - retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval)); - } else { - YT_LOG_ERROR( - "Batch subrequest (%s) failed, error: %s", - RequestInfo(BatchItemList_[i].Parameters), - error.what()); - BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error)); - } - } - } - } catch (const std::exception& e) { - // We don't expect other exceptions, so we don't catch (...) - BatchItemList_[i].ResponseParser->SetException(std::current_exception()); - } - } - BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size); -} - -void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const -{ - for (const auto& batchItem : BatchItemList_) { - batchItem.ResponseParser->SetException(e); - } -} - -size_t TRawBatchRequest::BatchSize() const -{ - return BatchItemList_.size(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NDetail::NRawClient |
