aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2025-01-15 20:17:09 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2025-01-15 21:21:14 +0300
commitbb37d56338bace93813adc51a61936a19acc8edc (patch)
tree3d771660421001c994e2baf9a40b409d06c7a24b /yt/cpp/mapreduce/client
parent9a04c0af35c6a47061b3699089da07acfcf14587 (diff)
downloadydb-bb37d56338bace93813adc51a61936a19acc8edc.tar.gz
YT-23616: Make THttpRawBatchRequest as an implementation of IRawBatchRequest interface
commit_hash:9e6c556686dda1562697762d38da532dc5c87b80
Diffstat (limited to 'yt/cpp/mapreduce/client')
-rw-r--r--yt/cpp/mapreduce/client/batch_request_impl.cpp6
-rw-r--r--yt/cpp/mapreduce/client/batch_request_impl.h8
-rw-r--r--yt/cpp/mapreduce/client/lock.cpp2
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp9
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp14
-rw-r--r--yt/cpp/mapreduce/client/prepare_operation.cpp10
-rw-r--r--yt/cpp/mapreduce/client/prepare_operation.h3
-rw-r--r--yt/cpp/mapreduce/client/skiff.cpp2
-rw-r--r--yt/cpp/mapreduce/client/yt_poller.cpp4
-rw-r--r--yt/cpp/mapreduce/client/yt_poller.h6
10 files changed, 22 insertions, 42 deletions
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.cpp b/yt/cpp/mapreduce/client/batch_request_impl.cpp
index f45eee319b..e7456b9187 100644
--- a/yt/cpp/mapreduce/client/batch_request_impl.cpp
+++ b/yt/cpp/mapreduce/client/batch_request_impl.cpp
@@ -36,11 +36,11 @@ using ::NThreading::NewPromise;
TBatchRequest::TBatchRequest(const TTransactionId& defaultTransaction, ::TIntrusivePtr<TClient> client)
: DefaultTransaction_(defaultTransaction)
- , Impl_(MakeIntrusive<THttpRawBatchRequest>(client->GetContext().Config))
+ , Impl_(client->GetRawClient()->CreateRawBatchRequest())
, Client_(client)
{ }
-TBatchRequest::TBatchRequest(THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client)
+TBatchRequest::TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client)
: Impl_(impl)
, Client_(std::move(client))
{ }
@@ -189,7 +189,7 @@ TFuture<TCheckPermissionResponse> TBatchRequest::CheckPermission(
void TBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
{
- Impl_->ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), options);
+ Impl_->ExecuteBatch(options);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.h b/yt/cpp/mapreduce/client/batch_request_impl.h
index d53eb196dd..bbd993a925 100644
--- a/yt/cpp/mapreduce/client/batch_request_impl.h
+++ b/yt/cpp/mapreduce/client/batch_request_impl.h
@@ -22,10 +22,6 @@ struct TResponseInfo;
class TClient;
using TClientPtr = ::TIntrusivePtr<TClient>;
-namespace NRawClient {
- class THttpRawBatchRequest;
-}
-
////////////////////////////////////////////////////////////////////////////////
class TBatchRequest
@@ -119,11 +115,11 @@ public:
virtual void ExecuteBatch(const TExecuteBatchOptions& executeBatch) override;
private:
- TBatchRequest(NDetail::NRawClient::THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client);
+ TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client);
private:
TTransactionId DefaultTransaction_;
- ::TIntrusivePtr<NDetail::NRawClient::THttpRawBatchRequest> Impl_;
+ IRawBatchRequestPtr Impl_;
std::unique_ptr<TBatchRequest> TmpWithTransaction_;
::TIntrusivePtr<TClient> Client_;
diff --git a/yt/cpp/mapreduce/client/lock.cpp b/yt/cpp/mapreduce/client/lock.cpp
index 46306fa737..4eecfdb0d5 100644
--- a/yt/cpp/mapreduce/client/lock.cpp
+++ b/yt/cpp/mapreduce/client/lock.cpp
@@ -26,7 +26,7 @@ public:
, Acquired_(acquired)
{ }
- void PrepareRequest(THttpRawBatchRequest* batchRequest) override
+ void PrepareRequest(IRawBatchRequest* batchRequest) override
{
LockState_ = batchRequest->Get(TTransactionId(), LockStateYPath_, TGetOptions());
}
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 8ace8a44bb..5caf302358 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -228,7 +228,7 @@ TStructuredJobTableList ApplyProtobufColumnFilters(
CreateDefaultRequestRetryPolicy(preparer.GetContext().Config),
preparer.GetContext(),
tableList,
- [&] (NRawClient::THttpRawBatchRequest& batch, const auto& table) {
+ [&] (IRawBatchRequest& batch, const auto& table) {
return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
});
@@ -318,7 +318,6 @@ TSimpleOperationIo CreateSimpleOperationIo(
inputs,
outputs,
preparer.GetClient()->GetRawClient(),
- preparer.GetContext(),
preparer.GetClientRetryPolicy(),
preparer.GetTransactionId()),
&inputs,
@@ -497,7 +496,6 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
structuredInputs,
structuredOutputs,
preparer.GetClient()->GetRawClient(),
- preparer.GetContext(),
preparer.GetClientRetryPolicy(),
preparer.GetTransactionId()),
&structuredInputs,
@@ -1688,7 +1686,6 @@ void ExecuteMapReduce(
structuredInputs,
mapperOutput,
preparer->GetClient()->GetRawClient(),
- preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
&structuredInputs,
@@ -1758,7 +1755,6 @@ void ExecuteMapReduce(
inputs,
outputs,
preparer->GetClient()->GetRawClient(),
- preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
&inputs,
@@ -1824,7 +1820,6 @@ void ExecuteMapReduce(
structuredInputs,
structuredOutputs,
preparer->GetClient()->GetRawClient(),
- preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
&structuredInputs,
@@ -2343,7 +2338,7 @@ public:
: OperationImpl_(std::move(operationImpl))
{ }
- void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override
+ void PrepareRequest(IRawBatchRequest* batchRequest) override
{
auto filter = TOperationAttributeFilter()
.Add(EOperationAttribute::State)
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index ac6ea0f923..1e5f81b5d9 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -42,7 +42,7 @@ public:
, Transaction_(std::move(transaction))
{ }
- void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override
+ void PrepareRequest(IRawBatchRequest* batchRequest) override
{
Future_ = batchRequest->GetOperation(
OperationId_,
@@ -213,26 +213,26 @@ void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
TVector<::NThreading::TFuture<TLockId>> lockIdFutures;
lockIdFutures.reserve(paths->size());
- NRawClient::THttpRawBatchRequest lockRequest(GetContext().Config);
+ auto lockRequest = Client_->GetRawClient()->CreateRawBatchRequest();
for (const auto& path : *paths) {
- lockIdFutures.push_back(lockRequest.Lock(
+ lockIdFutures.push_back(lockRequest->Lock(
FileTransaction_->GetId(),
path.Path_,
ELockMode::LM_SNAPSHOT,
TLockOptions().Waitable(true)));
}
- lockRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext());
+ lockRequest->ExecuteBatch();
TVector<::NThreading::TFuture<TNode>> nodeIdFutures;
nodeIdFutures.reserve(paths->size());
- NRawClient::THttpRawBatchRequest getNodeIdRequest(GetContext().Config);
+ auto getNodeIdRequest = Client_->GetRawClient()->CreateRawBatchRequest();
for (const auto& lockIdFuture : lockIdFutures) {
- nodeIdFutures.push_back(getNodeIdRequest.Get(
+ nodeIdFutures.push_back(getNodeIdRequest->Get(
FileTransaction_->GetId(),
::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
TGetOptions()));
}
- getNodeIdRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext());
+ getNodeIdRequest->ExecuteBatch();
for (size_t i = 0; i != paths->size(); ++i) {
auto& richPath = (*paths)[i];
diff --git a/yt/cpp/mapreduce/client/prepare_operation.cpp b/yt/cpp/mapreduce/client/prepare_operation.cpp
index 83695ef1e8..31574d7e0e 100644
--- a/yt/cpp/mapreduce/client/prepare_operation.cpp
+++ b/yt/cpp/mapreduce/client/prepare_operation.cpp
@@ -20,11 +20,9 @@ TOperationPreparationContext::TOperationPreparationContext(
const TStructuredJobTableList& structuredInputs,
const TStructuredJobTableList& structuredOutputs,
const IRawClientPtr& rawClient,
- const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId)
: RawClient_(rawClient)
- , Context_(context)
, RetryPolicy_(retryPolicy)
, TransactionId_(transactionId)
, InputSchemas_(structuredInputs.size())
@@ -44,11 +42,9 @@ TOperationPreparationContext::TOperationPreparationContext(
TVector<TRichYPath> inputs,
TVector<TRichYPath> outputs,
const IRawClientPtr& rawClient,
- const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId)
: RawClient_(rawClient)
- , Context_(context)
, RetryPolicy_(retryPolicy)
, TransactionId_(transactionId)
, InputSchemas_(inputs.size())
@@ -77,17 +73,17 @@ int TOperationPreparationContext::GetOutputCount() const
const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() const
{
TVector<::NThreading::TFuture<TNode>> schemaFutures;
- NRawClient::THttpRawBatchRequest batch(Context_.Config);
+ auto batch = RawClient_->CreateRawBatchRequest();
for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
if (InputSchemasLoaded_[tableIndex]) {
schemaFutures.emplace_back();
continue;
}
Y_ABORT_UNLESS(Inputs_[tableIndex]);
- schemaFutures.push_back(batch.Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{}));
+ schemaFutures.push_back(batch->Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{}));
}
- batch.ExecuteBatch(RetryPolicy_->CreatePolicyForGenericRequest(), Context_);
+ batch->ExecuteBatch();
for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
if (schemaFutures[tableIndex].Initialized()) {
diff --git a/yt/cpp/mapreduce/client/prepare_operation.h b/yt/cpp/mapreduce/client/prepare_operation.h
index 3fde1d1678..7b34703e89 100644
--- a/yt/cpp/mapreduce/client/prepare_operation.h
+++ b/yt/cpp/mapreduce/client/prepare_operation.h
@@ -16,7 +16,6 @@ public:
const TStructuredJobTableList& structuredInputs,
const TStructuredJobTableList& structuredOutputs,
const IRawClientPtr& rawClient,
- const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId);
@@ -24,7 +23,6 @@ public:
TVector<TRichYPath> inputs,
TVector<TRichYPath> outputs,
const IRawClientPtr& rawClient,
- const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId);
@@ -42,7 +40,6 @@ private:
TVector<TMaybe<TRichYPath>> Outputs_;
const IRawClientPtr RawClient_;
- const TClientContext& Context_;
const IClientRetryPolicyPtr RetryPolicy_;
TTransactionId TransactionId_;
diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp
index 499a1308fd..50ef7793ba 100644
--- a/yt/cpp/mapreduce/client/skiff.cpp
+++ b/yt/cpp/mapreduce/client/skiff.cpp
@@ -307,7 +307,7 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
clientRetryPolicy->CreatePolicyForGenericRequest(),
context,
NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
- [&] (THttpRawBatchRequest& batch, const TRichYPath& path) {
+ [&] (IRawBatchRequest& batch, const TRichYPath& path) {
auto getOptions = TGetOptions()
.AttributeFilter(
TAttributeFilter()
diff --git a/yt/cpp/mapreduce/client/yt_poller.cpp b/yt/cpp/mapreduce/client/yt_poller.cpp
index 13670fddb5..4980d287fa 100644
--- a/yt/cpp/mapreduce/client/yt_poller.cpp
+++ b/yt/cpp/mapreduce/client/yt_poller.cpp
@@ -92,14 +92,14 @@ void TYtPoller::WatchLoop()
Y_ABORT_UNLESS(!InProgress_.empty());
}
- THttpRawBatchRequest rawBatchRequest(Context_.Config);
+ THttpRawBatchRequest rawBatchRequest(Context_, ClientRetryPolicy_->CreatePolicyForGenericRequest());
for (auto& item : InProgress_) {
item->PrepareRequest(&rawBatchRequest);
}
try {
- rawBatchRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_);
+ rawBatchRequest.ExecuteBatch();
} catch (const std::exception& ex) {
YT_LOG_ERROR("Exception while executing batch request: %v", ex.what());
}
diff --git a/yt/cpp/mapreduce/client/yt_poller.h b/yt/cpp/mapreduce/client/yt_poller.h
index fe9979a6a9..c2e6809324 100644
--- a/yt/cpp/mapreduce/client/yt_poller.h
+++ b/yt/cpp/mapreduce/client/yt_poller.h
@@ -15,10 +15,6 @@
namespace NYT {
namespace NDetail {
-namespace NRawClient {
- class THttpRawBatchRequest;
-}
-
////////////////////////////////////////////////////////////////////////////////
class IYtPollerItem
@@ -33,7 +29,7 @@ public:
public:
virtual ~IYtPollerItem() = default;
- virtual void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) = 0;
+ virtual void PrepareRequest(IRawBatchRequest* batchRequest) = 0;
// Should return PollContinue if poller should continue polling this item.
// Should return PollBreak if poller should stop polling this item.