diff options
author | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
commit | 73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch) | |
tree | 188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/common | |
parent | 528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff) | |
download | ydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz |
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build.
Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/common')
-rw-r--r-- | yt/cpp/mapreduce/common/CMakeLists.darwin-x86_64.txt | 31 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/CMakeLists.linux-aarch64.txt | 32 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/CMakeLists.linux-x86_64.txt | 32 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/CMakeLists.txt | 17 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/CMakeLists.windows-x86_64.txt | 28 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/debug_metrics.cpp | 62 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/debug_metrics.h | 22 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/fwd.h | 11 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/helpers.cpp | 126 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/helpers.h | 37 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/node_builder.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/node_visitor.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/retry_lib.cpp | 267 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/retry_lib.h | 100 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/wait_proxy.cpp | 118 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/wait_proxy.h | 53 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/ya.make | 23 |
17 files changed, 967 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/common/CMakeLists.darwin-x86_64.txt b/yt/cpp/mapreduce/common/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..5955826356c --- /dev/null +++ b/yt/cpp/mapreduce/common/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-mapreduce-common) +target_compile_options(cpp-mapreduce-common PRIVATE + -Wdeprecated-this-capture +) +target_link_libraries(cpp-mapreduce-common PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-json + library-cpp-svnversion + cpp-threading-future + library-cpp-yson + cpp-yson-json + cpp-yson-node + cpp-mapreduce-interface + mapreduce-interface-logging +) +target_sources(cpp-mapreduce-common PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/debug_metrics.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/retry_lib.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/wait_proxy.cpp +) diff --git a/yt/cpp/mapreduce/common/CMakeLists.linux-aarch64.txt b/yt/cpp/mapreduce/common/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..9b588d74bd5 --- /dev/null +++ b/yt/cpp/mapreduce/common/CMakeLists.linux-aarch64.txt @@ -0,0 +1,32 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-mapreduce-common) +target_compile_options(cpp-mapreduce-common PRIVATE + -Wdeprecated-this-capture +) +target_link_libraries(cpp-mapreduce-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-json + library-cpp-svnversion + cpp-threading-future + library-cpp-yson + cpp-yson-json + cpp-yson-node + cpp-mapreduce-interface + mapreduce-interface-logging +) +target_sources(cpp-mapreduce-common PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/debug_metrics.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/retry_lib.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/wait_proxy.cpp +) diff --git a/yt/cpp/mapreduce/common/CMakeLists.linux-x86_64.txt b/yt/cpp/mapreduce/common/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..9b588d74bd5 --- /dev/null +++ b/yt/cpp/mapreduce/common/CMakeLists.linux-x86_64.txt @@ -0,0 +1,32 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-mapreduce-common) +target_compile_options(cpp-mapreduce-common PRIVATE + -Wdeprecated-this-capture +) +target_link_libraries(cpp-mapreduce-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-json + library-cpp-svnversion + cpp-threading-future + library-cpp-yson + cpp-yson-json + cpp-yson-node + cpp-mapreduce-interface + mapreduce-interface-logging +) +target_sources(cpp-mapreduce-common PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/debug_metrics.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/retry_lib.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/wait_proxy.cpp +) diff --git a/yt/cpp/mapreduce/common/CMakeLists.txt b/yt/cpp/mapreduce/common/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/yt/cpp/mapreduce/common/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/yt/cpp/mapreduce/common/CMakeLists.windows-x86_64.txt b/yt/cpp/mapreduce/common/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..dbe9c55c3da --- /dev/null +++ b/yt/cpp/mapreduce/common/CMakeLists.windows-x86_64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-mapreduce-common) +target_link_libraries(cpp-mapreduce-common PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-json + library-cpp-svnversion + cpp-threading-future + library-cpp-yson + cpp-yson-json + cpp-yson-node + cpp-mapreduce-interface + mapreduce-interface-logging +) +target_sources(cpp-mapreduce-common PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/debug_metrics.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/retry_lib.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/common/wait_proxy.cpp +) diff --git a/yt/cpp/mapreduce/common/debug_metrics.cpp b/yt/cpp/mapreduce/common/debug_metrics.cpp new file mode 100644 index 00000000000..6235e55f7e6 --- /dev/null +++ b/yt/cpp/mapreduce/common/debug_metrics.cpp @@ -0,0 +1,62 @@ +#include "debug_metrics.h" + +#include <util/generic/hash.h> +#include <util/generic/singleton.h> + +#include <util/string/cast.h> +#include <util/system/mutex.h> + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class TDebugMetrics { +public: + static TDebugMetrics& Get() + { + return *Singleton<TDebugMetrics>(); + } + + void Inc(TStringBuf name) + { + auto g = Guard(Lock_); + auto it = Metrics_.find(name); + if (it == Metrics_.end()) { + it = Metrics_.emplace(ToString(name), 0).first; + } + ++it->second; + } + + ui64 Get(TStringBuf name) const + { + auto g = Guard(Lock_); + auto it = Metrics_.find(name); + if (it == Metrics_.end()) { + return 0; + } else { + return it->second; + } + } + +private: + TMutex Lock_; + THashMap<TString, ui64> Metrics_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +void IncDebugMetricImpl(TStringBuf name) +{ + TDebugMetrics::Get().Inc(name); +} + +ui64 GetDebugMetric(TStringBuf name) +{ + return TDebugMetrics::Get().Get(name); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/debug_metrics.h b/yt/cpp/mapreduce/common/debug_metrics.h new file mode 100644 index 00000000000..6ebbc89f72c --- /dev/null +++ b/yt/cpp/mapreduce/common/debug_metrics.h @@ -0,0 +1,22 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/config.h> + +#include <util/generic/strbuf.h> + +namespace NYT { +namespace NDetail { + +void IncDebugMetricImpl(TStringBuf name); + +// Helper functions that allows to track various events inside YT library, useful for testing. +inline void IncDebugMetric(TStringBuf name) +{ + if (TConfig::Get()->EnableDebugMetrics) { + IncDebugMetricImpl(name); + } +} +ui64 GetDebugMetric(TStringBuf name); + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/fwd.h b/yt/cpp/mapreduce/common/fwd.h new file mode 100644 index 00000000000..a195e727be4 --- /dev/null +++ b/yt/cpp/mapreduce/common/fwd.h @@ -0,0 +1,11 @@ +#pragma once + +#include <util/generic/fwd.h> + +namespace NYT { + class IRequestRetryPolicy; + using IRequestRetryPolicyPtr = ::TIntrusivePtr<IRequestRetryPolicy>; + + class IClientRetryPolicy; + using IClientRetryPolicyPtr = ::TIntrusivePtr<IClientRetryPolicy>; +} diff --git a/yt/cpp/mapreduce/common/helpers.cpp b/yt/cpp/mapreduce/common/helpers.cpp new file mode 100644 index 00000000000..95924d812c9 --- /dev/null +++ b/yt/cpp/mapreduce/common/helpers.cpp @@ -0,0 +1,126 @@ +#include "helpers.h" + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/serialize.h> +#include <yt/cpp/mapreduce/interface/fluent.h> + +#include <library/cpp/yson/node/node_builder.h> +#include <library/cpp/yson/node/node_visitor.h> + +#include <library/cpp/yson/parser.h> +#include <library/cpp/yson/writer.h> + +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_value.h> + +#include <util/stream/input.h> +#include <util/stream/output.h> +#include <util/stream/str.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TString NodeListToYsonString(const TNode::TListType& nodes) +{ + TStringStream stream; + ::NYson::TYsonWriter writer(&stream, NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment); + auto list = BuildYsonListFluently(&writer); + for (const auto& node : nodes) { + list.Item().Value(node); + } + return stream.Str(); +} + +TNode PathToNode(const TRichYPath& path) +{ + TNode result; + TNodeBuilder builder(&result); + Serialize(path, &builder); + return result; +} + +TNode PathToParamNode(const TRichYPath& path) +{ + return TNode()("path", PathToNode(path)); +} + +TString AttributesToYsonString(const TNode& node) +{ + return BuildYsonStringFluently().BeginMap() + .Item("attributes").Value(node) + .EndMap(); +} + +TString AttributeFilterToYsonString(const TAttributeFilter& filter) +{ + return BuildYsonStringFluently().BeginMap() + .Item("attributes").Value(filter) + .EndMap(); +} + +TNode NodeFromTableSchema(const TTableSchema& schema) +{ + TNode result; + TNodeBuilder builder(&result); + Serialize(schema, &builder); + return result; +} + +void MergeNodes(TNode& dst, const TNode& src) +{ + if (dst.IsMap() && src.IsMap()) { + auto& dstMap = dst.AsMap(); + const auto& srcMap = src.AsMap(); + for (const auto& srcItem : srcMap) { + const auto& key = srcItem.first; + auto dstItem = dstMap.find(key); + if (dstItem != dstMap.end()) { + MergeNodes(dstItem->second, srcItem.second); + } else { + dstMap[key] = srcItem.second; + } + } + } else { + if (dst.GetType() == src.GetType() && src.HasAttributes()) { + auto attributes = dst.GetAttributes(); + MergeNodes(attributes, src.GetAttributes()); + dst = src; + dst.Attributes() = attributes; + } else { + dst = src; + } + } +} + +TYPath AddPathPrefix(const TYPath& path, const TString& pathPrefix) +{ + if (path.StartsWith("//") || path.StartsWith("#")) { + return path; + } + return pathPrefix + path; +} + +TString GetWriteTableCommand(const TString& apiVersion) +{ + return apiVersion == "v2" ? "write" : "write_table"; +} + +TString GetReadTableCommand(const TString& apiVersion) +{ + return apiVersion == "v2" ? "read" : "read_table"; +} + +TString GetWriteFileCommand(const TString& apiVersion) +{ + return apiVersion == "v2" ? "upload" : "write_file"; +} + +TString GetReadFileCommand(const TString& apiVersion) +{ + return apiVersion == "v2" ? "download" : "read_file"; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/helpers.h b/yt/cpp/mapreduce/common/helpers.h new file mode 100644 index 00000000000..2174ba820b5 --- /dev/null +++ b/yt/cpp/mapreduce/common/helpers.h @@ -0,0 +1,37 @@ +#pragma once + +#include "fwd.h" + +#include <library/cpp/yson/node/node_io.h> // backward compatibility + +#include <yt/cpp/mapreduce/interface/node.h> +#include <yt/cpp/mapreduce/interface/common.h> +#include <library/cpp/yson/public.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TString NodeListToYsonString(const TNode::TListType& nodes); + +TNode PathToNode(const TRichYPath& path); +TNode PathToParamNode(const TRichYPath& path); + +TString AttributesToYsonString(const TNode& attributes); + +TString AttributeFilterToYsonString(const TAttributeFilter& filter); + +TNode NodeFromTableSchema(const TTableSchema& schema); + +void MergeNodes(TNode& dst, const TNode& src); + +TYPath AddPathPrefix(const TYPath& path, const TString& pathPrefix); + +TString GetWriteTableCommand(const TString& apiVersion); +TString GetReadTableCommand(const TString& apiVersion); +TString GetWriteFileCommand(const TString& apiVersion); +TString GetReadFileCommand(const TString& apiVersion); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/node_builder.h b/yt/cpp/mapreduce/common/node_builder.h new file mode 100644 index 00000000000..c7f731cf09c --- /dev/null +++ b/yt/cpp/mapreduce/common/node_builder.h @@ -0,0 +1,4 @@ +#pragma once + +// Backward compatibility. +#include <library/cpp/yson/node/node_builder.h> diff --git a/yt/cpp/mapreduce/common/node_visitor.h b/yt/cpp/mapreduce/common/node_visitor.h new file mode 100644 index 00000000000..a8bde52b5a5 --- /dev/null +++ b/yt/cpp/mapreduce/common/node_visitor.h @@ -0,0 +1,4 @@ +#pragma once + +// Backward compatibility. +#include <library/cpp/yson/node/node_visitor.h> diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp new file mode 100644 index 00000000000..cf2c021eb44 --- /dev/null +++ b/yt/cpp/mapreduce/common/retry_lib.cpp @@ -0,0 +1,267 @@ +#include "retry_lib.h" + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> +#include <yt/cpp/mapreduce/interface/retry_policy.h> + +#include <util/string/builder.h> +#include <util/generic/set.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config) + : Config_(config) + , AttemptLimit_(attemptLimit) +{ } + +void TAttemptLimitedRetryPolicy::NotifyNewAttempt() +{ + ++Attempt_; +} + +TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnGenericError(const std::exception& e) +{ + if (IsAttemptLimitExceeded()) { + return Nothing(); + } + return GetBackoffDuration(e, Config_); +} + +TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnRetriableError(const TErrorResponse& e) +{ + if (IsAttemptLimitExceeded()) { + return Nothing(); + } + return GetBackoffDuration(e, Config_); +} + +void TAttemptLimitedRetryPolicy::OnIgnoredError(const TErrorResponse& /*e*/) +{ + --Attempt_; +} + +TString TAttemptLimitedRetryPolicy::GetAttemptDescription() const +{ + return ::TStringBuilder() << "attempt " << Attempt_ << " of " << AttemptLimit_; +} + +bool TAttemptLimitedRetryPolicy::IsAttemptLimitExceeded() const +{ + return Attempt_ >= AttemptLimit_; +} +//////////////////////////////////////////////////////////////////////////////// + +class TTimeLimitedRetryPolicy + : public IRequestRetryPolicy +{ +public: + TTimeLimitedRetryPolicy(IRequestRetryPolicyPtr retryPolicy, TDuration timeout) + : RetryPolicy_(retryPolicy) + , Deadline_(TInstant::Now() + timeout) + , Timeout_(timeout) + { } + void NotifyNewAttempt() override + { + if (TInstant::Now() >= Deadline_) { + ythrow TRequestRetriesTimeout() << "retry timeout exceeded (timeout: " << Timeout_ << ")"; + } + RetryPolicy_->NotifyNewAttempt(); + } + + TMaybe<TDuration> OnGenericError(const std::exception& e) override + { + return RetryPolicy_->OnGenericError(e); + } + + TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override + { + return RetryPolicy_->OnRetriableError(e); + } + + void OnIgnoredError(const TErrorResponse& e) override + { + return RetryPolicy_->OnIgnoredError(e); + } + + TString GetAttemptDescription() const override + { + return RetryPolicy_->GetAttemptDescription(); + } + +private: + const IRequestRetryPolicyPtr RetryPolicy_; + const TInstant Deadline_; + const TDuration Timeout_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TDefaultClientRetryPolicy + : public IClientRetryPolicy +{ +public: + explicit TDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config) + : RetryConfigProvider_(std::move(retryConfigProvider)) + , Config_(config) + { } + + IRequestRetryPolicyPtr CreatePolicyForGenericRequest() override + { + return Wrap(CreateDefaultRequestRetryPolicy(Config_)); + } + + IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() override + { + return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->StartOperationRetryCount), Config_)); + } + + IRequestRetryPolicyPtr Wrap(IRequestRetryPolicyPtr basePolicy) + { + auto config = RetryConfigProvider_->CreateRetryConfig(); + if (config.RetriesTimeLimit < TDuration::Max()) { + return ::MakeIntrusive<TTimeLimitedRetryPolicy>(std::move(basePolicy), config.RetriesTimeLimit); + } + return basePolicy; + } + +private: + IRetryConfigProviderPtr RetryConfigProvider_; + const TConfigPtr Config_; +}; + +class TDefaultRetryConfigProvider + : public IRetryConfigProvider +{ +public: + TRetryConfig CreateRetryConfig() override + { + return {}; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config) +{ + return MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(config->RetryCount), config); +} + +IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config) +{ + return MakeIntrusive<TDefaultClientRetryPolicy>(std::move(retryConfigProvider), config); +} +IRetryConfigProviderPtr CreateDefaultRetryConfigProvider() +{ + return MakeIntrusive<TDefaultRetryConfigProvider>(); +} + +//////////////////////////////////////////////////////////////////////////////// + +static bool IsChunkError(int code) +{ + return code / 100 == 7; +} + +// Check whether: +// 1) codes contain at least one chunk error AND +// 2) codes don't contain non-retriable chunk errors. +static bool IsRetriableChunkError(const TSet<int>& codes) +{ + using namespace NClusterErrorCodes; + auto isChunkError = false; + for (auto code : codes) { + switch (code) { + case NChunkClient::SessionAlreadyExists: + case NChunkClient::ChunkAlreadyExists: + case NChunkClient::WindowError: + case NChunkClient::BlockContentMismatch: + case NChunkClient::InvalidBlockChecksum: + case NChunkClient::BlockOutOfRange: + case NChunkClient::MissingExtension: + case NChunkClient::NoSuchBlock: + case NChunkClient::NoSuchChunk: + case NChunkClient::NoSuchChunkList: + case NChunkClient::NoSuchChunkTree: + case NChunkClient::NoSuchChunkView: + case NChunkClient::NoSuchMedium: + return false; + default: + isChunkError |= IsChunkError(code); + break; + } + } + return isChunkError; +} + +static TMaybe<TDuration> TryGetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config) +{ + int httpCode = errorResponse.GetHttpCode(); + if (httpCode / 100 != 4 && !errorResponse.IsFromTrailers()) { + return config->RetryInterval; + } + + auto allCodes = errorResponse.GetError().GetAllErrorCodes(); + using namespace NClusterErrorCodes; + if (httpCode == 429 + || allCodes.count(NSecurityClient::RequestQueueSizeLimitExceeded) + || allCodes.count(NRpc::RequestQueueSizeLimitExceeded)) + { + // request rate limit exceeded + return config->RateLimitExceededRetryInterval; + } + if (errorResponse.IsConcurrentOperationsLimitReached()) { + // limit for the number of concurrent operations exceeded + return config->StartOperationRetryInterval; + } + if (IsRetriableChunkError(allCodes)) { + // chunk client errors + return config->ChunkErrorsRetryInterval; + } + for (auto code : TVector<int>{ + NRpc::TransportError, + NRpc::Unavailable, + NApi::RetriableArchiveError, + Canceled, + }) { + if (allCodes.contains(code)) { + return config->RetryInterval; + } + } + return Nothing(); +} + +TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config) +{ + return TryGetBackoffDuration(errorResponse, config).GetOrElse(config->RetryInterval); +} + +bool IsRetriable(const TErrorResponse& errorResponse) +{ + // Retriability of an error doesn't depend on config, so just use global one. + return TryGetBackoffDuration(errorResponse, TConfig::Get()).Defined(); +} + +bool IsRetriable(const std::exception& ex) +{ + if (dynamic_cast<const TRequestRetriesTimeout*>(&ex)) { + return false; + } + return true; +} + +TDuration GetBackoffDuration(const std::exception& /*error*/, const TConfigPtr& config) +{ + return GetBackoffDuration(config); +} + +TDuration GetBackoffDuration(const TConfigPtr& config) +{ + return config->RetryInterval; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/retry_lib.h b/yt/cpp/mapreduce/common/retry_lib.h new file mode 100644 index 00000000000..c6c061f614b --- /dev/null +++ b/yt/cpp/mapreduce/common/retry_lib.h @@ -0,0 +1,100 @@ +#pragma once + +#include "fwd.h" + +#include <yt/cpp/mapreduce/interface/fwd.h> + +#include <util/datetime/base.h> +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +// IRequestRetryPolicy class controls retries of single request. +class IRequestRetryPolicy + : public virtual TThrRefBase +{ +public: + // Helper function that returns text description of current attempt, e.g. + // "attempt 3 / 10" + // used in logs. + virtual TString GetAttemptDescription() const = 0; + + // Library code calls this function before any request attempt. + virtual void NotifyNewAttempt() = 0; + + // OnRetriableError is called whenever client gets YT error that can be retried (e.g. operation limit exceeded). + // OnGenericError is called whenever request failed due to generic error like network error. + // + // Both methods must return nothing if policy doesn't want to retry this error. + // Otherwise method should return backoff time. + virtual TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) = 0; + virtual TMaybe<TDuration> OnGenericError(const std::exception& e) = 0; + + // OnIgnoredError is called whenever client gets an error but is going to ignore it. + virtual void OnIgnoredError(const TErrorResponse& /*e*/) = 0; +}; +using IRequestRetryPolicyPtr = ::TIntrusivePtr<IRequestRetryPolicy>; + +//////////////////////////////////////////////////////////////////////////////// + +// IClientRetryPolicy controls creation of policies for individual requests. +class IClientRetryPolicy + : public virtual TThrRefBase +{ +public: + virtual IRequestRetryPolicyPtr CreatePolicyForGenericRequest() = 0; + virtual IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() = 0; +}; + + +//////////////////////////////////////////////////////////////////////////////// + +class TAttemptLimitedRetryPolicy + : public IRequestRetryPolicy +{ +public: + explicit TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config); + + void NotifyNewAttempt() override; + + TMaybe<TDuration> OnGenericError(const std::exception& e) override; + TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override; + void OnIgnoredError(const TErrorResponse& e) override; + TString GetAttemptDescription() const override; + + bool IsAttemptLimitExceeded() const; + +protected: + const TConfigPtr Config_; + +private: + const ui32 AttemptLimit_; + ui32 Attempt_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config); +IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config); +IRetryConfigProviderPtr CreateDefaultRetryConfigProvider(); + +//////////////////////////////////////////////////////////////////////////////// + +// Check if error returned by YT can be retried +bool IsRetriable(const TErrorResponse& errorResponse); +bool IsRetriable(const std::exception& ex); + +// Get backoff duration for errors returned by YT. +TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config); + +// Get backoff duration for errors that are not TErrorResponse. +TDuration GetBackoffDuration(const std::exception& error, const TConfigPtr& config); +TDuration GetBackoffDuration(const TConfigPtr& config); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/wait_proxy.cpp b/yt/cpp/mapreduce/common/wait_proxy.cpp new file mode 100644 index 00000000000..3db034a0980 --- /dev/null +++ b/yt/cpp/mapreduce/common/wait_proxy.cpp @@ -0,0 +1,118 @@ +#include "wait_proxy.h" + + +#include <library/cpp/threading/future/future.h> + +#include <util/system/event.h> +#include <util/system/condvar.h> + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +bool TDefaultWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TDuration timeout) +{ + return future.Wait(timeout); +} + +bool TDefaultWaitProxy::WaitEvent(TSystemEvent& event, TDuration timeout) +{ + return event.WaitT(timeout); +} + +bool TDefaultWaitProxy::WaitCondVar(TCondVar &condVar, TMutex &mutex, TDuration timeout) +{ + return condVar.WaitT(mutex, timeout); +} + +void TDefaultWaitProxy::Sleep(TDuration timeout) +{ + ::Sleep(timeout); +} + +//////////////////////////////////////////////////////////////////////////////// + +TWaitProxy::TWaitProxy() + : Proxy_(::MakeIntrusive<TDefaultWaitProxy>()) +{ } + +TWaitProxy* TWaitProxy::Get() +{ + return Singleton<TWaitProxy>(); +} + +void TWaitProxy::SetProxy(::TIntrusivePtr<IWaitProxy> proxy) +{ + Proxy_ = std::move(proxy); +} + +bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future) +{ + return Proxy_->WaitFuture(future, TDuration::Max()); +} + +bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TInstant deadLine) +{ + return Proxy_->WaitFuture(future, deadLine - TInstant::Now()); +} + +bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TDuration timeout) +{ + return Proxy_->WaitFuture(future, timeout); +} + +bool TWaitProxy::WaitEventD(TSystemEvent& event, TInstant deadLine) +{ + return Proxy_->WaitEvent(event, deadLine - TInstant::Now()); +} + +bool TWaitProxy::WaitEventT(TSystemEvent& event, TDuration timeout) +{ + return Proxy_->WaitEvent(event, timeout); +} + +void TWaitProxy::WaitEventI(TSystemEvent& event) +{ + Proxy_->WaitEvent(event, TDuration::Max()); +} + +bool TWaitProxy::WaitEvent(TSystemEvent& event) +{ + return Proxy_->WaitEvent(event, TDuration::Max()); +} + +bool TWaitProxy::WaitCondVarD(TCondVar& condVar, TMutex& m, TInstant deadLine) +{ + return Proxy_->WaitCondVar(condVar, m, deadLine - TInstant::Now()); +} + +bool TWaitProxy::WaitCondVarT(TCondVar& condVar, TMutex& m, TDuration timeOut) +{ + return Proxy_->WaitCondVar(condVar, m, timeOut); +} + +void TWaitProxy::WaitCondVarI(TCondVar& condVar, TMutex& m) +{ + Proxy_->WaitCondVar(condVar, m, TDuration::Max()); +} + +void TWaitProxy::WaitCondVar(TCondVar& condVar, TMutex& m) +{ + Proxy_->WaitCondVar(condVar, m, TDuration::Max()); +} + +void TWaitProxy::Sleep(TDuration timeout) +{ + Proxy_->Sleep(timeout); +} + +void TWaitProxy::SleepUntil(TInstant instant) +{ + Proxy_->Sleep(instant - TInstant::Now()); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/wait_proxy.h b/yt/cpp/mapreduce/common/wait_proxy.h new file mode 100644 index 00000000000..e7c944cf24e --- /dev/null +++ b/yt/cpp/mapreduce/common/wait_proxy.h @@ -0,0 +1,53 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/wait_proxy.h> + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class TDefaultWaitProxy + : public IWaitProxy +{ +public: + bool WaitFuture(const ::NThreading::TFuture<void>& future, TDuration timeout) override; + bool WaitEvent(TSystemEvent& event, TDuration timeout) override; + bool WaitCondVar(TCondVar& condVar, TMutex& mutex, TDuration timeout) override; + void Sleep(TDuration timeout) override; +}; + +class TWaitProxy { +public: + TWaitProxy(); + + static TWaitProxy* Get(); + + // NB: Non thread-safe, should be called only in initialization code. + void SetProxy(::TIntrusivePtr<IWaitProxy> proxy); + + bool WaitFuture(const ::NThreading::TFuture<void>& future); + bool WaitFuture(const ::NThreading::TFuture<void>& future, TInstant deadLine); + bool WaitFuture(const ::NThreading::TFuture<void>& future, TDuration timeout); + + bool WaitEventD(TSystemEvent& event, TInstant deadLine); + bool WaitEventT(TSystemEvent& event, TDuration timeout); + void WaitEventI(TSystemEvent& event); + bool WaitEvent(TSystemEvent& event); + + bool WaitCondVarD(TCondVar& condVar, TMutex& m, TInstant deadLine); + bool WaitCondVarT(TCondVar& condVar, TMutex& m, TDuration timeOut); + void WaitCondVarI(TCondVar& condVar, TMutex& m); + void WaitCondVar(TCondVar& condVar, TMutex& m); + + void Sleep(TDuration timeout); + void SleepUntil(TInstant instant); + +private: + ::TIntrusivePtr<IWaitProxy> Proxy_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/ya.make b/yt/cpp/mapreduce/common/ya.make new file mode 100644 index 00000000000..004708cb44a --- /dev/null +++ b/yt/cpp/mapreduce/common/ya.make @@ -0,0 +1,23 @@ +LIBRARY() + +INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) + +SRCS( + debug_metrics.cpp + helpers.cpp + retry_lib.cpp + wait_proxy.cpp +) + +PEERDIR( + library/cpp/json + library/cpp/svnversion + library/cpp/threading/future + library/cpp/yson + library/cpp/yson/json + library/cpp/yson/node + yt/cpp/mapreduce/interface + yt/cpp/mapreduce/interface/logging +) + +END() |