diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/common | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/common')
-rw-r--r-- | yt/cpp/mapreduce/common/debug_metrics.cpp | 62 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/debug_metrics.h | 22 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/fwd.h | 11 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/helpers.cpp | 126 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/helpers.h | 37 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/node_builder.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/node_visitor.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/retry_lib.cpp | 267 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/retry_lib.h | 100 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/wait_proxy.cpp | 118 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/wait_proxy.h | 53 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/ya.make | 23 |
12 files changed, 827 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/common/debug_metrics.cpp b/yt/cpp/mapreduce/common/debug_metrics.cpp new file mode 100644 index 0000000000..6235e55f7e --- /dev/null +++ b/yt/cpp/mapreduce/common/debug_metrics.cpp @@ -0,0 +1,62 @@ +#include "debug_metrics.h" + +#include <util/generic/hash.h> +#include <util/generic/singleton.h> + +#include <util/string/cast.h> +#include <util/system/mutex.h> + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class TDebugMetrics { +public: + static TDebugMetrics& Get() + { + return *Singleton<TDebugMetrics>(); + } + + void Inc(TStringBuf name) + { + auto g = Guard(Lock_); + auto it = Metrics_.find(name); + if (it == Metrics_.end()) { + it = Metrics_.emplace(ToString(name), 0).first; + } + ++it->second; + } + + ui64 Get(TStringBuf name) const + { + auto g = Guard(Lock_); + auto it = Metrics_.find(name); + if (it == Metrics_.end()) { + return 0; + } else { + return it->second; + } + } + +private: + TMutex Lock_; + THashMap<TString, ui64> Metrics_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +void IncDebugMetricImpl(TStringBuf name) +{ + TDebugMetrics::Get().Inc(name); +} + +ui64 GetDebugMetric(TStringBuf name) +{ + return TDebugMetrics::Get().Get(name); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/debug_metrics.h b/yt/cpp/mapreduce/common/debug_metrics.h new file mode 100644 index 0000000000..6ebbc89f72 --- /dev/null +++ b/yt/cpp/mapreduce/common/debug_metrics.h @@ -0,0 +1,22 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/config.h> + +#include <util/generic/strbuf.h> + +namespace NYT { +namespace NDetail { + +void IncDebugMetricImpl(TStringBuf name); + +// Helper functions that allows to track various events inside YT library, useful for testing. +inline void IncDebugMetric(TStringBuf name) +{ + if (TConfig::Get()->EnableDebugMetrics) { + IncDebugMetricImpl(name); + } +} +ui64 GetDebugMetric(TStringBuf name); + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/fwd.h b/yt/cpp/mapreduce/common/fwd.h new file mode 100644 index 0000000000..a195e727be --- /dev/null +++ b/yt/cpp/mapreduce/common/fwd.h @@ -0,0 +1,11 @@ +#pragma once + +#include <util/generic/fwd.h> + +namespace NYT { + class IRequestRetryPolicy; + using IRequestRetryPolicyPtr = ::TIntrusivePtr<IRequestRetryPolicy>; + + class IClientRetryPolicy; + using IClientRetryPolicyPtr = ::TIntrusivePtr<IClientRetryPolicy>; +} diff --git a/yt/cpp/mapreduce/common/helpers.cpp b/yt/cpp/mapreduce/common/helpers.cpp new file mode 100644 index 0000000000..95924d812c --- /dev/null +++ b/yt/cpp/mapreduce/common/helpers.cpp @@ -0,0 +1,126 @@ +#include "helpers.h" + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/serialize.h> +#include <yt/cpp/mapreduce/interface/fluent.h> + +#include <library/cpp/yson/node/node_builder.h> +#include <library/cpp/yson/node/node_visitor.h> + +#include <library/cpp/yson/parser.h> +#include <library/cpp/yson/writer.h> + +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_value.h> + +#include <util/stream/input.h> +#include <util/stream/output.h> +#include <util/stream/str.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TString NodeListToYsonString(const TNode::TListType& nodes) +{ + TStringStream stream; + ::NYson::TYsonWriter writer(&stream, NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment); + auto list = BuildYsonListFluently(&writer); + for (const auto& node : nodes) { + list.Item().Value(node); + } + return stream.Str(); +} + +TNode PathToNode(const TRichYPath& path) +{ + TNode result; + TNodeBuilder builder(&result); + Serialize(path, &builder); + return result; +} + +TNode PathToParamNode(const TRichYPath& path) +{ + return TNode()("path", PathToNode(path)); +} + +TString AttributesToYsonString(const TNode& node) +{ + return BuildYsonStringFluently().BeginMap() + .Item("attributes").Value(node) + .EndMap(); +} + +TString AttributeFilterToYsonString(const TAttributeFilter& filter) +{ + return BuildYsonStringFluently().BeginMap() + .Item("attributes").Value(filter) + .EndMap(); +} + +TNode NodeFromTableSchema(const TTableSchema& schema) +{ + TNode result; + TNodeBuilder builder(&result); + Serialize(schema, &builder); + return result; +} + +void MergeNodes(TNode& dst, const TNode& src) +{ + if (dst.IsMap() && src.IsMap()) { + auto& dstMap = dst.AsMap(); + const auto& srcMap = src.AsMap(); + for (const auto& srcItem : srcMap) { + const auto& key = srcItem.first; + auto dstItem = dstMap.find(key); + if (dstItem != dstMap.end()) { + MergeNodes(dstItem->second, srcItem.second); + } else { + dstMap[key] = srcItem.second; + } + } + } else { + if (dst.GetType() == src.GetType() && src.HasAttributes()) { + auto attributes = dst.GetAttributes(); + MergeNodes(attributes, src.GetAttributes()); + dst = src; + dst.Attributes() = attributes; + } else { + dst = src; + } + } +} + +TYPath AddPathPrefix(const TYPath& path, const TString& pathPrefix) +{ + if (path.StartsWith("//") || path.StartsWith("#")) { + return path; + } + return pathPrefix + path; +} + +TString GetWriteTableCommand(const TString& apiVersion) +{ + return apiVersion == "v2" ? "write" : "write_table"; +} + +TString GetReadTableCommand(const TString& apiVersion) +{ + return apiVersion == "v2" ? "read" : "read_table"; +} + +TString GetWriteFileCommand(const TString& apiVersion) +{ + return apiVersion == "v2" ? "upload" : "write_file"; +} + +TString GetReadFileCommand(const TString& apiVersion) +{ + return apiVersion == "v2" ? "download" : "read_file"; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/helpers.h b/yt/cpp/mapreduce/common/helpers.h new file mode 100644 index 0000000000..2174ba820b --- /dev/null +++ b/yt/cpp/mapreduce/common/helpers.h @@ -0,0 +1,37 @@ +#pragma once + +#include "fwd.h" + +#include <library/cpp/yson/node/node_io.h> // backward compatibility + +#include <yt/cpp/mapreduce/interface/node.h> +#include <yt/cpp/mapreduce/interface/common.h> +#include <library/cpp/yson/public.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TString NodeListToYsonString(const TNode::TListType& nodes); + +TNode PathToNode(const TRichYPath& path); +TNode PathToParamNode(const TRichYPath& path); + +TString AttributesToYsonString(const TNode& attributes); + +TString AttributeFilterToYsonString(const TAttributeFilter& filter); + +TNode NodeFromTableSchema(const TTableSchema& schema); + +void MergeNodes(TNode& dst, const TNode& src); + +TYPath AddPathPrefix(const TYPath& path, const TString& pathPrefix); + +TString GetWriteTableCommand(const TString& apiVersion); +TString GetReadTableCommand(const TString& apiVersion); +TString GetWriteFileCommand(const TString& apiVersion); +TString GetReadFileCommand(const TString& apiVersion); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/node_builder.h b/yt/cpp/mapreduce/common/node_builder.h new file mode 100644 index 0000000000..c7f731cf09 --- /dev/null +++ b/yt/cpp/mapreduce/common/node_builder.h @@ -0,0 +1,4 @@ +#pragma once + +// Backward compatibility. +#include <library/cpp/yson/node/node_builder.h> diff --git a/yt/cpp/mapreduce/common/node_visitor.h b/yt/cpp/mapreduce/common/node_visitor.h new file mode 100644 index 0000000000..a8bde52b5a --- /dev/null +++ b/yt/cpp/mapreduce/common/node_visitor.h @@ -0,0 +1,4 @@ +#pragma once + +// Backward compatibility. +#include <library/cpp/yson/node/node_visitor.h> diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp new file mode 100644 index 0000000000..cf2c021eb4 --- /dev/null +++ b/yt/cpp/mapreduce/common/retry_lib.cpp @@ -0,0 +1,267 @@ +#include "retry_lib.h" + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> +#include <yt/cpp/mapreduce/interface/retry_policy.h> + +#include <util/string/builder.h> +#include <util/generic/set.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config) + : Config_(config) + , AttemptLimit_(attemptLimit) +{ } + +void TAttemptLimitedRetryPolicy::NotifyNewAttempt() +{ + ++Attempt_; +} + +TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnGenericError(const std::exception& e) +{ + if (IsAttemptLimitExceeded()) { + return Nothing(); + } + return GetBackoffDuration(e, Config_); +} + +TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnRetriableError(const TErrorResponse& e) +{ + if (IsAttemptLimitExceeded()) { + return Nothing(); + } + return GetBackoffDuration(e, Config_); +} + +void TAttemptLimitedRetryPolicy::OnIgnoredError(const TErrorResponse& /*e*/) +{ + --Attempt_; +} + +TString TAttemptLimitedRetryPolicy::GetAttemptDescription() const +{ + return ::TStringBuilder() << "attempt " << Attempt_ << " of " << AttemptLimit_; +} + +bool TAttemptLimitedRetryPolicy::IsAttemptLimitExceeded() const +{ + return Attempt_ >= AttemptLimit_; +} +//////////////////////////////////////////////////////////////////////////////// + +class TTimeLimitedRetryPolicy + : public IRequestRetryPolicy +{ +public: + TTimeLimitedRetryPolicy(IRequestRetryPolicyPtr retryPolicy, TDuration timeout) + : RetryPolicy_(retryPolicy) + , Deadline_(TInstant::Now() + timeout) + , Timeout_(timeout) + { } + void NotifyNewAttempt() override + { + if (TInstant::Now() >= Deadline_) { + ythrow TRequestRetriesTimeout() << "retry timeout exceeded (timeout: " << Timeout_ << ")"; + } + RetryPolicy_->NotifyNewAttempt(); + } + + TMaybe<TDuration> OnGenericError(const std::exception& e) override + { + return RetryPolicy_->OnGenericError(e); + } + + TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override + { + return RetryPolicy_->OnRetriableError(e); + } + + void OnIgnoredError(const TErrorResponse& e) override + { + return RetryPolicy_->OnIgnoredError(e); + } + + TString GetAttemptDescription() const override + { + return RetryPolicy_->GetAttemptDescription(); + } + +private: + const IRequestRetryPolicyPtr RetryPolicy_; + const TInstant Deadline_; + const TDuration Timeout_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TDefaultClientRetryPolicy + : public IClientRetryPolicy +{ +public: + explicit TDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config) + : RetryConfigProvider_(std::move(retryConfigProvider)) + , Config_(config) + { } + + IRequestRetryPolicyPtr CreatePolicyForGenericRequest() override + { + return Wrap(CreateDefaultRequestRetryPolicy(Config_)); + } + + IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() override + { + return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->StartOperationRetryCount), Config_)); + } + + IRequestRetryPolicyPtr Wrap(IRequestRetryPolicyPtr basePolicy) + { + auto config = RetryConfigProvider_->CreateRetryConfig(); + if (config.RetriesTimeLimit < TDuration::Max()) { + return ::MakeIntrusive<TTimeLimitedRetryPolicy>(std::move(basePolicy), config.RetriesTimeLimit); + } + return basePolicy; + } + +private: + IRetryConfigProviderPtr RetryConfigProvider_; + const TConfigPtr Config_; +}; + +class TDefaultRetryConfigProvider + : public IRetryConfigProvider +{ +public: + TRetryConfig CreateRetryConfig() override + { + return {}; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config) +{ + return MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(config->RetryCount), config); +} + +IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config) +{ + return MakeIntrusive<TDefaultClientRetryPolicy>(std::move(retryConfigProvider), config); +} +IRetryConfigProviderPtr CreateDefaultRetryConfigProvider() +{ + return MakeIntrusive<TDefaultRetryConfigProvider>(); +} + +//////////////////////////////////////////////////////////////////////////////// + +static bool IsChunkError(int code) +{ + return code / 100 == 7; +} + +// Check whether: +// 1) codes contain at least one chunk error AND +// 2) codes don't contain non-retriable chunk errors. +static bool IsRetriableChunkError(const TSet<int>& codes) +{ + using namespace NClusterErrorCodes; + auto isChunkError = false; + for (auto code : codes) { + switch (code) { + case NChunkClient::SessionAlreadyExists: + case NChunkClient::ChunkAlreadyExists: + case NChunkClient::WindowError: + case NChunkClient::BlockContentMismatch: + case NChunkClient::InvalidBlockChecksum: + case NChunkClient::BlockOutOfRange: + case NChunkClient::MissingExtension: + case NChunkClient::NoSuchBlock: + case NChunkClient::NoSuchChunk: + case NChunkClient::NoSuchChunkList: + case NChunkClient::NoSuchChunkTree: + case NChunkClient::NoSuchChunkView: + case NChunkClient::NoSuchMedium: + return false; + default: + isChunkError |= IsChunkError(code); + break; + } + } + return isChunkError; +} + +static TMaybe<TDuration> TryGetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config) +{ + int httpCode = errorResponse.GetHttpCode(); + if (httpCode / 100 != 4 && !errorResponse.IsFromTrailers()) { + return config->RetryInterval; + } + + auto allCodes = errorResponse.GetError().GetAllErrorCodes(); + using namespace NClusterErrorCodes; + if (httpCode == 429 + || allCodes.count(NSecurityClient::RequestQueueSizeLimitExceeded) + || allCodes.count(NRpc::RequestQueueSizeLimitExceeded)) + { + // request rate limit exceeded + return config->RateLimitExceededRetryInterval; + } + if (errorResponse.IsConcurrentOperationsLimitReached()) { + // limit for the number of concurrent operations exceeded + return config->StartOperationRetryInterval; + } + if (IsRetriableChunkError(allCodes)) { + // chunk client errors + return config->ChunkErrorsRetryInterval; + } + for (auto code : TVector<int>{ + NRpc::TransportError, + NRpc::Unavailable, + NApi::RetriableArchiveError, + Canceled, + }) { + if (allCodes.contains(code)) { + return config->RetryInterval; + } + } + return Nothing(); +} + +TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config) +{ + return TryGetBackoffDuration(errorResponse, config).GetOrElse(config->RetryInterval); +} + +bool IsRetriable(const TErrorResponse& errorResponse) +{ + // Retriability of an error doesn't depend on config, so just use global one. + return TryGetBackoffDuration(errorResponse, TConfig::Get()).Defined(); +} + +bool IsRetriable(const std::exception& ex) +{ + if (dynamic_cast<const TRequestRetriesTimeout*>(&ex)) { + return false; + } + return true; +} + +TDuration GetBackoffDuration(const std::exception& /*error*/, const TConfigPtr& config) +{ + return GetBackoffDuration(config); +} + +TDuration GetBackoffDuration(const TConfigPtr& config) +{ + return config->RetryInterval; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/retry_lib.h b/yt/cpp/mapreduce/common/retry_lib.h new file mode 100644 index 0000000000..c6c061f614 --- /dev/null +++ b/yt/cpp/mapreduce/common/retry_lib.h @@ -0,0 +1,100 @@ +#pragma once + +#include "fwd.h" + +#include <yt/cpp/mapreduce/interface/fwd.h> + +#include <util/datetime/base.h> +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +// IRequestRetryPolicy class controls retries of single request. +class IRequestRetryPolicy + : public virtual TThrRefBase +{ +public: + // Helper function that returns text description of current attempt, e.g. + // "attempt 3 / 10" + // used in logs. + virtual TString GetAttemptDescription() const = 0; + + // Library code calls this function before any request attempt. + virtual void NotifyNewAttempt() = 0; + + // OnRetriableError is called whenever client gets YT error that can be retried (e.g. operation limit exceeded). + // OnGenericError is called whenever request failed due to generic error like network error. + // + // Both methods must return nothing if policy doesn't want to retry this error. + // Otherwise method should return backoff time. + virtual TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) = 0; + virtual TMaybe<TDuration> OnGenericError(const std::exception& e) = 0; + + // OnIgnoredError is called whenever client gets an error but is going to ignore it. + virtual void OnIgnoredError(const TErrorResponse& /*e*/) = 0; +}; +using IRequestRetryPolicyPtr = ::TIntrusivePtr<IRequestRetryPolicy>; + +//////////////////////////////////////////////////////////////////////////////// + +// IClientRetryPolicy controls creation of policies for individual requests. +class IClientRetryPolicy + : public virtual TThrRefBase +{ +public: + virtual IRequestRetryPolicyPtr CreatePolicyForGenericRequest() = 0; + virtual IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() = 0; +}; + + +//////////////////////////////////////////////////////////////////////////////// + +class TAttemptLimitedRetryPolicy + : public IRequestRetryPolicy +{ +public: + explicit TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config); + + void NotifyNewAttempt() override; + + TMaybe<TDuration> OnGenericError(const std::exception& e) override; + TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override; + void OnIgnoredError(const TErrorResponse& e) override; + TString GetAttemptDescription() const override; + + bool IsAttemptLimitExceeded() const; + +protected: + const TConfigPtr Config_; + +private: + const ui32 AttemptLimit_; + ui32 Attempt_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config); +IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config); +IRetryConfigProviderPtr CreateDefaultRetryConfigProvider(); + +//////////////////////////////////////////////////////////////////////////////// + +// Check if error returned by YT can be retried +bool IsRetriable(const TErrorResponse& errorResponse); +bool IsRetriable(const std::exception& ex); + +// Get backoff duration for errors returned by YT. +TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config); + +// Get backoff duration for errors that are not TErrorResponse. +TDuration GetBackoffDuration(const std::exception& error, const TConfigPtr& config); +TDuration GetBackoffDuration(const TConfigPtr& config); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/wait_proxy.cpp b/yt/cpp/mapreduce/common/wait_proxy.cpp new file mode 100644 index 0000000000..3db034a098 --- /dev/null +++ b/yt/cpp/mapreduce/common/wait_proxy.cpp @@ -0,0 +1,118 @@ +#include "wait_proxy.h" + + +#include <library/cpp/threading/future/future.h> + +#include <util/system/event.h> +#include <util/system/condvar.h> + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +bool TDefaultWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TDuration timeout) +{ + return future.Wait(timeout); +} + +bool TDefaultWaitProxy::WaitEvent(TSystemEvent& event, TDuration timeout) +{ + return event.WaitT(timeout); +} + +bool TDefaultWaitProxy::WaitCondVar(TCondVar &condVar, TMutex &mutex, TDuration timeout) +{ + return condVar.WaitT(mutex, timeout); +} + +void TDefaultWaitProxy::Sleep(TDuration timeout) +{ + ::Sleep(timeout); +} + +//////////////////////////////////////////////////////////////////////////////// + +TWaitProxy::TWaitProxy() + : Proxy_(::MakeIntrusive<TDefaultWaitProxy>()) +{ } + +TWaitProxy* TWaitProxy::Get() +{ + return Singleton<TWaitProxy>(); +} + +void TWaitProxy::SetProxy(::TIntrusivePtr<IWaitProxy> proxy) +{ + Proxy_ = std::move(proxy); +} + +bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future) +{ + return Proxy_->WaitFuture(future, TDuration::Max()); +} + +bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TInstant deadLine) +{ + return Proxy_->WaitFuture(future, deadLine - TInstant::Now()); +} + +bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TDuration timeout) +{ + return Proxy_->WaitFuture(future, timeout); +} + +bool TWaitProxy::WaitEventD(TSystemEvent& event, TInstant deadLine) +{ + return Proxy_->WaitEvent(event, deadLine - TInstant::Now()); +} + +bool TWaitProxy::WaitEventT(TSystemEvent& event, TDuration timeout) +{ + return Proxy_->WaitEvent(event, timeout); +} + +void TWaitProxy::WaitEventI(TSystemEvent& event) +{ + Proxy_->WaitEvent(event, TDuration::Max()); +} + +bool TWaitProxy::WaitEvent(TSystemEvent& event) +{ + return Proxy_->WaitEvent(event, TDuration::Max()); +} + +bool TWaitProxy::WaitCondVarD(TCondVar& condVar, TMutex& m, TInstant deadLine) +{ + return Proxy_->WaitCondVar(condVar, m, deadLine - TInstant::Now()); +} + +bool TWaitProxy::WaitCondVarT(TCondVar& condVar, TMutex& m, TDuration timeOut) +{ + return Proxy_->WaitCondVar(condVar, m, timeOut); +} + +void TWaitProxy::WaitCondVarI(TCondVar& condVar, TMutex& m) +{ + Proxy_->WaitCondVar(condVar, m, TDuration::Max()); +} + +void TWaitProxy::WaitCondVar(TCondVar& condVar, TMutex& m) +{ + Proxy_->WaitCondVar(condVar, m, TDuration::Max()); +} + +void TWaitProxy::Sleep(TDuration timeout) +{ + Proxy_->Sleep(timeout); +} + +void TWaitProxy::SleepUntil(TInstant instant) +{ + Proxy_->Sleep(instant - TInstant::Now()); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/wait_proxy.h b/yt/cpp/mapreduce/common/wait_proxy.h new file mode 100644 index 0000000000..e7c944cf24 --- /dev/null +++ b/yt/cpp/mapreduce/common/wait_proxy.h @@ -0,0 +1,53 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/wait_proxy.h> + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class TDefaultWaitProxy + : public IWaitProxy +{ +public: + bool WaitFuture(const ::NThreading::TFuture<void>& future, TDuration timeout) override; + bool WaitEvent(TSystemEvent& event, TDuration timeout) override; + bool WaitCondVar(TCondVar& condVar, TMutex& mutex, TDuration timeout) override; + void Sleep(TDuration timeout) override; +}; + +class TWaitProxy { +public: + TWaitProxy(); + + static TWaitProxy* Get(); + + // NB: Non thread-safe, should be called only in initialization code. + void SetProxy(::TIntrusivePtr<IWaitProxy> proxy); + + bool WaitFuture(const ::NThreading::TFuture<void>& future); + bool WaitFuture(const ::NThreading::TFuture<void>& future, TInstant deadLine); + bool WaitFuture(const ::NThreading::TFuture<void>& future, TDuration timeout); + + bool WaitEventD(TSystemEvent& event, TInstant deadLine); + bool WaitEventT(TSystemEvent& event, TDuration timeout); + void WaitEventI(TSystemEvent& event); + bool WaitEvent(TSystemEvent& event); + + bool WaitCondVarD(TCondVar& condVar, TMutex& m, TInstant deadLine); + bool WaitCondVarT(TCondVar& condVar, TMutex& m, TDuration timeOut); + void WaitCondVarI(TCondVar& condVar, TMutex& m); + void WaitCondVar(TCondVar& condVar, TMutex& m); + + void Sleep(TDuration timeout); + void SleepUntil(TInstant instant); + +private: + ::TIntrusivePtr<IWaitProxy> Proxy_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/common/ya.make b/yt/cpp/mapreduce/common/ya.make new file mode 100644 index 0000000000..004708cb44 --- /dev/null +++ b/yt/cpp/mapreduce/common/ya.make @@ -0,0 +1,23 @@ +LIBRARY() + +INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) + +SRCS( + debug_metrics.cpp + helpers.cpp + retry_lib.cpp + wait_proxy.cpp +) + +PEERDIR( + library/cpp/json + library/cpp/svnversion + library/cpp/threading/future + library/cpp/yson + library/cpp/yson/json + library/cpp/yson/node + yt/cpp/mapreduce/interface + yt/cpp/mapreduce/interface/logging +) + +END() |