From 3e1899838408bbad47622007aa382bc8a2b01f87 Mon Sep 17 00:00:00 2001 From: max42 Date: Fri, 30 Jun 2023 11:13:34 +0300 Subject: Revert "YT-19324: move YT provider to ydb/library/yql" This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12. --- yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | 687 ---------------------- 1 file changed, 687 deletions(-) delete mode 100644 yt/cpp/mapreduce/raw_client/raw_batch_request.cpp (limited to 'yt/cpp/mapreduce/raw_client/raw_batch_request.cpp') diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp deleted file mode 100644 index be81f5a21a9..00000000000 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp +++ /dev/null @@ -1,687 +0,0 @@ -#include "raw_batch_request.h" - -#include "raw_requests.h" -#include "rpc_parameters_serialization.h" - -#include -#include - -#include - -#include -#include -#include - -#include - -#include -#include - -#include -#include - -#include - -namespace NYT::NDetail::NRawClient { - -using NThreading::TFuture; -using NThreading::TPromise; -using NThreading::NewPromise; - -//////////////////////////////////////////////////////////////////// - -static TString RequestInfo(const TNode& request) -{ - return ::TStringBuilder() - << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]); -} - -static void EnsureNothing(const TMaybe& node) -{ - Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType()); -} - -static void EnsureSomething(const TMaybe& node) -{ - Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response."); -} - -static void EnsureType(const TNode& node, TNode::EType type) -{ - Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. " - << "Expected: " << type << ", actual: " << node.GetType()); -} - -static void EnsureType(const TMaybe& node, TNode::EType type) -{ - Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response."); - EnsureType(*node, type); -} - -//////////////////////////////////////////////////////////////////// - -template -class TResponseParserBase - : public TRawBatchRequest::IResponseItemParser -{ -public: - using TFutureResult = TFuture; - -public: - TResponseParserBase() - : Result(NewPromise()) - { } - - void SetException(std::exception_ptr e) override - { - Result.SetException(std::move(e)); - } - - TFuture GetFuture() - { - return Result.GetFuture(); - } - -protected: - TPromise Result; -}; - -//////////////////////////////////////////////////////////////////// - - -class TGetResponseParser - : public TResponseParserBase -{ -public: - void SetResponse(TMaybe node) override - { - EnsureSomething(node); - Result.SetValue(std::move(*node)); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TVoidResponseParser - : public TResponseParserBase -{ -public: - void SetResponse(TMaybe node) override - { - EnsureNothing(node); - Result.SetValue(); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TListResponseParser - : public TResponseParserBase -{ -public: - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::List); - Result.SetValue(std::move(node->AsList())); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TExistsResponseParser - : public TResponseParserBase -{ -public: - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::Bool); - Result.SetValue(std::move(node->AsBool())); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TGuidResponseParser - : public TResponseParserBase -{ -public: - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::String); - Result.SetValue(GetGuid(node->AsString())); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TCanonizeYPathResponseParser - : public TResponseParserBase -{ -public: - explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original) - : OriginalNode_(PathToNode(original)) - , PathPrefix_(std::move(pathPrefix)) - { } - - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::String); - - for (const auto& item : OriginalNode_.GetAttributes().AsMap()) { - node->Attributes()[item.first] = item.second; - } - TRichYPath result; - Deserialize(result, *node); - result.Path_ = AddPathPrefix(result.Path_, PathPrefix_); - Result.SetValue(result); - } - -private: - TNode OriginalNode_; - TString PathPrefix_; -}; - -//////////////////////////////////////////////////////////////////// - -class TGetOperationResponseParser - : public TResponseParserBase -{ -public: - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::Map); - Result.SetValue(ParseOperationAttributes(*node)); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TTableColumnarStatisticsParser - : public TResponseParserBase> -{ -public: - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::Map); - TVector statistics; - Deserialize(statistics, *node); - Result.SetValue(std::move(statistics)); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TTablePartitionsParser - : public TResponseParserBase -{ -public: - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::Map); - TMultiTablePartitions partitions; - Deserialize(partitions, *node); - Result.SetValue(std::move(partitions)); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TGetFileFromCacheParser - : public TResponseParserBase> -{ -public: - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::String); - if (node->AsString().empty()) { - Result.SetValue(Nothing()); - } else { - Result.SetValue(node->AsString()); - } - } -}; - -//////////////////////////////////////////////////////////////////// - -class TYPathParser - : public TResponseParserBase -{ -public: - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::String); - Result.SetValue(node->AsString()); - } -}; - -//////////////////////////////////////////////////////////////////// - -class TCheckPermissionParser - : public TResponseParserBase -{ -public: - void SetResponse(TMaybe node) override - { - EnsureType(node, TNode::Map); - Result.SetValue(ParseCheckPermissionResponse(*node)); - } -}; - -//////////////////////////////////////////////////////////////////// - -TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr responseParser) - : Parameters(std::move(parameters)) - , ResponseParser(std::move(responseParser)) - , NextTry() -{ } - -TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry) - : Parameters(batchItem.Parameters) - , ResponseParser(batchItem.ResponseParser) - , NextTry(nextTry) -{ } - -//////////////////////////////////////////////////////////////////// - -TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config) - : Config_(config) -{ } - -TRawBatchRequest::~TRawBatchRequest() = default; - -bool TRawBatchRequest::IsExecuted() const -{ - return Executed_; -} - -void TRawBatchRequest::MarkExecuted() -{ - Executed_ = true; -} - -template -typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest( - const TString& command, - TNode parameters, - TMaybe input) -{ - return AddRequest(command, parameters, input, MakeIntrusive()); -} - -template -typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest( - const TString& command, - TNode parameters, - TMaybe input, - ::TIntrusivePtr parser) -{ - Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); - TNode request; - request["command"] = command; - request["parameters"] = std::move(parameters); - if (input) { - request["input"] = std::move(*input); - } - BatchItemList_.emplace_back(std::move(request), parser); - return parser->GetFuture(); -} - -void TRawBatchRequest::AddRequest(TBatchItem batchItem) -{ - Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); - BatchItemList_.push_back(batchItem); -} - -TFuture TRawBatchRequest::Create( - const TTransactionId& transaction, - const TYPath& path, - ENodeType type, - const TCreateOptions& options) -{ - return AddRequest( - "create", - SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options), - Nothing()); -} - -TFuture TRawBatchRequest::Remove( - const TTransactionId& transaction, - const TYPath& path, - const TRemoveOptions& options) -{ - return AddRequest( - "remove", - SerializeParamsForRemove(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture TRawBatchRequest::Exists( - const TTransactionId& transaction, - const TYPath& path, - const TExistsOptions& options) -{ - return AddRequest( - "exists", - SerializeParamsForExists(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture TRawBatchRequest::Get( - const TTransactionId& transaction, - const TYPath& path, - const TGetOptions& options) -{ - return AddRequest( - "get", - SerializeParamsForGet(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture TRawBatchRequest::Set( - const TTransactionId& transaction, - const TYPath& path, - const TNode& node, - const TSetOptions& options) -{ - return AddRequest( - "set", - SerializeParamsForSet(transaction, Config_->Prefix, path, options), - node); -} - -TFuture TRawBatchRequest::List( - const TTransactionId& transaction, - const TYPath& path, - const TListOptions& options) -{ - return AddRequest( - "list", - SerializeParamsForList(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture TRawBatchRequest::Copy( - const TTransactionId& transaction, - const TYPath& sourcePath, - const TYPath& destinationPath, - const TCopyOptions& options) -{ - return AddRequest( - "copy", - SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options), - Nothing()); -} - -TFuture TRawBatchRequest::Move( - const TTransactionId& transaction, - const TYPath& sourcePath, - const TYPath& destinationPath, - const TMoveOptions& options) -{ - return AddRequest( - "move", - SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options), - Nothing()); -} - -TFuture TRawBatchRequest::Link( - const TTransactionId& transaction, - const TYPath& targetPath, - const TYPath& linkPath, - const TLinkOptions& options) -{ - return AddRequest( - "link", - SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options), - Nothing()); -} - -TFuture TRawBatchRequest::Lock( - const TTransactionId& transaction, - const TYPath& path, - ELockMode mode, - const TLockOptions& options) -{ - return AddRequest( - "lock", - SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options), - Nothing()); -} - -TFuture TRawBatchRequest::Unlock( - const TTransactionId& transaction, - const TYPath& path, - const TUnlockOptions& options) -{ - return AddRequest( - "unlock", - SerializeParamsForUnlock(transaction, Config_->Prefix, path, options), - Nothing()); -} - -TFuture> TRawBatchRequest::GetFileFromCache( - const TTransactionId& transactionId, - const TString& md5Signature, - const TYPath& cachePath, - const TGetFileFromCacheOptions& options) -{ - return AddRequest( - "get_file_from_cache", - SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options), - Nothing()); -} - -TFuture TRawBatchRequest::PutFileToCache( - const TTransactionId& transactionId, - const TYPath& filePath, - const TString& md5Signature, - const TYPath& cachePath, - const TPutFileToCacheOptions& options) -{ - return AddRequest( - "put_file_to_cache", - SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options), - Nothing()); -} - -TFuture TRawBatchRequest::CheckPermission( - const TString& user, - EPermission permission, - const TYPath& path, - const TCheckPermissionOptions& options) -{ - return AddRequest( - "check_permission", - SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options), - Nothing()); -} - -TFuture TRawBatchRequest::GetOperation( - const TOperationId& operationId, - const TGetOperationOptions& options) -{ - return AddRequest( - "get_operation", - SerializeParamsForGetOperation(operationId, options), - Nothing()); -} - -TFuture TRawBatchRequest::AbortOperation(const TOperationId& operationId) -{ - return AddRequest( - "abort_op", - SerializeParamsForAbortOperation(operationId), - Nothing()); -} - -TFuture TRawBatchRequest::CompleteOperation(const TOperationId& operationId) -{ - return AddRequest( - "complete_op", - SerializeParamsForCompleteOperation(operationId), - Nothing()); -} -TFuture TRawBatchRequest::SuspendOperation( - const TOperationId& operationId, - const TSuspendOperationOptions& options) -{ - return AddRequest( - "suspend_operation", - SerializeParamsForSuspendOperation(operationId, options), - Nothing()); -} -TFuture TRawBatchRequest::ResumeOperation( - const TOperationId& operationId, - const TResumeOperationOptions& options) -{ - return AddRequest( - "resume_operation", - SerializeParamsForResumeOperation(operationId, options), - Nothing()); -} - -TFuture TRawBatchRequest::UpdateOperationParameters( - const TOperationId& operationId, - const TUpdateOperationParametersOptions& options) -{ - return AddRequest( - "update_op_parameters", - SerializeParamsForUpdateOperationParameters(operationId, options), - Nothing()); -} - -TFuture TRawBatchRequest::CanonizeYPath(const TRichYPath& path) -{ - if (path.Path_.find_first_of("<>{}[]") != TString::npos) { - return AddRequest( - "parse_ypath", - SerializeParamsForParseYPath(path), - Nothing(), - MakeIntrusive(Config_->Prefix, path)); - } else { - TRichYPath result = path; - result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix); - return NThreading::MakeFuture(result); - } -} - -TFuture> TRawBatchRequest::GetTableColumnarStatistics( - const TTransactionId& transaction, - const TVector& paths, - const TGetTableColumnarStatisticsOptions& options) -{ - return AddRequest( - "get_table_columnar_statistics", - SerializeParamsForGetTableColumnarStatistics(transaction, paths, options), - Nothing()); -} - -TFuture TRawBatchRequest::GetTablePartitions( - const TTransactionId& transaction, - const TVector& paths, - const TGetTablePartitionsOptions& options) -{ - return AddRequest( - "partition_tables", - SerializeParamsForGetTablePartitions(transaction, paths, options), - Nothing()); -} - -void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const -{ - Y_VERIFY(result); - Y_VERIFY(nextTry); - - *nextTry = TInstant(); - maxSize = Min(maxSize, BatchItemList_.size()); - *result = TNode::CreateList(); - for (size_t i = 0; i < maxSize; ++i) { - YT_LOG_DEBUG("ExecuteBatch preparing: %v", - RequestInfo(BatchItemList_[i].Parameters)); - - result->Add(BatchItemList_[i].Parameters); - if (BatchItemList_[i].NextTry > *nextTry) { - *nextTry = BatchItemList_[i].NextTry; - } - } -} - -void TRawBatchRequest::ParseResponse( - const TResponseInfo& requestResult, - const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, - TInstant now) -{ - TNode node = NodeFromYsonString(requestResult.Response); - return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now); -} - -void TRawBatchRequest::ParseResponse( - TNode node, - const TString& requestId, - const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, - TInstant now) -{ - Y_VERIFY(retryBatch); - - EnsureType(node, TNode::List); - auto& responseList = node.AsList(); - const auto size = responseList.size(); - Y_ENSURE(size <= BatchItemList_.size(), - "Size of server response exceeds size of batch request;" - " size of batch: " << BatchItemList_.size() << - " size of server response: " << size << '.'); - - for (size_t i = 0; i != size; ++i) { - try { - EnsureType(responseList[i], TNode::Map); - auto& responseNode = responseList[i].AsMap(); - const auto outputIt = responseNode.find("output"); - if (outputIt != responseNode.end()) { - BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second)); - } else { - const auto errorIt = responseNode.find("error"); - if (errorIt == responseNode.end()) { - BatchItemList_[i].ResponseParser->SetResponse(Nothing()); - } else { - TErrorResponse error(400, requestId); - error.SetError(TYtError(errorIt->second)); - if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) { - YT_LOG_INFO( - "Batch subrequest (%s) failed, will retry, error: %s", - RequestInfo(BatchItemList_[i].Parameters), - error.what()); - retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval)); - } else { - YT_LOG_ERROR( - "Batch subrequest (%s) failed, error: %s", - RequestInfo(BatchItemList_[i].Parameters), - error.what()); - BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error)); - } - } - } - } catch (const std::exception& e) { - // We don't expect other exceptions, so we don't catch (...) - BatchItemList_[i].ResponseParser->SetException(std::current_exception()); - } - } - BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size); -} - -void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const -{ - for (const auto& batchItem : BatchItemList_) { - batchItem.ResponseParser->SetException(e); - } -} - -size_t TRawBatchRequest::BatchSize() const -{ - return BatchItemList_.size(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NDetail::NRawClient -- cgit v1.3