diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 20:17:09 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 21:21:14 +0300 |
commit | bb37d56338bace93813adc51a61936a19acc8edc (patch) | |
tree | 3d771660421001c994e2baf9a40b409d06c7a24b /yt/cpp/mapreduce/client | |
parent | 9a04c0af35c6a47061b3699089da07acfcf14587 (diff) | |
download | ydb-bb37d56338bace93813adc51a61936a19acc8edc.tar.gz |
YT-23616: Make THttpRawBatchRequest as an implementation of IRawBatchRequest interface
commit_hash:9e6c556686dda1562697762d38da532dc5c87b80
Diffstat (limited to 'yt/cpp/mapreduce/client')
-rw-r--r-- | yt/cpp/mapreduce/client/batch_request_impl.cpp | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/batch_request_impl.h | 8 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/lock.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.cpp | 14 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/prepare_operation.cpp | 10 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/prepare_operation.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/skiff.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/yt_poller.cpp | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/yt_poller.h | 6 |
10 files changed, 22 insertions, 42 deletions
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.cpp b/yt/cpp/mapreduce/client/batch_request_impl.cpp index f45eee319b..e7456b9187 100644 --- a/yt/cpp/mapreduce/client/batch_request_impl.cpp +++ b/yt/cpp/mapreduce/client/batch_request_impl.cpp @@ -36,11 +36,11 @@ using ::NThreading::NewPromise; TBatchRequest::TBatchRequest(const TTransactionId& defaultTransaction, ::TIntrusivePtr<TClient> client) : DefaultTransaction_(defaultTransaction) - , Impl_(MakeIntrusive<THttpRawBatchRequest>(client->GetContext().Config)) + , Impl_(client->GetRawClient()->CreateRawBatchRequest()) , Client_(client) { } -TBatchRequest::TBatchRequest(THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client) +TBatchRequest::TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client) : Impl_(impl) , Client_(std::move(client)) { } @@ -189,7 +189,7 @@ TFuture<TCheckPermissionResponse> TBatchRequest::CheckPermission( void TBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options) { - Impl_->ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), options); + Impl_->ExecuteBatch(options); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/batch_request_impl.h b/yt/cpp/mapreduce/client/batch_request_impl.h index d53eb196dd..bbd993a925 100644 --- a/yt/cpp/mapreduce/client/batch_request_impl.h +++ b/yt/cpp/mapreduce/client/batch_request_impl.h @@ -22,10 +22,6 @@ struct TResponseInfo; class TClient; using TClientPtr = ::TIntrusivePtr<TClient>; -namespace NRawClient { - class THttpRawBatchRequest; -} - //////////////////////////////////////////////////////////////////////////////// class TBatchRequest @@ -119,11 +115,11 @@ public: virtual void ExecuteBatch(const TExecuteBatchOptions& executeBatch) override; private: - TBatchRequest(NDetail::NRawClient::THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client); + TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client); private: TTransactionId DefaultTransaction_; - ::TIntrusivePtr<NDetail::NRawClient::THttpRawBatchRequest> Impl_; + IRawBatchRequestPtr Impl_; std::unique_ptr<TBatchRequest> TmpWithTransaction_; ::TIntrusivePtr<TClient> Client_; diff --git a/yt/cpp/mapreduce/client/lock.cpp b/yt/cpp/mapreduce/client/lock.cpp index 46306fa737..4eecfdb0d5 100644 --- a/yt/cpp/mapreduce/client/lock.cpp +++ b/yt/cpp/mapreduce/client/lock.cpp @@ -26,7 +26,7 @@ public: , Acquired_(acquired) { } - void PrepareRequest(THttpRawBatchRequest* batchRequest) override + void PrepareRequest(IRawBatchRequest* batchRequest) override { LockState_ = batchRequest->Get(TTransactionId(), LockStateYPath_, TGetOptions()); } diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 8ace8a44bb..5caf302358 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -228,7 +228,7 @@ TStructuredJobTableList ApplyProtobufColumnFilters( CreateDefaultRequestRetryPolicy(preparer.GetContext().Config), preparer.GetContext(), tableList, - [&] (NRawClient::THttpRawBatchRequest& batch, const auto& table) { + [&] (IRawBatchRequest& batch, const auto& table) { return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions()); }); @@ -318,7 +318,6 @@ TSimpleOperationIo CreateSimpleOperationIo( inputs, outputs, preparer.GetClient()->GetRawClient(), - preparer.GetContext(), preparer.GetClientRetryPolicy(), preparer.GetTransactionId()), &inputs, @@ -497,7 +496,6 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( structuredInputs, structuredOutputs, preparer.GetClient()->GetRawClient(), - preparer.GetContext(), preparer.GetClientRetryPolicy(), preparer.GetTransactionId()), &structuredInputs, @@ -1688,7 +1686,6 @@ void ExecuteMapReduce( structuredInputs, mapperOutput, preparer->GetClient()->GetRawClient(), - preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), &structuredInputs, @@ -1758,7 +1755,6 @@ void ExecuteMapReduce( inputs, outputs, preparer->GetClient()->GetRawClient(), - preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), &inputs, @@ -1824,7 +1820,6 @@ void ExecuteMapReduce( structuredInputs, structuredOutputs, preparer->GetClient()->GetRawClient(), - preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), &structuredInputs, @@ -2343,7 +2338,7 @@ public: : OperationImpl_(std::move(operationImpl)) { } - void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override + void PrepareRequest(IRawBatchRequest* batchRequest) override { auto filter = TOperationAttributeFilter() .Add(EOperationAttribute::State) diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index ac6ea0f923..1e5f81b5d9 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -42,7 +42,7 @@ public: , Transaction_(std::move(transaction)) { } - void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override + void PrepareRequest(IRawBatchRequest* batchRequest) override { Future_ = batchRequest->GetOperation( OperationId_, @@ -213,26 +213,26 @@ void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths) TVector<::NThreading::TFuture<TLockId>> lockIdFutures; lockIdFutures.reserve(paths->size()); - NRawClient::THttpRawBatchRequest lockRequest(GetContext().Config); + auto lockRequest = Client_->GetRawClient()->CreateRawBatchRequest(); for (const auto& path : *paths) { - lockIdFutures.push_back(lockRequest.Lock( + lockIdFutures.push_back(lockRequest->Lock( FileTransaction_->GetId(), path.Path_, ELockMode::LM_SNAPSHOT, TLockOptions().Waitable(true))); } - lockRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext()); + lockRequest->ExecuteBatch(); TVector<::NThreading::TFuture<TNode>> nodeIdFutures; nodeIdFutures.reserve(paths->size()); - NRawClient::THttpRawBatchRequest getNodeIdRequest(GetContext().Config); + auto getNodeIdRequest = Client_->GetRawClient()->CreateRawBatchRequest(); for (const auto& lockIdFuture : lockIdFutures) { - nodeIdFutures.push_back(getNodeIdRequest.Get( + nodeIdFutures.push_back(getNodeIdRequest->Get( FileTransaction_->GetId(), ::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id", TGetOptions())); } - getNodeIdRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext()); + getNodeIdRequest->ExecuteBatch(); for (size_t i = 0; i != paths->size(); ++i) { auto& richPath = (*paths)[i]; diff --git a/yt/cpp/mapreduce/client/prepare_operation.cpp b/yt/cpp/mapreduce/client/prepare_operation.cpp index 83695ef1e8..31574d7e0e 100644 --- a/yt/cpp/mapreduce/client/prepare_operation.cpp +++ b/yt/cpp/mapreduce/client/prepare_operation.cpp @@ -20,11 +20,9 @@ TOperationPreparationContext::TOperationPreparationContext( const TStructuredJobTableList& structuredInputs, const TStructuredJobTableList& structuredOutputs, const IRawClientPtr& rawClient, - const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId) : RawClient_(rawClient) - , Context_(context) , RetryPolicy_(retryPolicy) , TransactionId_(transactionId) , InputSchemas_(structuredInputs.size()) @@ -44,11 +42,9 @@ TOperationPreparationContext::TOperationPreparationContext( TVector<TRichYPath> inputs, TVector<TRichYPath> outputs, const IRawClientPtr& rawClient, - const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId) : RawClient_(rawClient) - , Context_(context) , RetryPolicy_(retryPolicy) , TransactionId_(transactionId) , InputSchemas_(inputs.size()) @@ -77,17 +73,17 @@ int TOperationPreparationContext::GetOutputCount() const const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() const { TVector<::NThreading::TFuture<TNode>> schemaFutures; - NRawClient::THttpRawBatchRequest batch(Context_.Config); + auto batch = RawClient_->CreateRawBatchRequest(); for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) { if (InputSchemasLoaded_[tableIndex]) { schemaFutures.emplace_back(); continue; } Y_ABORT_UNLESS(Inputs_[tableIndex]); - schemaFutures.push_back(batch.Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{})); + schemaFutures.push_back(batch->Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{})); } - batch.ExecuteBatch(RetryPolicy_->CreatePolicyForGenericRequest(), Context_); + batch->ExecuteBatch(); for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) { if (schemaFutures[tableIndex].Initialized()) { diff --git a/yt/cpp/mapreduce/client/prepare_operation.h b/yt/cpp/mapreduce/client/prepare_operation.h index 3fde1d1678..7b34703e89 100644 --- a/yt/cpp/mapreduce/client/prepare_operation.h +++ b/yt/cpp/mapreduce/client/prepare_operation.h @@ -16,7 +16,6 @@ public: const TStructuredJobTableList& structuredInputs, const TStructuredJobTableList& structuredOutputs, const IRawClientPtr& rawClient, - const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId); @@ -24,7 +23,6 @@ public: TVector<TRichYPath> inputs, TVector<TRichYPath> outputs, const IRawClientPtr& rawClient, - const TClientContext& context, const IClientRetryPolicyPtr& retryPolicy, TTransactionId transactionId); @@ -42,7 +40,6 @@ private: TVector<TMaybe<TRichYPath>> Outputs_; const IRawClientPtr RawClient_; - const TClientContext& Context_; const IClientRetryPolicyPtr RetryPolicy_; TTransactionId TransactionId_; diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp index 499a1308fd..50ef7793ba 100644 --- a/yt/cpp/mapreduce/client/skiff.cpp +++ b/yt/cpp/mapreduce/client/skiff.cpp @@ -307,7 +307,7 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( clientRetryPolicy->CreatePolicyForGenericRequest(), context, NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths), - [&] (THttpRawBatchRequest& batch, const TRichYPath& path) { + [&] (IRawBatchRequest& batch, const TRichYPath& path) { auto getOptions = TGetOptions() .AttributeFilter( TAttributeFilter() diff --git a/yt/cpp/mapreduce/client/yt_poller.cpp b/yt/cpp/mapreduce/client/yt_poller.cpp index 13670fddb5..4980d287fa 100644 --- a/yt/cpp/mapreduce/client/yt_poller.cpp +++ b/yt/cpp/mapreduce/client/yt_poller.cpp @@ -92,14 +92,14 @@ void TYtPoller::WatchLoop() Y_ABORT_UNLESS(!InProgress_.empty()); } - THttpRawBatchRequest rawBatchRequest(Context_.Config); + THttpRawBatchRequest rawBatchRequest(Context_, ClientRetryPolicy_->CreatePolicyForGenericRequest()); for (auto& item : InProgress_) { item->PrepareRequest(&rawBatchRequest); } try { - rawBatchRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_); + rawBatchRequest.ExecuteBatch(); } catch (const std::exception& ex) { YT_LOG_ERROR("Exception while executing batch request: %v", ex.what()); } diff --git a/yt/cpp/mapreduce/client/yt_poller.h b/yt/cpp/mapreduce/client/yt_poller.h index fe9979a6a9..c2e6809324 100644 --- a/yt/cpp/mapreduce/client/yt_poller.h +++ b/yt/cpp/mapreduce/client/yt_poller.h @@ -15,10 +15,6 @@ namespace NYT { namespace NDetail { -namespace NRawClient { - class THttpRawBatchRequest; -} - //////////////////////////////////////////////////////////////////////////////// class IYtPollerItem @@ -33,7 +29,7 @@ public: public: virtual ~IYtPollerItem() = default; - virtual void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) = 0; + virtual void PrepareRequest(IRawBatchRequest* batchRequest) = 0; // Should return PollContinue if poller should continue polling this item. // Should return PollBreak if poller should stop polling this item. |