diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 20:17:09 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 21:21:14 +0300 |
commit | bb37d56338bace93813adc51a61936a19acc8edc (patch) | |
tree | 3d771660421001c994e2baf9a40b409d06c7a24b /yt/cpp | |
parent | 9a04c0af35c6a47061b3699089da07acfcf14587 (diff) | |
download | ydb-bb37d56338bace93813adc51a61936a19acc8edc.tar.gz |
YT-23616: Make THttpRawBatchRequest as an implementation of IRawBatchRequest interface
commit_hash:9e6c556686dda1562697762d38da532dc5c87b80
Diffstat (limited to 'yt/cpp')
20 files changed, 260 insertions, 107 deletions
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.cpp b/yt/cpp/mapreduce/client/batch_request_impl.cpp index f45eee319b..e7456b9187 100644 --- a/yt/cpp/mapreduce/client/batch_request_impl.cpp +++ b/yt/cpp/mapreduce/client/batch_request_impl.cpp @@ -36,11 +36,11 @@ using ::NThreading::NewPromise; TBatchRequest::TBatchRequest(const TTransactionId& defaultTransaction, ::TIntrusivePtr<TClient> client) : DefaultTransaction_(defaultTransaction) - , Impl_(MakeIntrusive<THttpRawBatchRequest>(client->GetContext().Config)) + , Impl_(client->GetRawClient()->CreateRawBatchRequest()) , Client_(client) { } -TBatchRequest::TBatchRequest(THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client) +TBatchRequest::TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client) : Impl_(impl) , Client_(std::move(client)) { } @@ -189,7 +189,7 @@ TFuture<TCheckPermissionResponse> TBatchRequest::CheckPermission( void TBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options) { - Impl_->ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), options); + Impl_->ExecuteBatch(options); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/batch_request_impl.h b/yt/cpp/mapreduce/client/batch_request_impl.h index d53eb196dd..bbd993a925 100644 --- a/yt/cpp/mapreduce/client/batch_request_impl.h +++ b/yt/cpp/mapreduce/client/batch_request_impl.h @@ -22,10 +22,6 @@ struct TResponseInfo; class TClient; using TClientPtr = ::TIntrusivePtr<TClient>; -namespace NRawClient { - class THttpRawBatchRequest; -} - //////////////////////////////////////////////////////////////////////////////// class TBatchRequest @@ -119,11 +115,11 @@ public: virtual void ExecuteBatch(const TExecuteBatchOptions& executeBatch) override; private: - TBatchRequest(NDetail::NRawClient::THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client); + TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client); private: TTransactionId DefaultTransaction_; - ::TIntrusivePtr<NDetail::NRawClient::THttpRawBatchRequest> Impl_; + IRawBatchRequestPtr Impl_; std::unique_ptr<TBatchRequest> TmpWithTransaction_; ::TIntrusivePtr<TClient> Client_; diff --git a/yt/cpp/mapreduce/client/lock.cpp b/yt/cpp/mapreduce/client/lock.cpp index 46306fa737..4eecfdb0d5 100644 --- a/yt/cpp/mapreduce/client/lock.cpp +++ b/yt/cpp/mapreduce/client/lock.cpp @@ -26,7 +26,7 @@ public: , Acquired_(acquired) { } - void PrepareRequest(THttpRawBatchRequest* batchRequest) override + void PrepareRequest(IRawBatchRequest* batchRequest) override { LockState_ = batchRequest->Get(TTransactionId(), LockStateYPath_, TGetOptions()); } diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 8ace8a44bb..5caf302358 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -228,7 +228,7 @@ TStructuredJobTableList ApplyProtobufColumnFilters( CreateDefaultRequestRetryPolicy(preparer.GetContext().Config), preparer.GetContext(), tableList, - [&] (NRawClient::THttpRawBatchRequest& batch, const auto& table) { + [&] (IRawBatchRequest& batch, const auto& table) { return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions()); }); @@ -318,7 +318,6 @@ TSimpleOperationIo CreateSimpleOperationIo( inputs, outputs, preparer.GetClient()->GetRawClient(), - preparer.GetContext(), preparer.GetClientRetryPolicy(), preparer.GetTransactionId()), &inputs, @@ -497,7 +496,6 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( structuredInputs, structuredOutputs, preparer.GetClient()->GetRawClient(), - preparer.GetContext(), preparer.GetClientRetryPolicy(), preparer.GetTransactionId()), &structuredInputs, @@ -1688,7 +1686,6 @@ void ExecuteMapReduce( structuredInputs, mapperOutput, preparer->GetClient()->GetRawClient(), - preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), &structuredInputs, @@ -1758,7 +1755,6 @@ void ExecuteMapReduce( inputs, outputs, preparer->GetClient()->GetRawClient(), - preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), &inputs, @@ -1824,7 +1820,6 @@ void ExecuteMapReduce( structuredInputs, structuredOutputs, preparer->GetClient()->GetRawClient(), - preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), &structuredInputs, @@ -2343,7 +2338,7 @@ public: : OperationImpl_(std::move(operationImpl)) { } - void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override + void PrepareRequest(IRawBatchRequest* batchRequest) override { auto filter = TOperationAttributeFilter() .Add(EOperationAttribute::State) diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index ac6ea0f923..1e5f81b5d9 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -42,7 +42,7 @@ public: , Transaction_(std::move(transaction)) { } - void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override + void PrepareRequest(IRawBatchRequest* batchRequest) override { Future_ = batchRequest->GetOperation( OperationId_, @@ -213,26 +213,26 @@ void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths) TVector<::NThreading::TFuture<TLockId>> lockIdFutures; lockIdFutures.reserve(paths->size()); - NRawClient::THttpRawBatchRequest lockRequest(GetContext().Config); + auto lockRequest = Client_->GetRawClient()->CreateRawBatchRequest(); for (const auto& path : *paths) { - lockIdFutures.push_back(lockRequest.Lock( + lockIdFutures.push_back(lockRequest->Lock( FileTransaction_->GetId(), path.Path_, ELockMode::LM_SNAPSHOT, TLockOptions().Waitable(true))); } - lockRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext()); + lockRequest->ExecuteBatch(); TVector<::NThreading::TFuture<TNode>> nodeIdFutures; nodeIdFutures.reserve(paths->size()); - NRawClient::THttpRawBatchRequest getNodeIdRequest(GetContext().Config); + auto getNodeIdRequest = Client_->GetRawClient()->CreateRawBatchRequest(); for (const auto& lockIdFuture : lockIdFutures) { - nodeIdFutures.push_back(getNodeIdRequest.Get( + nodeIdFutures.push_back(getNodeIdRequest->Get( FileTransaction_->GetId(), ::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id", TGetOptions())); } - getNodeIdRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext()); + getNodeIdRequest->ExecuteBatch(); for (size_t i = 0; i != paths->size(); ++i) { auto& richPath = (*paths)[i]; diff --git a/yt/cpp/mapreduce/client/prepare_operation.cpp b/yt/cpp/mapreduce/client/prepare_operation.cpp index 83695ef1e8..31574d7e0e 100644 --- a/yt/cpp/mapreduce/client/prepare_operation.cpp +++ b/yt/cpp/mapreduce/client/prepare_operation.cpp @@ -20,11 +20,9 @@ TOperationPreparationContext::TOperationPreparationContext( const TStructuredJobTableList& structuredInputs, const TStructuredJobTableList& structuredOutputs, const IRawClientPtr& rawClient, - const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId) : RawClient_(rawClient) - , Context_(context) , RetryPolicy_(retryPolicy) , TransactionId_(transactionId) , InputSchemas_(structuredInputs.size()) @@ -44,11 +42,9 @@ TOperationPreparationContext::TOperationPreparationContext( TVector<TRichYPath> inputs, TVector<TRichYPath> outputs, const IRawClientPtr& rawClient, - const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId) : RawClient_(rawClient) - , Context_(context) , RetryPolicy_(retryPolicy) , TransactionId_(transactionId) , InputSchemas_(inputs.size()) @@ -77,17 +73,17 @@ int TOperationPreparationContext::GetOutputCount() const const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() const { TVector<::NThreading::TFuture<TNode>> schemaFutures; - NRawClient::THttpRawBatchRequest batch(Context_.Config); + auto batch = RawClient_->CreateRawBatchRequest(); for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) { if (InputSchemasLoaded_[tableIndex]) { schemaFutures.emplace_back(); continue; } Y_ABORT_UNLESS(Inputs_[tableIndex]); - schemaFutures.push_back(batch.Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{})); + schemaFutures.push_back(batch->Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{})); } - batch.ExecuteBatch(RetryPolicy_->CreatePolicyForGenericRequest(), Context_); + batch->ExecuteBatch(); for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) { if (schemaFutures[tableIndex].Initialized()) { diff --git a/yt/cpp/mapreduce/client/prepare_operation.h b/yt/cpp/mapreduce/client/prepare_operation.h index 3fde1d1678..7b34703e89 100644 --- a/yt/cpp/mapreduce/client/prepare_operation.h +++ b/yt/cpp/mapreduce/client/prepare_operation.h @@ -16,7 +16,6 @@ public: const TStructuredJobTableList& structuredInputs, const TStructuredJobTableList& structuredOutputs, const IRawClientPtr& rawClient, - const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId); @@ -24,7 +23,6 @@ public: TVector<TRichYPath> inputs, TVector<TRichYPath> outputs, const IRawClientPtr& rawClient, - const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId); @@ -42,7 +40,6 @@ private: TVector<TMaybe<TRichYPath>> Outputs_; const IRawClientPtr RawClient_; - const TClientContext& Context_; const IClientRetryPolicyPtr RetryPolicy_; TTransactionId TransactionId_; diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp index 499a1308fd..50ef7793ba 100644 --- a/yt/cpp/mapreduce/client/skiff.cpp +++ b/yt/cpp/mapreduce/client/skiff.cpp @@ -307,7 +307,7 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( clientRetryPolicy->CreatePolicyForGenericRequest(), context, NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths), - [&] (THttpRawBatchRequest& batch, const TRichYPath& path) { + [&] (IRawBatchRequest& batch, const TRichYPath& path) { auto getOptions = TGetOptions() .AttributeFilter( TAttributeFilter() diff --git a/yt/cpp/mapreduce/client/yt_poller.cpp b/yt/cpp/mapreduce/client/yt_poller.cpp index 13670fddb5..4980d287fa 100644 --- a/yt/cpp/mapreduce/client/yt_poller.cpp +++ b/yt/cpp/mapreduce/client/yt_poller.cpp @@ -92,14 +92,14 @@ void TYtPoller::WatchLoop() Y_ABORT_UNLESS(!InProgress_.empty()); } - THttpRawBatchRequest rawBatchRequest(Context_.Config); + THttpRawBatchRequest rawBatchRequest(Context_, ClientRetryPolicy_->CreatePolicyForGenericRequest()); for (auto& item : InProgress_) { item->PrepareRequest(&rawBatchRequest); } try { - rawBatchRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_); + rawBatchRequest.ExecuteBatch(); } catch (const std::exception& ex) { YT_LOG_ERROR("Exception while executing batch request: %v", ex.what()); } diff --git a/yt/cpp/mapreduce/client/yt_poller.h b/yt/cpp/mapreduce/client/yt_poller.h index fe9979a6a9..c2e6809324 100644 --- a/yt/cpp/mapreduce/client/yt_poller.h +++ b/yt/cpp/mapreduce/client/yt_poller.h @@ -15,10 +15,6 @@ namespace NYT { namespace NDetail { -namespace NRawClient { - class THttpRawBatchRequest; -} - //////////////////////////////////////////////////////////////////////////////// class IYtPollerItem @@ -33,7 +29,7 @@ public: public: virtual ~IYtPollerItem() = default; - virtual void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) = 0; + virtual void PrepareRequest(IRawBatchRequest* batchRequest) = 0; // Should return PollContinue if poller should continue polling this item. // Should return PollBreak if poller should stop polling this item. diff --git a/yt/cpp/mapreduce/interface/fwd.h b/yt/cpp/mapreduce/interface/fwd.h index 162a6ee3e9..9dcdf3789a 100644 --- a/yt/cpp/mapreduce/interface/fwd.h +++ b/yt/cpp/mapreduce/interface/fwd.h @@ -399,6 +399,13 @@ namespace NYT { using IRetryConfigProviderPtr = ::TIntrusivePtr<IRetryConfigProvider>; //////////////////////////////////////////////////////////////////////////////// + // raw_batch_request.h + //////////////////////////////////////////////////////////////////////////////// + + class IRawBatchRequest; + using IRawBatchRequestPtr = ::TIntrusivePtr<IRawBatchRequest>; + + //////////////////////////////////////////////////////////////////////////////// // raw_client.h //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/interface/raw_batch_request.h b/yt/cpp/mapreduce/interface/raw_batch_request.h new file mode 100644 index 0000000000..ec7b2b542c --- /dev/null +++ b/yt/cpp/mapreduce/interface/raw_batch_request.h @@ -0,0 +1,134 @@ +#pragma once + +#include "client_method_options.h" +#include "fwd.h" +#include "operation.h" + +#include <library/cpp/threading/future/core/future.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class IRawBatchRequest + : public virtual TThrRefBase +{ +public: + virtual void ExecuteBatch(const TExecuteBatchOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TNodeId> Create( + const TTransactionId& transaction, + const TYPath& path, + ENodeType type, + const TCreateOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<void> Remove( + const TTransactionId& transaction, + const TYPath& path, + const TRemoveOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<bool> Exists( + const TTransactionId& transaction, + const TYPath& path, + const TExistsOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TNode> Get( + const TTransactionId& transaction, + const TYPath& path, + const TGetOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<void> Set( + const TTransactionId& transaction, + const TYPath& path, + const TNode& value, + const TSetOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TNode::TListType> List( + const TTransactionId& transaction, + const TYPath& path, + const TListOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TNodeId> Copy( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TNodeId> Move( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TNodeId> Link( + const TTransactionId& transaction, + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TLockId> Lock( + const TTransactionId& transaction, + const TYPath& path, + ELockMode mode, + const TLockOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<void> Unlock( + const TTransactionId& transaction, + const TYPath& path, + const TUnlockOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache( + const TTransactionId& transactionId, + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TYPath> PutFileToCache( + const TTransactionId& transactionId, + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TCheckPermissionResponse> CheckPermission( + const TString& user, + EPermission permission, + const TYPath& path, + const TCheckPermissionOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TOperationAttributes> GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId) = 0; + + virtual ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId) = 0; + + virtual ::NThreading::TFuture<void> SuspendOperation( + const TOperationId& operationId, + const TSuspendOperationOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<void> ResumeOperation( + const TOperationId& operationId, + const TResumeOperationOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<void> UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path) = 0; + + virtual ::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options = {}) = 0; + + virtual ::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options = {}) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 1c140d1dd9..441e738287 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -327,6 +327,10 @@ public: const TGetTablePartitionsOptions& options = {}) = 0; virtual ui64 GenerateTimestamp() = 0; + + // Batch + + virtual IRawBatchRequestPtr CreateRawBatchRequest() = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp index 1a340b8c17..08fee25b79 100644 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp @@ -285,16 +285,14 @@ THttpRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInsta //////////////////////////////////////////////////////////////////////////////// -THttpRawBatchRequest::THttpRawBatchRequest(const TConfigPtr& config) - : Config_(config) +THttpRawBatchRequest::THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy) + : Context_(context) + , RequestRetryPolicy_(std::move(retryPolicy)) { } THttpRawBatchRequest::~THttpRawBatchRequest() = default; -void THttpRawBatchRequest::ExecuteBatch( - IRequestRetryPolicyPtr retryPolicy, - const TClientContext& context, - const TExecuteBatchOptions& options) +void THttpRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options) { if (IsExecuted()) { ythrow yexception() << "Cannot execute batch request since it is already executed"; @@ -306,8 +304,8 @@ void THttpRawBatchRequest::ExecuteBatch( const auto concurrency = options.Concurrency_.GetOrElse(50); const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); - if (!retryPolicy) { - retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); + if (!RequestRetryPolicy_) { + RequestRetryPolicy_ = CreateDefaultRequestRetryPolicy(Context_.Config); } while (BatchSize()) { @@ -324,9 +322,9 @@ void THttpRawBatchRequest::ExecuteBatch( TResponseInfo result; try { result = RequestWithRetry<TResponseInfo>( - retryPolicy, - [&context, &header, &body] (TMutationId& mutationId) { - auto response = RequestWithoutRetry(context, mutationId, header, body); + RequestRetryPolicy_, + [this, &header, &body] (TMutationId& mutationId) { + auto response = RequestWithoutRetry(Context_, mutationId, header, body); return TResponseInfo{ .RequestId = response->GetRequestId(), .Response = response->GetResponse(), @@ -337,7 +335,7 @@ void THttpRawBatchRequest::ExecuteBatch( SetErrorResult(std::current_exception()); throw; } - ParseResponse(std::move(result), retryPolicy.Get()); + ParseResponse(std::move(result), RequestRetryPolicy_.Get()); } } @@ -392,7 +390,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Create( { return AddRequest<TGuidResponseParser>( "create", - SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options), + SerializeParamsForCreate(transaction, Context_.Config->Prefix, path, type, options), Nothing()); } @@ -403,7 +401,7 @@ TFuture<void> THttpRawBatchRequest::Remove( { return AddRequest<TVoidResponseParser>( "remove", - SerializeParamsForRemove(transaction, Config_->Prefix, path, options), + SerializeParamsForRemove(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -414,7 +412,7 @@ TFuture<bool> THttpRawBatchRequest::Exists( { return AddRequest<TExistsResponseParser>( "exists", - SerializeParamsForExists(transaction, Config_->Prefix, path, options), + SerializeParamsForExists(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -425,7 +423,7 @@ TFuture<TNode> THttpRawBatchRequest::Get( { return AddRequest<TGetResponseParser>( "get", - SerializeParamsForGet(transaction, Config_->Prefix, path, options), + SerializeParamsForGet(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -437,7 +435,7 @@ TFuture<void> THttpRawBatchRequest::Set( { return AddRequest<TVoidResponseParser>( "set", - SerializeParamsForSet(transaction, Config_->Prefix, path, options), + SerializeParamsForSet(transaction, Context_.Config->Prefix, path, options), node); } @@ -448,7 +446,7 @@ TFuture<TNode::TListType> THttpRawBatchRequest::List( { return AddRequest<TListResponseParser>( "list", - SerializeParamsForList(transaction, Config_->Prefix, path, options), + SerializeParamsForList(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -460,7 +458,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Copy( { return AddRequest<TGuidResponseParser>( "copy", - SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options), + SerializeParamsForCopy(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options), Nothing()); } @@ -472,7 +470,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Move( { return AddRequest<TGuidResponseParser>( "move", - SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options), + SerializeParamsForMove(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options), Nothing()); } @@ -484,7 +482,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Link( { return AddRequest<TGuidResponseParser>( "link", - SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options), + SerializeParamsForLink(transaction, Context_.Config->Prefix, targetPath, linkPath, options), Nothing()); } @@ -496,7 +494,7 @@ TFuture<TLockId> THttpRawBatchRequest::Lock( { return AddRequest<TGuidResponseParser>( "lock", - SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options), + SerializeParamsForLock(transaction, Context_.Config->Prefix, path, mode, options), Nothing()); } @@ -507,7 +505,7 @@ TFuture<void> THttpRawBatchRequest::Unlock( { return AddRequest<TVoidResponseParser>( "unlock", - SerializeParamsForUnlock(transaction, Config_->Prefix, path, options), + SerializeParamsForUnlock(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -532,7 +530,7 @@ TFuture<TYPath> THttpRawBatchRequest::PutFileToCache( { return AddRequest<TYPathParser>( "put_file_to_cache", - SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options), + SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options), Nothing()); } @@ -544,7 +542,7 @@ TFuture<TCheckPermissionResponse> THttpRawBatchRequest::CheckPermission( { return AddRequest<TCheckPermissionParser>( "check_permission", - SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options), + SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options), Nothing()); } @@ -607,7 +605,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path) TRichYPath result = path; // Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath. if (!result.Path_.StartsWith("<")) { - result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix); + result.Path_ = AddPathPrefix(result.Path_, Context_.Config->Prefix); } if (result.Path_.find_first_of("<>{}[]") != TString::npos) { @@ -615,7 +613,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path) "parse_ypath", SerializeParamsForParseYPath(result), Nothing(), - MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, result)); + MakeIntrusive<TCanonizeYPathResponseParser>(Context_.Config->Prefix, result)); } return NThreading::MakeFuture(result); } diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.h b/yt/cpp/mapreduce/raw_client/raw_batch_request.h index 670b97a843..2af0e31305 100644 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.h +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.h @@ -2,13 +2,14 @@ #include <yt/cpp/mapreduce/common/fwd.h> -#include <yt/cpp/mapreduce/interface/batch_request.h> +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/http/requests.h> + #include <yt/cpp/mapreduce/interface/fwd.h> #include <yt/cpp/mapreduce/interface/node.h> +#include <yt/cpp/mapreduce/interface/raw_batch_request.h> #include <yt/cpp/mapreduce/interface/retry_policy.h> -#include <yt/cpp/mapreduce/http/requests.h> - #include <library/cpp/threading/future/future.h> #include <util/generic/ptr.h> @@ -25,7 +26,7 @@ namespace NYT::NDetail::NRawClient { //////////////////////////////////////////////////////////////////////////////// class THttpRawBatchRequest - : public TThrRefBase + : public IRawBatchRequest { public: struct IResponseItemParser @@ -38,13 +39,10 @@ public: }; public: - THttpRawBatchRequest(const TConfigPtr& config); + THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy); ~THttpRawBatchRequest(); - void ExecuteBatch( - IRequestRetryPolicyPtr retryPolicy, - const TClientContext& context, - const TExecuteBatchOptions& options = {}); + void ExecuteBatch(const TExecuteBatchOptions& options = {}) override; bool IsExecuted() const; void MarkExecuted(); @@ -68,91 +66,113 @@ public: const TTransactionId& transaction, const TYPath& path, ENodeType type, - const TCreateOptions& options); + const TCreateOptions& options = {}) override; + ::NThreading::TFuture<void> Remove( const TTransactionId& transaction, const TYPath& path, - const TRemoveOptions& options); + const TRemoveOptions& options = {}) override; + ::NThreading::TFuture<bool> Exists( const TTransactionId& transaction, const TYPath& path, - const TExistsOptions& options); + const TExistsOptions& options = {}) override; + ::NThreading::TFuture<TNode> Get( const TTransactionId& transaction, const TYPath& path, - const TGetOptions& options); + const TGetOptions& options = {}) override; + ::NThreading::TFuture<void> Set( const TTransactionId& transaction, const TYPath& path, const TNode& value, - const TSetOptions& options); + const TSetOptions& options = {}) override; + ::NThreading::TFuture<TNode::TListType> List( const TTransactionId& transaction, const TYPath& path, - const TListOptions& options); + const TListOptions& options = {}) override; + ::NThreading::TFuture<TNodeId> Copy( const TTransactionId& transaction, const TYPath& sourcePath, const TYPath& destinationPath, - const TCopyOptions& options); + const TCopyOptions& options = {}) override; + ::NThreading::TFuture<TNodeId> Move( const TTransactionId& transaction, const TYPath& sourcePath, const TYPath& destinationPath, - const TMoveOptions& options); + const TMoveOptions& options = {}) override; + ::NThreading::TFuture<TNodeId> Link( const TTransactionId& transaction, const TYPath& targetPath, const TYPath& linkPath, - const TLinkOptions& options); + const TLinkOptions& options = {}) override; + ::NThreading::TFuture<TLockId> Lock( const TTransactionId& transaction, const TYPath& path, ELockMode mode, - const TLockOptions& options); + const TLockOptions& options = {}) override; + ::NThreading::TFuture<void> Unlock( const TTransactionId& transaction, const TYPath& path, - const TUnlockOptions& options); + const TUnlockOptions& options = {}) override; + ::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache( const TTransactionId& transactionId, const TString& md5Signature, const TYPath& cachePath, - const TGetFileFromCacheOptions& options); + const TGetFileFromCacheOptions& options = {}) override; + ::NThreading::TFuture<TYPath> PutFileToCache( const TTransactionId& transactionId, const TYPath& filePath, const TString& md5Signature, const TYPath& cachePath, - const TPutFileToCacheOptions& options); + const TPutFileToCacheOptions& options = {}) override; + ::NThreading::TFuture<TCheckPermissionResponse> CheckPermission( const TString& user, EPermission permission, const TYPath& path, - const TCheckPermissionOptions& options); + const TCheckPermissionOptions& options = {}) override; + ::NThreading::TFuture<TOperationAttributes> GetOperation( const TOperationId& operationId, - const TGetOperationOptions& options); - ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId); - ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId); + const TGetOperationOptions& options = {}) override; + + ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId) override; + + ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId) override; + ::NThreading::TFuture<void> SuspendOperation( const TOperationId& operationId, - const TSuspendOperationOptions& options); + const TSuspendOperationOptions& options = {}) override; + ::NThreading::TFuture<void> ResumeOperation( const TOperationId& operationId, - const TResumeOperationOptions& options); + const TResumeOperationOptions& options = {}) override; + ::NThreading::TFuture<void> UpdateOperationParameters( const TOperationId& operationId, - const TUpdateOperationParametersOptions& options); - ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path); + const TUpdateOperationParametersOptions& options = {}) override; + + ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path) override; + ::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics( const TTransactionId& transaction, const TVector<TRichYPath>& paths, - const TGetTableColumnarStatisticsOptions& options); + const TGetTableColumnarStatisticsOptions& options = {}) override; + ::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions( const TTransactionId& transaction, const TVector<TRichYPath>& paths, - const TGetTablePartitionsOptions& options); + const TGetTablePartitionsOptions& options = {}) override; private: struct TBatchItem { @@ -182,7 +202,9 @@ private: void AddRequest(TBatchItem batchItem); private: - TConfigPtr Config_; + const TClientContext Context_; + + IRequestRetryPolicyPtr RequestRetryPolicy_; TDeque<TBatchItem> BatchItemList_; bool Executed_ = false; diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 53d2be114a..f25fe5a4ee 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -11,6 +11,7 @@ #include <yt/cpp/mapreduce/http/retry_request.h> #include <yt/cpp/mapreduce/interface/fluent.h> +#include <yt/cpp/mapreduce/interface/fwd.h> #include <yt/cpp/mapreduce/interface/operation.h> #include <yt/cpp/mapreduce/interface/tvm.h> @@ -925,6 +926,11 @@ ui64 THttpRawClient::GenerateTimestamp() return NodeFromYsonString(responseInfo->GetResponse()).AsUint64(); } +IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest() +{ + return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index e18e32a92e..f0e8378db3 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -335,6 +335,8 @@ public: ui64 GenerateTimestamp() override; + IRawBatchRequestPtr CreateRawBatchRequest() override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index fd5c59ec20..6ba8a3f084 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -265,13 +265,13 @@ TVector<TRichYPath> CanonizeYPaths( const TClientContext& context, const TVector<TRichYPath>& paths) { - THttpRawBatchRequest batch(context.Config); + THttpRawBatchRequest batch(context, retryPolicy); TVector<NThreading::TFuture<TRichYPath>> futures; futures.reserve(paths.size()); for (int i = 0; i < static_cast<int>(paths.size()); ++i) { futures.push_back(batch.CanonizeYPath(paths[i])); } - batch.ExecuteBatch(retryPolicy, context); + batch.ExecuteBatch(); TVector<TRichYPath> result; result.reserve(futures.size()); for (auto& future : futures) { diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 860d4ae939..9b37293b12 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -59,13 +59,13 @@ auto BatchTransform( TBatchAdder batchAdder, const TExecuteBatchOptions& executeBatchOptions = {}) { - THttpRawBatchRequest batch(context.Config); + THttpRawBatchRequest batch(context, retryPolicy); using TFuture = decltype(batchAdder(batch, *std::begin(src))); TVector<TFuture> futures; for (const auto& el : src) { futures.push_back(batchAdder(batch, el)); } - batch.ExecuteBatch(retryPolicy, context, executeBatchOptions); + batch.ExecuteBatch(executeBatchOptions); using TDst = decltype(futures[0].ExtractValueSync()); TVector<TDst> result; result.reserve(std::size(src)); diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp index 36a6935e82..1871ee80fb 100644 --- a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp +++ b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp @@ -73,7 +73,7 @@ TVector<TString> GetAllPathsFromRequestList(const TNode& requestList) TEST(TBatchRequestImplTest, ParseResponse) { TClientContext context; - THttpRawBatchRequest batchRequest(context.Config); + THttpRawBatchRequest batchRequest(context, /*retryPolicy*/ nullptr); EXPECT_EQ(batchRequest.BatchSize(), 0u); |