diff options
| author | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
| commit | 3e1899838408bbad47622007aa382bc8a2b01f87 (patch) | |
| tree | 0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/common | |
| parent | 5463eb3f5e72a86f858a3d27c886470a724ede34 (diff) | |
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing
changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/common')
| -rw-r--r-- | yt/cpp/mapreduce/common/debug_metrics.cpp | 62 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/debug_metrics.h | 22 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/fwd.h | 11 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/helpers.cpp | 126 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/helpers.h | 37 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/node_builder.h | 4 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/node_visitor.h | 4 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/retry_lib.cpp | 267 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/retry_lib.h | 100 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/wait_proxy.cpp | 118 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/wait_proxy.h | 53 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/ya.make | 23 |
12 files changed, 0 insertions, 827 deletions
diff --git a/yt/cpp/mapreduce/common/debug_metrics.cpp b/yt/cpp/mapreduce/common/debug_metrics.cpp deleted file mode 100644 index 6235e55f7e6..00000000000 --- a/yt/cpp/mapreduce/common/debug_metrics.cpp +++ /dev/null @@ -1,62 +0,0 @@ -#include "debug_metrics.h" - -#include <util/generic/hash.h> -#include <util/generic/singleton.h> - -#include <util/string/cast.h> -#include <util/system/mutex.h> - -namespace NYT { -namespace NDetail { - -//////////////////////////////////////////////////////////////////////////////// - -class TDebugMetrics { -public: - static TDebugMetrics& Get() - { - return *Singleton<TDebugMetrics>(); - } - - void Inc(TStringBuf name) - { - auto g = Guard(Lock_); - auto it = Metrics_.find(name); - if (it == Metrics_.end()) { - it = Metrics_.emplace(ToString(name), 0).first; - } - ++it->second; - } - - ui64 Get(TStringBuf name) const - { - auto g = Guard(Lock_); - auto it = Metrics_.find(name); - if (it == Metrics_.end()) { - return 0; - } else { - return it->second; - } - } - -private: - TMutex Lock_; - THashMap<TString, ui64> Metrics_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -void IncDebugMetricImpl(TStringBuf name) -{ - TDebugMetrics::Get().Inc(name); -} - -ui64 GetDebugMetric(TStringBuf name) -{ - return TDebugMetrics::Get().Get(name); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NDetail -} // namespace NYT diff --git a/yt/cpp/mapreduce/common/debug_metrics.h b/yt/cpp/mapreduce/common/debug_metrics.h deleted file mode 100644 index 6ebbc89f72c..00000000000 --- a/yt/cpp/mapreduce/common/debug_metrics.h +++ /dev/null @@ -1,22 +0,0 @@ -#pragma once - -#include <yt/cpp/mapreduce/interface/config.h> - -#include <util/generic/strbuf.h> - -namespace NYT { -namespace NDetail { - -void IncDebugMetricImpl(TStringBuf name); - -// Helper functions that allows to track various events inside YT library, useful for testing. -inline void IncDebugMetric(TStringBuf name) -{ - if (TConfig::Get()->EnableDebugMetrics) { - IncDebugMetricImpl(name); - } -} -ui64 GetDebugMetric(TStringBuf name); - -} // namespace NDetail -} // namespace NYT diff --git a/yt/cpp/mapreduce/common/fwd.h b/yt/cpp/mapreduce/common/fwd.h deleted file mode 100644 index a195e727be4..00000000000 --- a/yt/cpp/mapreduce/common/fwd.h +++ /dev/null @@ -1,11 +0,0 @@ -#pragma once - -#include <util/generic/fwd.h> - -namespace NYT { - class IRequestRetryPolicy; - using IRequestRetryPolicyPtr = ::TIntrusivePtr<IRequestRetryPolicy>; - - class IClientRetryPolicy; - using IClientRetryPolicyPtr = ::TIntrusivePtr<IClientRetryPolicy>; -} diff --git a/yt/cpp/mapreduce/common/helpers.cpp b/yt/cpp/mapreduce/common/helpers.cpp deleted file mode 100644 index 95924d812c9..00000000000 --- a/yt/cpp/mapreduce/common/helpers.cpp +++ /dev/null @@ -1,126 +0,0 @@ -#include "helpers.h" - -#include <yt/cpp/mapreduce/interface/config.h> -#include <yt/cpp/mapreduce/interface/serialize.h> -#include <yt/cpp/mapreduce/interface/fluent.h> - -#include <library/cpp/yson/node/node_builder.h> -#include <library/cpp/yson/node/node_visitor.h> - -#include <library/cpp/yson/parser.h> -#include <library/cpp/yson/writer.h> - -#include <library/cpp/json/json_reader.h> -#include <library/cpp/json/json_value.h> - -#include <util/stream/input.h> -#include <util/stream/output.h> -#include <util/stream/str.h> - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -TString NodeListToYsonString(const TNode::TListType& nodes) -{ - TStringStream stream; - ::NYson::TYsonWriter writer(&stream, NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment); - auto list = BuildYsonListFluently(&writer); - for (const auto& node : nodes) { - list.Item().Value(node); - } - return stream.Str(); -} - -TNode PathToNode(const TRichYPath& path) -{ - TNode result; - TNodeBuilder builder(&result); - Serialize(path, &builder); - return result; -} - -TNode PathToParamNode(const TRichYPath& path) -{ - return TNode()("path", PathToNode(path)); -} - -TString AttributesToYsonString(const TNode& node) -{ - return BuildYsonStringFluently().BeginMap() - .Item("attributes").Value(node) - .EndMap(); -} - -TString AttributeFilterToYsonString(const TAttributeFilter& filter) -{ - return BuildYsonStringFluently().BeginMap() - .Item("attributes").Value(filter) - .EndMap(); -} - -TNode NodeFromTableSchema(const TTableSchema& schema) -{ - TNode result; - TNodeBuilder builder(&result); - Serialize(schema, &builder); - return result; -} - -void MergeNodes(TNode& dst, const TNode& src) -{ - if (dst.IsMap() && src.IsMap()) { - auto& dstMap = dst.AsMap(); - const auto& srcMap = src.AsMap(); - for (const auto& srcItem : srcMap) { - const auto& key = srcItem.first; - auto dstItem = dstMap.find(key); - if (dstItem != dstMap.end()) { - MergeNodes(dstItem->second, srcItem.second); - } else { - dstMap[key] = srcItem.second; - } - } - } else { - if (dst.GetType() == src.GetType() && src.HasAttributes()) { - auto attributes = dst.GetAttributes(); - MergeNodes(attributes, src.GetAttributes()); - dst = src; - dst.Attributes() = attributes; - } else { - dst = src; - } - } -} - -TYPath AddPathPrefix(const TYPath& path, const TString& pathPrefix) -{ - if (path.StartsWith("//") || path.StartsWith("#")) { - return path; - } - return pathPrefix + path; -} - -TString GetWriteTableCommand(const TString& apiVersion) -{ - return apiVersion == "v2" ? "write" : "write_table"; -} - -TString GetReadTableCommand(const TString& apiVersion) -{ - return apiVersion == "v2" ? "read" : "read_table"; -} - -TString GetWriteFileCommand(const TString& apiVersion) -{ - return apiVersion == "v2" ? "upload" : "write_file"; -} - -TString GetReadFileCommand(const TString& apiVersion) -{ - return apiVersion == "v2" ? "download" : "read_file"; -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT diff --git a/yt/cpp/mapreduce/common/helpers.h b/yt/cpp/mapreduce/common/helpers.h deleted file mode 100644 index 2174ba820b5..00000000000 --- a/yt/cpp/mapreduce/common/helpers.h +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include "fwd.h" - -#include <library/cpp/yson/node/node_io.h> // backward compatibility - -#include <yt/cpp/mapreduce/interface/node.h> -#include <yt/cpp/mapreduce/interface/common.h> -#include <library/cpp/yson/public.h> - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -TString NodeListToYsonString(const TNode::TListType& nodes); - -TNode PathToNode(const TRichYPath& path); -TNode PathToParamNode(const TRichYPath& path); - -TString AttributesToYsonString(const TNode& attributes); - -TString AttributeFilterToYsonString(const TAttributeFilter& filter); - -TNode NodeFromTableSchema(const TTableSchema& schema); - -void MergeNodes(TNode& dst, const TNode& src); - -TYPath AddPathPrefix(const TYPath& path, const TString& pathPrefix); - -TString GetWriteTableCommand(const TString& apiVersion); -TString GetReadTableCommand(const TString& apiVersion); -TString GetWriteFileCommand(const TString& apiVersion); -TString GetReadFileCommand(const TString& apiVersion); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT diff --git a/yt/cpp/mapreduce/common/node_builder.h b/yt/cpp/mapreduce/common/node_builder.h deleted file mode 100644 index c7f731cf09c..00000000000 --- a/yt/cpp/mapreduce/common/node_builder.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once - -// Backward compatibility. -#include <library/cpp/yson/node/node_builder.h> diff --git a/yt/cpp/mapreduce/common/node_visitor.h b/yt/cpp/mapreduce/common/node_visitor.h deleted file mode 100644 index a8bde52b5a5..00000000000 --- a/yt/cpp/mapreduce/common/node_visitor.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once - -// Backward compatibility. -#include <library/cpp/yson/node/node_visitor.h> diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp deleted file mode 100644 index cf2c021eb44..00000000000 --- a/yt/cpp/mapreduce/common/retry_lib.cpp +++ /dev/null @@ -1,267 +0,0 @@ -#include "retry_lib.h" - -#include <yt/cpp/mapreduce/interface/config.h> -#include <yt/cpp/mapreduce/interface/errors.h> -#include <yt/cpp/mapreduce/interface/error_codes.h> -#include <yt/cpp/mapreduce/interface/retry_policy.h> - -#include <util/string/builder.h> -#include <util/generic/set.h> - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config) - : Config_(config) - , AttemptLimit_(attemptLimit) -{ } - -void TAttemptLimitedRetryPolicy::NotifyNewAttempt() -{ - ++Attempt_; -} - -TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnGenericError(const std::exception& e) -{ - if (IsAttemptLimitExceeded()) { - return Nothing(); - } - return GetBackoffDuration(e, Config_); -} - -TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnRetriableError(const TErrorResponse& e) -{ - if (IsAttemptLimitExceeded()) { - return Nothing(); - } - return GetBackoffDuration(e, Config_); -} - -void TAttemptLimitedRetryPolicy::OnIgnoredError(const TErrorResponse& /*e*/) -{ - --Attempt_; -} - -TString TAttemptLimitedRetryPolicy::GetAttemptDescription() const -{ - return ::TStringBuilder() << "attempt " << Attempt_ << " of " << AttemptLimit_; -} - -bool TAttemptLimitedRetryPolicy::IsAttemptLimitExceeded() const -{ - return Attempt_ >= AttemptLimit_; -} -//////////////////////////////////////////////////////////////////////////////// - -class TTimeLimitedRetryPolicy - : public IRequestRetryPolicy -{ -public: - TTimeLimitedRetryPolicy(IRequestRetryPolicyPtr retryPolicy, TDuration timeout) - : RetryPolicy_(retryPolicy) - , Deadline_(TInstant::Now() + timeout) - , Timeout_(timeout) - { } - void NotifyNewAttempt() override - { - if (TInstant::Now() >= Deadline_) { - ythrow TRequestRetriesTimeout() << "retry timeout exceeded (timeout: " << Timeout_ << ")"; - } - RetryPolicy_->NotifyNewAttempt(); - } - - TMaybe<TDuration> OnGenericError(const std::exception& e) override - { - return RetryPolicy_->OnGenericError(e); - } - - TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override - { - return RetryPolicy_->OnRetriableError(e); - } - - void OnIgnoredError(const TErrorResponse& e) override - { - return RetryPolicy_->OnIgnoredError(e); - } - - TString GetAttemptDescription() const override - { - return RetryPolicy_->GetAttemptDescription(); - } - -private: - const IRequestRetryPolicyPtr RetryPolicy_; - const TInstant Deadline_; - const TDuration Timeout_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TDefaultClientRetryPolicy - : public IClientRetryPolicy -{ -public: - explicit TDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config) - : RetryConfigProvider_(std::move(retryConfigProvider)) - , Config_(config) - { } - - IRequestRetryPolicyPtr CreatePolicyForGenericRequest() override - { - return Wrap(CreateDefaultRequestRetryPolicy(Config_)); - } - - IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() override - { - return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->StartOperationRetryCount), Config_)); - } - - IRequestRetryPolicyPtr Wrap(IRequestRetryPolicyPtr basePolicy) - { - auto config = RetryConfigProvider_->CreateRetryConfig(); - if (config.RetriesTimeLimit < TDuration::Max()) { - return ::MakeIntrusive<TTimeLimitedRetryPolicy>(std::move(basePolicy), config.RetriesTimeLimit); - } - return basePolicy; - } - -private: - IRetryConfigProviderPtr RetryConfigProvider_; - const TConfigPtr Config_; -}; - -class TDefaultRetryConfigProvider - : public IRetryConfigProvider -{ -public: - TRetryConfig CreateRetryConfig() override - { - return {}; - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config) -{ - return MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(config->RetryCount), config); -} - -IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config) -{ - return MakeIntrusive<TDefaultClientRetryPolicy>(std::move(retryConfigProvider), config); -} -IRetryConfigProviderPtr CreateDefaultRetryConfigProvider() -{ - return MakeIntrusive<TDefaultRetryConfigProvider>(); -} - -//////////////////////////////////////////////////////////////////////////////// - -static bool IsChunkError(int code) -{ - return code / 100 == 7; -} - -// Check whether: -// 1) codes contain at least one chunk error AND -// 2) codes don't contain non-retriable chunk errors. -static bool IsRetriableChunkError(const TSet<int>& codes) -{ - using namespace NClusterErrorCodes; - auto isChunkError = false; - for (auto code : codes) { - switch (code) { - case NChunkClient::SessionAlreadyExists: - case NChunkClient::ChunkAlreadyExists: - case NChunkClient::WindowError: - case NChunkClient::BlockContentMismatch: - case NChunkClient::InvalidBlockChecksum: - case NChunkClient::BlockOutOfRange: - case NChunkClient::MissingExtension: - case NChunkClient::NoSuchBlock: - case NChunkClient::NoSuchChunk: - case NChunkClient::NoSuchChunkList: - case NChunkClient::NoSuchChunkTree: - case NChunkClient::NoSuchChunkView: - case NChunkClient::NoSuchMedium: - return false; - default: - isChunkError |= IsChunkError(code); - break; - } - } - return isChunkError; -} - -static TMaybe<TDuration> TryGetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config) -{ - int httpCode = errorResponse.GetHttpCode(); - if (httpCode / 100 != 4 && !errorResponse.IsFromTrailers()) { - return config->RetryInterval; - } - - auto allCodes = errorResponse.GetError().GetAllErrorCodes(); - using namespace NClusterErrorCodes; - if (httpCode == 429 - || allCodes.count(NSecurityClient::RequestQueueSizeLimitExceeded) - || allCodes.count(NRpc::RequestQueueSizeLimitExceeded)) - { - // request rate limit exceeded - return config->RateLimitExceededRetryInterval; - } - if (errorResponse.IsConcurrentOperationsLimitReached()) { - // limit for the number of concurrent operations exceeded - return config->StartOperationRetryInterval; - } - if (IsRetriableChunkError(allCodes)) { - // chunk client errors - return config->ChunkErrorsRetryInterval; - } - for (auto code : TVector<int>{ - NRpc::TransportError, - NRpc::Unavailable, - NApi::RetriableArchiveError, - Canceled, - }) { - if (allCodes.contains(code)) { - return config->RetryInterval; - } - } - return Nothing(); -} - -TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config) -{ - return TryGetBackoffDuration(errorResponse, config).GetOrElse(config->RetryInterval); -} - -bool IsRetriable(const TErrorResponse& errorResponse) -{ - // Retriability of an error doesn't depend on config, so just use global one. - return TryGetBackoffDuration(errorResponse, TConfig::Get()).Defined(); -} - -bool IsRetriable(const std::exception& ex) -{ - if (dynamic_cast<const TRequestRetriesTimeout*>(&ex)) { - return false; - } - return true; -} - -TDuration GetBackoffDuration(const std::exception& /*error*/, const TConfigPtr& config) -{ - return GetBackoffDuration(config); -} - -TDuration GetBackoffDuration(const TConfigPtr& config) -{ - return config->RetryInterval; -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT diff --git a/yt/cpp/mapreduce/common/retry_lib.h b/yt/cpp/mapreduce/common/retry_lib.h deleted file mode 100644 index c6c061f614b..00000000000 --- a/yt/cpp/mapreduce/common/retry_lib.h +++ /dev/null @@ -1,100 +0,0 @@ -#pragma once - -#include "fwd.h" - -#include <yt/cpp/mapreduce/interface/fwd.h> - -#include <util/datetime/base.h> -#include <util/generic/maybe.h> -#include <util/generic/ptr.h> -#include <util/generic/string.h> - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -// IRequestRetryPolicy class controls retries of single request. -class IRequestRetryPolicy - : public virtual TThrRefBase -{ -public: - // Helper function that returns text description of current attempt, e.g. - // "attempt 3 / 10" - // used in logs. - virtual TString GetAttemptDescription() const = 0; - - // Library code calls this function before any request attempt. - virtual void NotifyNewAttempt() = 0; - - // OnRetriableError is called whenever client gets YT error that can be retried (e.g. operation limit exceeded). - // OnGenericError is called whenever request failed due to generic error like network error. - // - // Both methods must return nothing if policy doesn't want to retry this error. - // Otherwise method should return backoff time. - virtual TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) = 0; - virtual TMaybe<TDuration> OnGenericError(const std::exception& e) = 0; - - // OnIgnoredError is called whenever client gets an error but is going to ignore it. - virtual void OnIgnoredError(const TErrorResponse& /*e*/) = 0; -}; -using IRequestRetryPolicyPtr = ::TIntrusivePtr<IRequestRetryPolicy>; - -//////////////////////////////////////////////////////////////////////////////// - -// IClientRetryPolicy controls creation of policies for individual requests. -class IClientRetryPolicy - : public virtual TThrRefBase -{ -public: - virtual IRequestRetryPolicyPtr CreatePolicyForGenericRequest() = 0; - virtual IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() = 0; -}; - - -//////////////////////////////////////////////////////////////////////////////// - -class TAttemptLimitedRetryPolicy - : public IRequestRetryPolicy -{ -public: - explicit TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config); - - void NotifyNewAttempt() override; - - TMaybe<TDuration> OnGenericError(const std::exception& e) override; - TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override; - void OnIgnoredError(const TErrorResponse& e) override; - TString GetAttemptDescription() const override; - - bool IsAttemptLimitExceeded() const; - -protected: - const TConfigPtr Config_; - -private: - const ui32 AttemptLimit_; - ui32 Attempt_ = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config); -IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config); -IRetryConfigProviderPtr CreateDefaultRetryConfigProvider(); - -//////////////////////////////////////////////////////////////////////////////// - -// Check if error returned by YT can be retried -bool IsRetriable(const TErrorResponse& errorResponse); -bool IsRetriable(const std::exception& ex); - -// Get backoff duration for errors returned by YT. -TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config); - -// Get backoff duration for errors that are not TErrorResponse. -TDuration GetBackoffDuration(const std::exception& error, const TConfigPtr& config); -TDuration GetBackoffDuration(const TConfigPtr& config); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT diff --git a/yt/cpp/mapreduce/common/wait_proxy.cpp b/yt/cpp/mapreduce/common/wait_proxy.cpp deleted file mode 100644 index 3db034a0980..00000000000 --- a/yt/cpp/mapreduce/common/wait_proxy.cpp +++ /dev/null @@ -1,118 +0,0 @@ -#include "wait_proxy.h" - - -#include <library/cpp/threading/future/future.h> - -#include <util/system/event.h> -#include <util/system/condvar.h> - -namespace NYT { -namespace NDetail { - -//////////////////////////////////////////////////////////////////////////////// - -bool TDefaultWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TDuration timeout) -{ - return future.Wait(timeout); -} - -bool TDefaultWaitProxy::WaitEvent(TSystemEvent& event, TDuration timeout) -{ - return event.WaitT(timeout); -} - -bool TDefaultWaitProxy::WaitCondVar(TCondVar &condVar, TMutex &mutex, TDuration timeout) -{ - return condVar.WaitT(mutex, timeout); -} - -void TDefaultWaitProxy::Sleep(TDuration timeout) -{ - ::Sleep(timeout); -} - -//////////////////////////////////////////////////////////////////////////////// - -TWaitProxy::TWaitProxy() - : Proxy_(::MakeIntrusive<TDefaultWaitProxy>()) -{ } - -TWaitProxy* TWaitProxy::Get() -{ - return Singleton<TWaitProxy>(); -} - -void TWaitProxy::SetProxy(::TIntrusivePtr<IWaitProxy> proxy) -{ - Proxy_ = std::move(proxy); -} - -bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future) -{ - return Proxy_->WaitFuture(future, TDuration::Max()); -} - -bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TInstant deadLine) -{ - return Proxy_->WaitFuture(future, deadLine - TInstant::Now()); -} - -bool TWaitProxy::WaitFuture(const NThreading::TFuture<void>& future, TDuration timeout) -{ - return Proxy_->WaitFuture(future, timeout); -} - -bool TWaitProxy::WaitEventD(TSystemEvent& event, TInstant deadLine) -{ - return Proxy_->WaitEvent(event, deadLine - TInstant::Now()); -} - -bool TWaitProxy::WaitEventT(TSystemEvent& event, TDuration timeout) -{ - return Proxy_->WaitEvent(event, timeout); -} - -void TWaitProxy::WaitEventI(TSystemEvent& event) -{ - Proxy_->WaitEvent(event, TDuration::Max()); -} - -bool TWaitProxy::WaitEvent(TSystemEvent& event) -{ - return Proxy_->WaitEvent(event, TDuration::Max()); -} - -bool TWaitProxy::WaitCondVarD(TCondVar& condVar, TMutex& m, TInstant deadLine) -{ - return Proxy_->WaitCondVar(condVar, m, deadLine - TInstant::Now()); -} - -bool TWaitProxy::WaitCondVarT(TCondVar& condVar, TMutex& m, TDuration timeOut) -{ - return Proxy_->WaitCondVar(condVar, m, timeOut); -} - -void TWaitProxy::WaitCondVarI(TCondVar& condVar, TMutex& m) -{ - Proxy_->WaitCondVar(condVar, m, TDuration::Max()); -} - -void TWaitProxy::WaitCondVar(TCondVar& condVar, TMutex& m) -{ - Proxy_->WaitCondVar(condVar, m, TDuration::Max()); -} - -void TWaitProxy::Sleep(TDuration timeout) -{ - Proxy_->Sleep(timeout); -} - -void TWaitProxy::SleepUntil(TInstant instant) -{ - Proxy_->Sleep(instant - TInstant::Now()); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NDetail -} // namespace NYT diff --git a/yt/cpp/mapreduce/common/wait_proxy.h b/yt/cpp/mapreduce/common/wait_proxy.h deleted file mode 100644 index e7c944cf24e..00000000000 --- a/yt/cpp/mapreduce/common/wait_proxy.h +++ /dev/null @@ -1,53 +0,0 @@ -#pragma once - -#include <yt/cpp/mapreduce/interface/wait_proxy.h> - -namespace NYT { -namespace NDetail { - -//////////////////////////////////////////////////////////////////////////////// - -class TDefaultWaitProxy - : public IWaitProxy -{ -public: - bool WaitFuture(const ::NThreading::TFuture<void>& future, TDuration timeout) override; - bool WaitEvent(TSystemEvent& event, TDuration timeout) override; - bool WaitCondVar(TCondVar& condVar, TMutex& mutex, TDuration timeout) override; - void Sleep(TDuration timeout) override; -}; - -class TWaitProxy { -public: - TWaitProxy(); - - static TWaitProxy* Get(); - - // NB: Non thread-safe, should be called only in initialization code. - void SetProxy(::TIntrusivePtr<IWaitProxy> proxy); - - bool WaitFuture(const ::NThreading::TFuture<void>& future); - bool WaitFuture(const ::NThreading::TFuture<void>& future, TInstant deadLine); - bool WaitFuture(const ::NThreading::TFuture<void>& future, TDuration timeout); - - bool WaitEventD(TSystemEvent& event, TInstant deadLine); - bool WaitEventT(TSystemEvent& event, TDuration timeout); - void WaitEventI(TSystemEvent& event); - bool WaitEvent(TSystemEvent& event); - - bool WaitCondVarD(TCondVar& condVar, TMutex& m, TInstant deadLine); - bool WaitCondVarT(TCondVar& condVar, TMutex& m, TDuration timeOut); - void WaitCondVarI(TCondVar& condVar, TMutex& m); - void WaitCondVar(TCondVar& condVar, TMutex& m); - - void Sleep(TDuration timeout); - void SleepUntil(TInstant instant); - -private: - ::TIntrusivePtr<IWaitProxy> Proxy_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NDetail -} // namespace NYT diff --git a/yt/cpp/mapreduce/common/ya.make b/yt/cpp/mapreduce/common/ya.make deleted file mode 100644 index 004708cb44a..00000000000 --- a/yt/cpp/mapreduce/common/ya.make +++ /dev/null @@ -1,23 +0,0 @@ -LIBRARY() - -INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) - -SRCS( - debug_metrics.cpp - helpers.cpp - retry_lib.cpp - wait_proxy.cpp -) - -PEERDIR( - library/cpp/json - library/cpp/svnversion - library/cpp/threading/future - library/cpp/yson - library/cpp/yson/json - library/cpp/yson/node - yt/cpp/mapreduce/interface - yt/cpp/mapreduce/interface/logging -) - -END() |
