summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation_preparer.cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2024-12-13 15:22:36 +0300
committerhiddenpath <[email protected]>2024-12-13 17:04:18 +0300
commit09c88b035d29fac5fd49de2fbc3c71e2d2a80754 (patch)
treea84b5b2de4dcdf85c3b22b9cac7e984aebb8b68d /yt/cpp/mapreduce/client/operation_preparer.cpp
parent615edba542d9394b0eae47ef957ec2257549cfdd (diff)
yt/cpp/mapreduce: move Get, TryGet, Exists, MultisetAttributes to THttpRawClient
commit_hash:bd2228f98fa92de408ca850f9bc1608fdf99e7f5
Diffstat (limited to 'yt/cpp/mapreduce/client/operation_preparer.cpp')
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp33
1 files changed, 22 insertions, 11 deletions
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index a6d424c5a1d..eb7264425cf 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -142,6 +142,7 @@ TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transac
: Client_(std::move(client))
, TransactionId_(transactionId)
, FileTransaction_(MakeHolder<TPingableTransaction>(
+ Client_->GetRawClient(),
Client_->GetRetryPolicy(),
Client_->GetContext(),
TransactionId_,
@@ -609,6 +610,7 @@ TString TJobPreparer::UploadToRandomPath(const IItemToUpload& itemToUpload) cons
{
TFileWriter writer(
uniquePath,
+ OperationPreparer_.GetClient()->GetRawClient(),
OperationPreparer_.GetClientRetryPolicy(),
OperationPreparer_.GetClient()->GetTransactionPinger(),
OperationPreparer_.GetContext(),
@@ -746,23 +748,29 @@ TString TJobPreparer::UploadToCache(const IItemToUpload& itemToUpload) const
void TJobPreparer::UseFileInCypress(const TRichYPath& file)
{
- if (!Exists(
+ auto exists = RequestWithRetry<bool>(
OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- OperationPreparer_.GetContext(),
- file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
- file.Path_))
+ [this, &file] (TMutationId& mutationId) {
+ return RawClient_->Exists(
+ mutationId,
+ file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
+ file.Path_);
+ });
+ if (!exists)
{
ythrow yexception() << "File " << file.Path_ << " does not exist";
}
if (ShouldMountSandbox()) {
- auto size = Get(
+ auto size = RequestWithRetry<i64>(
OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- OperationPreparer_.GetContext(),
- file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
- file.Path_ + "/@uncompressed_data_size")
- .AsInt64();
-
+ [this, &file] (TMutationId& mutationId) {
+ return RawClient_->Get(
+ mutationId,
+ file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
+ file.Path_ + "/@uncompressed_data_size")
+ .AsInt64();
+ });
TotalFileSize_ += RoundUpFileSize(static_cast<ui64>(size));
}
CypressFiles_.push_back(file);
@@ -835,7 +843,10 @@ void TJobPreparer::UploadSmallFile(const TSmallJobFile& smallFile)
bool TJobPreparer::IsLocalMode() const
{
- return UseLocalModeOptimization(OperationPreparer_.GetContext(), OperationPreparer_.GetClientRetryPolicy());
+ return UseLocalModeOptimization(
+ OperationPreparer_.GetClient()->GetRawClient(),
+ OperationPreparer_.GetContext(),
+ OperationPreparer_.GetClientRetryPolicy());
}
void TJobPreparer::PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState)