aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/common
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
committermax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/common
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
downloadydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/common')
-rw-r--r--yt/cpp/mapreduce/common/debug_metrics.cpp62
-rw-r--r--yt/cpp/mapreduce/common/debug_metrics.h22
-rw-r--r--yt/cpp/mapreduce/common/fwd.h11
-rw-r--r--yt/cpp/mapreduce/common/helpers.cpp126
-rw-r--r--yt/cpp/mapreduce/common/helpers.h37
-rw-r--r--yt/cpp/mapreduce/common/node_builder.h4
-rw-r--r--yt/cpp/mapreduce/common/node_visitor.h4
-rw-r--r--yt/cpp/mapreduce/common/retry_lib.cpp267
-rw-r--r--yt/cpp/mapreduce/common/retry_lib.h100
-rw-r--r--yt/cpp/mapreduce/common/wait_proxy.cpp118
-rw-r--r--yt/cpp/mapreduce/common/wait_proxy.h53
-rw-r--r--yt/cpp/mapreduce/common/ya.make23
12 files changed, 827 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/common/debug_metrics.cpp b/yt/cpp/mapreduce/common/debug_metrics.cpp
new file mode 100644
index 0000000000..6235e55f7e
--- /dev/null
+++ b/yt/cpp/mapreduce/common/debug_metrics.cpp
@@ -0,0 +1,62 @@
+#include "debug_metrics.h"
+
+#include <util/generic/hash.h>
+#include <util/generic/singleton.h>
+
+#include <util/string/cast.h>
+#include <util/system/mutex.h>
+
+namespace NYT {
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDebugMetrics {
+public:
+ static TDebugMetrics& Get()
+ {
+ return *Singleton<TDebugMetrics>();
+ }
+
+ void Inc(TStringBuf name)
+ {
+ auto g = Guard(Lock_);
+ auto it = Metrics_.find(name);
+ if (it == Metrics_.end()) {
+ it = Metrics_.emplace(ToString(name), 0).first;
+ }
+ ++it->second;
+ }
+
+ ui64 Get(TStringBuf name) const
+ {
+ auto g = Guard(Lock_);
+ auto it = Metrics_.find(name);
+ if (it == Metrics_.end()) {
+ return 0;
+ } else {
+ return it->second;
+ }
+ }
+
+private:
+ TMutex Lock_;
+ THashMap<TString, ui64> Metrics_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+void IncDebugMetricImpl(TStringBuf name)
+{
+ TDebugMetrics::Get().Inc(name);
+}
+
+ui64 GetDebugMetric(TStringBuf name)
+{
+ return TDebugMetrics::Get().Get(name);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/common/debug_metrics.h b/yt/cpp/mapreduce/common/debug_metrics.h
new file mode 100644
index 0000000000..6ebbc89f72
--- /dev/null
+++ b/yt/cpp/mapreduce/common/debug_metrics.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/config.h>
+
+#include <util/generic/strbuf.h>
+
+namespace NYT {
+namespace NDetail {
+
+void IncDebugMetricImpl(TStringBuf name);
+
+// Helper functions that allows to track various events inside YT library, useful for testing.
+inline void IncDebugMetric(TStringBuf name)
+{
+ if (TConfig::Get()->EnableDebugMetrics) {
+ IncDebugMetricImpl(name);
+ }
+}
+ui64 GetDebugMetric(TStringBuf name);
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/common/fwd.h b/yt/cpp/mapreduce/common/fwd.h
new file mode 100644
index 0000000000..a195e727be
--- /dev/null
+++ b/yt/cpp/mapreduce/common/fwd.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+
+namespace NYT {
+ class IRequestRetryPolicy;
+ using IRequestRetryPolicyPtr = ::TIntrusivePtr<IRequestRetryPolicy>;
+
+ class IClientRetryPolicy;
+ using IClientRetryPolicyPtr = ::TIntrusivePtr<IClientRetryPolicy>;
+}
diff --git a/yt/cpp/mapreduce/common/helpers.cpp b/yt/cpp/mapreduce/common/helpers.cpp
new file mode 100644
index 0000000000..95924d812c
--- /dev/null
+++ b/yt/cpp/mapreduce/common/helpers.cpp
@@ -0,0 +1,126 @@
+#include "helpers.h"
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/serialize.h>
+#include <yt/cpp/mapreduce/interface/fluent.h>
+
+#include <library/cpp/yson/node/node_builder.h>
+#include <library/cpp/yson/node/node_visitor.h>
+
+#include <library/cpp/yson/parser.h>
+#include <library/cpp/yson/writer.h>
+
+#include <library/cpp/json/json_reader.h>
+#include <library/cpp/json/json_value.h>
+
+#include <util/stream/input.h>
+#include <util/stream/output.h>
+#include <util/stream/str.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TString NodeListToYsonString(const TNode::TListType& nodes)
+{
+ TStringStream stream;
+ ::NYson::TYsonWriter writer(&stream, NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment);
+ auto list = BuildYsonListFluently(&writer);
+ for (const auto& node : nodes) {
+ list.Item().Value(node);
+ }
+ return stream.Str();
+}
+
+TNode PathToNode(const TRichYPath& path)
+{
+ TNode result;
+ TNodeBuilder builder(&result);
+ Serialize(path, &builder);
+ return result;
+}
+
+TNode PathToParamNode(const TRichYPath& path)
+{
+ return TNode()("path", PathToNode(path));
+}
+
+TString AttributesToYsonString(const TNode& node)
+{
+ return BuildYsonStringFluently().BeginMap()
+ .Item("attributes").Value(node)
+ .EndMap();
+}
+
+TString AttributeFilterToYsonString(const TAttributeFilter& filter)
+{
+ return BuildYsonStringFluently().BeginMap()
+ .Item("attributes").Value(filter)
+ .EndMap();
+}
+
+TNode NodeFromTableSchema(const TTableSchema& schema)
+{
+ TNode result;
+ TNodeBuilder builder(&result);
+ Serialize(schema, &builder);
+ return result;
+}
+
+void MergeNodes(TNode& dst, const TNode& src)
+{
+ if (dst.IsMap() && src.IsMap()) {
+ auto& dstMap = dst.AsMap();
+ const auto& srcMap = src.AsMap();
+ for (const auto& srcItem : srcMap) {
+ const auto& key = srcItem.first;
+ auto dstItem = dstMap.find(key);
+ if (dstItem != dstMap.end()) {
+ MergeNodes(dstItem->second, srcItem.second);
+ } else {
+ dstMap[key] = srcItem.second;
+ }
+ }
+ } else {
+ if (dst.GetType() == src.GetType() && src.HasAttributes()) {
+ auto attributes = dst.GetAttributes();
+ MergeNodes(attributes, src.GetAttributes());
+ dst = src;
+ dst.Attributes() = attributes;
+ } else {
+ dst = src;
+ }
+ }
+}
+
+TYPath AddPathPrefix(const TYPath& path, const TString& pathPrefix)
+{
+ if (path.StartsWith("//") || path.StartsWith("#")) {
+ return path;
+ }
+ return pathPrefix + path;
+}
+
+TString GetWriteTableCommand(const TString& apiVersion)
+{
+ return apiVersion == "v2" ? "write" : "write_table";
+}
+
+TString GetReadTableCommand(const TString& apiVersion)
+{
+ return apiVersion == "v2" ? "read" : "read_table";
+}
+
+TString GetWriteFileCommand(const TString& apiVersion)
+{
+ return apiVersion == "v2" ? "upload" : "write_file";
+}
+
+TString GetReadFileCommand(const TString& apiVersion)
+{
+ return apiVersion == "v2" ? "download" : "read_file";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/common/helpers.h b/yt/cpp/mapreduce/common/helpers.h
new file mode 100644
index 0000000000..2174ba820b
--- /dev/null
+++ b/yt/cpp/mapreduce/common/helpers.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <library/cpp/yson/node/node_io.h> // backward compatibility
+
+#include <yt/cpp/mapreduce/interface/node.h>
+#include <yt/cpp/mapreduce/interface/common.h>
+#include <library/cpp/yson/public.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TString NodeListToYsonString(const TNode::TListType& nodes);
+
+TNode PathToNode(const TRichYPath& path);
+TNode PathToParamNode(const TRichYPath& path);
+
+TString AttributesToYsonString(const TNode& attributes);
+
+TString AttributeFilterToYsonString(const TAttributeFilter& filter);
+
+TNode NodeFromTableSchema(const TTableSchema& schema);
+
+void MergeNodes(TNode& dst, const TNode& src);
+
+TYPath AddPathPrefix(const TYPath& path, const TString& pathPrefix);
+
+TString GetWriteTableCommand(const TString& apiVersion);
+TString GetReadTableCommand(const TString& apiVersion);
+TString GetWriteFileCommand(const TString& apiVersion);
+TString GetReadFileCommand(const TString& apiVersion);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/common/node_builder.h b/yt/cpp/mapreduce/common/node_builder.h
new file mode 100644
index 0000000000..c7f731cf09
--- /dev/null
+++ b/yt/cpp/mapreduce/common/node_builder.h
@@ -0,0 +1,4 @@
+#pragma once
+
+// Backward compatibility.
+#include <library/cpp/yson/node/node_builder.h>
diff --git a/yt/cpp/mapreduce/common/node_visitor.h b/yt/cpp/mapreduce/common/node_visitor.h
new file mode 100644
index 0000000000..a8bde52b5a
--- /dev/null
+++ b/yt/cpp/mapreduce/common/node_visitor.h
@@ -0,0 +1,4 @@
+#pragma once
+
+// Backward compatibility.
+#include <library/cpp/yson/node/node_visitor.h>
diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp
new file mode 100644
index 0000000000..cf2c021eb4
--- /dev/null
+++ b/yt/cpp/mapreduce/common/retry_lib.cpp
@@ -0,0 +1,267 @@
+#include "retry_lib.h"
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/error_codes.h>
+#include <yt/cpp/mapreduce/interface/retry_policy.h>
+
+#include <util/string/builder.h>
+#include <util/generic/set.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config)
+ : Config_(config)
+ , AttemptLimit_(attemptLimit)
+{ }
+
+void TAttemptLimitedRetryPolicy::NotifyNewAttempt()
+{
+ ++Attempt_;
+}
+
+TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnGenericError(const std::exception& e)
+{
+ if (IsAttemptLimitExceeded()) {
+ return Nothing();
+ }
+ return GetBackoffDuration(e, Config_);
+}
+
+TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnRetriableError(const TErrorResponse& e)
+{
+ if (IsAttemptLimitExceeded()) {
+ return Nothing();
+ }
+ return GetBackoffDuration(e, Config_);
+}
+
+void TAttemptLimitedRetryPolicy::OnIgnoredError(const TErrorResponse& /*e*/)
+{
+ --Attempt_;
+}
+
+TString TAttemptLimitedRetryPolicy::GetAttemptDescription() const
+{
+ return ::TStringBuilder() << "attempt " << Attempt_ << " of " << AttemptLimit_;
+}
+
+bool TAttemptLimitedRetryPolicy::IsAttemptLimitExceeded() const
+{
+ return Attempt_ >= AttemptLimit_;
+}
+////////////////////////////////////////////////////////////////////////////////
+
+class TTimeLimitedRetryPolicy
+ : public IRequestRetryPolicy
+{
+public:
+ TTimeLimitedRetryPolicy(IRequestRetryPolicyPtr retryPolicy, TDuration timeout)
+ : RetryPolicy_(retryPolicy)
+ , Deadline_(TInstant::Now() + timeout)
+ , Timeout_(timeout)
+ { }
+ void NotifyNewAttempt() override
+ {
+ if (TInstant::Now() >= Deadline_) {
+ ythrow TRequestRetriesTimeout() << "retry timeout exceeded (timeout: " << Timeout_ << ")";
+ }
+ RetryPolicy_->NotifyNewAttempt();
+ }
+
+ TMaybe<TDuration> OnGenericError(const std::exception& e) override
+ {
+ return RetryPolicy_->OnGenericError(e);
+ }
+
+ TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
+ {
+ return RetryPolicy_->OnRetriableError(e);
+ }
+
+ void OnIgnoredError(const TErrorResponse& e) override
+ {
+ return RetryPolicy_->OnIgnoredError(e);
+ }
+
+ TString GetAttemptDescription() const override
+ {
+ return RetryPolicy_->GetAttemptDescription();
+ }
+
+private:
+ const IRequestRetryPolicyPtr RetryPolicy_;
+ const TInstant Deadline_;
+ const TDuration Timeout_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDefaultClientRetryPolicy
+ : public IClientRetryPolicy
+{
+public:
+ explicit TDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config)
+ : RetryConfigProvider_(std::move(retryConfigProvider))
+ , Config_(config)
+ { }
+
+ IRequestRetryPolicyPtr CreatePolicyForGenericRequest() override
+ {
+ return Wrap(CreateDefaultRequestRetryPolicy(Config_));
+ }
+
+ IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() override
+ {
+ return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->StartOperationRetryCount), Config_));
+ }
+
+ IRequestRetryPolicyPtr Wrap(IRequestRetryPolicyPtr basePolicy)
+ {
+ auto config = RetryConfigProvider_->CreateRetryConfig();
+ if (config.RetriesTimeLimit < TDuration::Max()) {
+ return ::MakeIntrusive<TTimeLimitedRetryPolicy>(std::move(basePolicy), config.RetriesTimeLimit);
+ }
+ return basePolicy;
+ }
+
+private:
+ IRetryConfigProviderPtr RetryConfigProvider_;
+ const TConfigPtr Config_;
+};
+
+class TDefaultRetryConfigProvider
+ : public IRetryConfigProvider
+{
+public:
+ TRetryConfig CreateRetryConfig() override
+ {
+ return {};
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config)
+{
+ return MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(config->RetryCount), config);
+}
+
+IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config)
+{
+ return MakeIntrusive<TDefaultClientRetryPolicy>(std::move(retryConfigProvider), config);
+}
+IRetryConfigProviderPtr CreateDefaultRetryConfigProvider()
+{
+ return MakeIntrusive<TDefaultRetryConfigProvider>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static bool IsChunkError(int code)
+{
+ return code / 100 == 7;
+}
+
+// Check whether:
+// 1) codes contain at least one chunk error AND
+// 2) codes don't contain non-retriable chunk errors.
+static bool IsRetriableChunkError(const TSet<int>& codes)
+{
+ using namespace NClusterErrorCodes;
+ auto isChunkError = false;
+ for (auto code : codes) {
+ switch (code) {
+ case NChunkClient::SessionAlreadyExists:
+ case NChunkClient::ChunkAlreadyExists:
+ case NChunkClient::WindowError:
+ case NChunkClient::BlockContentMismatch:
+ case NChunkClient::InvalidBlockChecksum:
+ case NChunkClient::BlockOutOfRange:
+ case NChunkClient::MissingExtension:
+ case NChunkClient::NoSuchBlock:
+ case NChunkClient::NoSuchChunk:
+ case NChunkClient::NoSuchChunkList:
+ case NChunkClient::NoSuchChunkTree:
+ case NChunkClient::NoSuchChunkView:
+ case NChunkClient::NoSuchMedium:
+ return false;
+ default:
+ isChunkError |= IsChunkError(code);
+ break;
+ }
+ }
+ return isChunkError;
+}
+
+static TMaybe<TDuration> TryGetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config)
+{
+ int httpCode = errorResponse.GetHttpCode();
+ if (httpCode / 100 != 4 && !errorResponse.IsFromTrailers()) {
+ return config->RetryInterval;
+ }
+
+ auto allCodes = errorResponse.GetError().GetAllErrorCodes();
+ using namespace NClusterErrorCodes;
+ if (httpCode == 429
+ || allCodes.count(NSecurityClient::RequestQueueSizeLimitExceeded)
+ || allCodes.count(NRpc::RequestQueueSizeLimitExceeded))
+ {
+ // request rate limit exceeded
+ return config->RateLimitExceededRetryInterval;
+ }
+ if (errorResponse.IsConcurrentOperationsLimitReached()) {
+ // limit for the number of concurrent operations exceeded
+ return config->StartOperationRetryInterval;
+ }
+ if (IsRetriableChunkError(allCodes)) {
+ // chunk client errors
+ return config->ChunkErrorsRetryInterval;
+ }
+ for (auto code : TVector<int>{
+ NRpc::TransportError,
+ NRpc::Unavailable,
+ NApi::RetriableArchiveError,
+ Canceled,
+ }) {
+ if (allCodes.contains(code)) {
+ return config->RetryInterval;
+ }
+ }
+ return Nothing();
+}
+
+TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config)
+{
+ return TryGetBackoffDuration(errorResponse, config).GetOrElse(config->RetryInterval);
+}
+
+bool IsRetriable(const TErrorResponse& errorResponse)
+{
+ // Retriability of an error doesn't depend on config, so just use global one.
+ return TryGetBackoffDuration(errorResponse, TConfig::Get()).Defined();
+}
+
+bool IsRetriable(const std::exception& ex)
+{
+ if (dynamic_cast<const TRequestRetriesTimeout*>(&ex)) {
+ return false;
+ }
+ return true;
+}
+
+TDuration GetBackoffDuration(const std::exception& /*error*/, const TConfigPtr& config)
+{
+ return GetBackoffDuration(config);
+}
+
+TDuration GetBackoffDuration(const TConfigPtr& config)
+{
+ return config->RetryInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/common/retry_lib.h b/yt/cpp/mapreduce/common/retry_lib.h
new file mode 100644
index 0000000000..c6c061f614
--- /dev/null
+++ b/yt/cpp/mapreduce/common/retry_lib.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <yt/cpp/mapreduce/interface/fwd.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// IRequestRetryPolicy class controls retries of single request.
+class IRequestRetryPolicy
+ : public virtual TThrRefBase
+{
+public:
+ // Helper function that returns text description of current attempt, e.g.
+ // "attempt 3 / 10"
+ // used in logs.
+ virtual TString GetAttemptDescription() const = 0;
+
+ // Library code calls this function before any request attempt.
+ virtual void NotifyNewAttempt() = 0;
+
+ // OnRetriableError is called whenever client gets YT error that can be retried (e.g. operation limit exceeded).
+ // OnGenericError is called whenever request failed due to generic error like network error.
+ //
+ // Both methods must return nothing if policy doesn't want to retry this error.
+ // Otherwise method should return backoff time.
+ virtual TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) = 0;
+ virtual TMaybe<TDuration> OnGenericError(const std::exception& e) = 0;
+
+ // OnIgnoredError is called whenever client gets an error but is going to ignore it.
+ virtual void OnIgnoredError(const TErrorResponse& /*e*/) = 0;
+};
+using IRequestRetryPolicyPtr = ::TIntrusivePtr<IRequestRetryPolicy>;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// IClientRetryPolicy controls creation of policies for individual requests.
+class IClientRetryPolicy
+ : public virtual TThrRefBase
+{
+public:
+ virtual IRequestRetryPolicyPtr CreatePolicyForGenericRequest() = 0;
+ virtual IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() = 0;
+};
+
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAttemptLimitedRetryPolicy
+ : public IRequestRetryPolicy
+{
+public:
+ explicit TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config);
+
+ void NotifyNewAttempt() override;
+
+ TMaybe<TDuration> OnGenericError(const std::exception& e) override;
+ TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override;
+ void OnIgnoredError(const TErrorResponse& e) override;
+ TString GetAttemptDescription() const override;
+
+ bool IsAttemptLimitExceeded() const;
+
+protected:
+ const TConfigPtr Config_;
+
+private:
+ const ui32 AttemptLimit_;
+ ui32 Attempt_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config);
+IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config);
+IRetryConfigProviderPtr CreateDefaultRetryConfigProvider();
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Check if error returned by YT can be retried
+bool IsRetriable(const TErrorResponse& errorResponse);
+bool IsRetriable(const std::exception& ex);
+
+// Get backoff duration for errors returned by YT.
+TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config);
+
+// Get backoff duration for errors that are not TErrorResponse.
+TDuration GetBackoffDuration(const std::exception& error, const TConfigPtr& config);
+TDuration GetBackoffDuration(const TConfigPtr& config);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/common/wait_proxy.cpp b/yt/cpp/mapreduce/common/wait_proxy.cpp
new file mode 100644
index 0000000000..3db034a098
--- /dev/null
+++ b/yt/cpp/mapreduce/common/wait_proxy.cpp
@@ -0,0 +1,118 @@
+#include "wait_proxy.h"
+
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/system/event.h>
+#include <util/system/condvar.h>
+
+namespace NYT {
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+bool TDefaultWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TDuration timeout)
+{
+ return future.Wait(timeout);
+}
+
+bool TDefaultWaitProxy::WaitEvent(TSystemEvent& event, TDuration timeout)
+{
+ return event.WaitT(timeout);
+}
+
+bool TDefaultWaitProxy::WaitCondVar(TCondVar &condVar, TMutex &mutex, TDuration timeout)
+{
+ return condVar.WaitT(mutex, timeout);
+}
+
+void TDefaultWaitProxy::Sleep(TDuration timeout)
+{
+ ::Sleep(timeout);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TWaitProxy::TWaitProxy()
+ : Proxy_(::MakeIntrusive<TDefaultWaitProxy>())
+{ }
+
+TWaitProxy* TWaitProxy::Get()
+{
+ return Singleton<TWaitProxy>();
+}
+
+void TWaitProxy::SetProxy(::TIntrusivePtr<IWaitProxy> proxy)
+{
+ Proxy_ = std::move(proxy);
+}
+
+bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future)
+{
+ return Proxy_->WaitFuture(future, TDuration::Max());
+}
+
+bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TInstant deadLine)
+{
+ return Proxy_->WaitFuture(future, deadLine - TInstant::Now());
+}
+
+bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TDuration timeout)
+{
+ return Proxy_->WaitFuture(future, timeout);
+}
+
+bool TWaitProxy::WaitEventD(TSystemEvent& event, TInstant deadLine)
+{
+ return Proxy_->WaitEvent(event, deadLine - TInstant::Now());
+}
+
+bool TWaitProxy::WaitEventT(TSystemEvent& event, TDuration timeout)
+{
+ return Proxy_->WaitEvent(event, timeout);
+}
+
+void TWaitProxy::WaitEventI(TSystemEvent& event)
+{
+ Proxy_->WaitEvent(event, TDuration::Max());
+}
+
+bool TWaitProxy::WaitEvent(TSystemEvent& event)
+{
+ return Proxy_->WaitEvent(event, TDuration::Max());
+}
+
+bool TWaitProxy::WaitCondVarD(TCondVar& condVar, TMutex& m, TInstant deadLine)
+{
+ return Proxy_->WaitCondVar(condVar, m, deadLine - TInstant::Now());
+}
+
+bool TWaitProxy::WaitCondVarT(TCondVar& condVar, TMutex& m, TDuration timeOut)
+{
+ return Proxy_->WaitCondVar(condVar, m, timeOut);
+}
+
+void TWaitProxy::WaitCondVarI(TCondVar& condVar, TMutex& m)
+{
+ Proxy_->WaitCondVar(condVar, m, TDuration::Max());
+}
+
+void TWaitProxy::WaitCondVar(TCondVar& condVar, TMutex& m)
+{
+ Proxy_->WaitCondVar(condVar, m, TDuration::Max());
+}
+
+void TWaitProxy::Sleep(TDuration timeout)
+{
+ Proxy_->Sleep(timeout);
+}
+
+void TWaitProxy::SleepUntil(TInstant instant)
+{
+ Proxy_->Sleep(instant - TInstant::Now());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/common/wait_proxy.h b/yt/cpp/mapreduce/common/wait_proxy.h
new file mode 100644
index 0000000000..e7c944cf24
--- /dev/null
+++ b/yt/cpp/mapreduce/common/wait_proxy.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/wait_proxy.h>
+
+namespace NYT {
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDefaultWaitProxy
+ : public IWaitProxy
+{
+public:
+ bool WaitFuture(const ::NThreading::TFuture<void>& future, TDuration timeout) override;
+ bool WaitEvent(TSystemEvent& event, TDuration timeout) override;
+ bool WaitCondVar(TCondVar& condVar, TMutex& mutex, TDuration timeout) override;
+ void Sleep(TDuration timeout) override;
+};
+
+class TWaitProxy {
+public:
+ TWaitProxy();
+
+ static TWaitProxy* Get();
+
+ // NB: Non thread-safe, should be called only in initialization code.
+ void SetProxy(::TIntrusivePtr<IWaitProxy> proxy);
+
+ bool WaitFuture(const ::NThreading::TFuture<void>& future);
+ bool WaitFuture(const ::NThreading::TFuture<void>& future, TInstant deadLine);
+ bool WaitFuture(const ::NThreading::TFuture<void>& future, TDuration timeout);
+
+ bool WaitEventD(TSystemEvent& event, TInstant deadLine);
+ bool WaitEventT(TSystemEvent& event, TDuration timeout);
+ void WaitEventI(TSystemEvent& event);
+ bool WaitEvent(TSystemEvent& event);
+
+ bool WaitCondVarD(TCondVar& condVar, TMutex& m, TInstant deadLine);
+ bool WaitCondVarT(TCondVar& condVar, TMutex& m, TDuration timeOut);
+ void WaitCondVarI(TCondVar& condVar, TMutex& m);
+ void WaitCondVar(TCondVar& condVar, TMutex& m);
+
+ void Sleep(TDuration timeout);
+ void SleepUntil(TInstant instant);
+
+private:
+ ::TIntrusivePtr<IWaitProxy> Proxy_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/common/ya.make b/yt/cpp/mapreduce/common/ya.make
new file mode 100644
index 0000000000..004708cb44
--- /dev/null
+++ b/yt/cpp/mapreduce/common/ya.make
@@ -0,0 +1,23 @@
+LIBRARY()
+
+INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+
+SRCS(
+ debug_metrics.cpp
+ helpers.cpp
+ retry_lib.cpp
+ wait_proxy.cpp
+)
+
+PEERDIR(
+ library/cpp/json
+ library/cpp/svnversion
+ library/cpp/threading/future
+ library/cpp/yson
+ library/cpp/yson/json
+ library/cpp/yson/node
+ yt/cpp/mapreduce/interface
+ yt/cpp/mapreduce/interface/logging
+)
+
+END()