summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation_helpers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'yt/cpp/mapreduce/client/operation_helpers.cpp')
-rw-r--r--yt/cpp/mapreduce/client/operation_helpers.cpp41
1 files changed, 27 insertions, 14 deletions
diff --git a/yt/cpp/mapreduce/client/operation_helpers.cpp b/yt/cpp/mapreduce/client/operation_helpers.cpp
index abb21856622..f307efaffea 100644
--- a/yt/cpp/mapreduce/client/operation_helpers.cpp
+++ b/yt/cpp/mapreduce/client/operation_helpers.cpp
@@ -2,15 +2,17 @@
#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/requests.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
-#include <yt/cpp/mapreduce/http/context.h>
-#include <yt/cpp/mapreduce/http/requests.h>
-
#include <util/string/builder.h>
#include <util/system/mutex.h>
@@ -26,7 +28,10 @@ ui64 RoundUpFileSize(ui64 size)
return (size + roundUpTo - 1) & ~(roundUpTo - 1);
}
-bool UseLocalModeOptimization(const TClientContext& context, const IClientRetryPolicyPtr& clientRetryPolicy)
+bool UseLocalModeOptimization(
+ const IRawClientPtr& rawClient,
+ const TClientContext& context,
+ const IClientRetryPolicyPtr& clientRetryPolicy)
{
if (!context.Config->EnableLocalModeOptimization) {
return false;
@@ -47,18 +52,26 @@ bool UseLocalModeOptimization(const TClientContext& context, const IClientRetryP
TString localModeAttr("//sys/@local_mode_fqdn");
// We don't want to pollute logs with errors about failed request,
// so we check if path exists before getting it.
- if (NRawClient::Exists(clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- TTransactionId(),
- localModeAttr,
- TExistsOptions().ReadFrom(EMasterReadKind::Cache)))
+ auto exists = RequestWithRetry<bool>(
+ clientRetryPolicy->CreatePolicyForGenericRequest(),
+ [&rawClient, &localModeAttr] (TMutationId& mutationId) {
+ return rawClient->Exists(
+ mutationId,
+ TTransactionId(),
+ localModeAttr,
+ TExistsOptions().ReadFrom(EMasterReadKind::Cache));
+ });
+ if (exists)
{
- auto fqdnNode = NRawClient::TryGet(
+ auto fqdnNode = RequestWithRetry<TNode>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- TTransactionId(),
- localModeAttr,
- TGetOptions().ReadFrom(EMasterReadKind::Cache));
+ [&rawClient, &localModeAttr] (TMutationId& mutationId) {
+ return rawClient->TryGet(
+ mutationId,
+ TTransactionId(),
+ localModeAttr,
+ TGetOptions().ReadFrom(EMasterReadKind::Cache));
+ });
if (!fqdnNode.IsUndefined()) {
auto fqdn = fqdnNode.AsString();
isLocalMode = (fqdn == TProcessState::Get()->FqdnHostName);