diff options
author | a-romanov <[email protected]> | 2022-07-28 14:15:01 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2022-07-28 14:15:01 +0300 |
commit | b333b9c0b2519d13f1b00518e4ad398b6c06ace5 (patch) | |
tree | 9df212b1572957b9f07baa013ee2e524cad44233 | |
parent | ceb7e240f642d3a6c3ba7cd26e9e31a79b289741 (diff) |
S3 Common policy for read and write retries and some cleanup code.
6 files changed, 58 insertions, 102 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 2defa4ffaee..5eedf136446 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -75,7 +75,7 @@ public: } virtual void Fail(const TIssue& error) = 0; - virtual void Done(CURLcode result) = 0; + virtual void Done(CURLcode result, long httpResponseCode) = 0; virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0; virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0; @@ -153,13 +153,10 @@ private: } } - void Done(CURLcode result) final { + void Done(CURLcode result, long httpResponseCode) final { if (CURLE_OK != result) return Fail(TIssue(curl_easy_strerror(result))); - long httpResponseCode = 0; - curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); - const std::unique_lock lock(SyncCallbacks); while (!Callbacks.empty()) { if (1U == Callbacks.size()) @@ -225,7 +222,7 @@ private: return OnFinish(TIssues{error}); } - void Done(CURLcode result) final { + void Done(CURLcode result, long) final { if (CURLE_OK != result) return Fail(TIssue(curl_easy_strerror(result))); @@ -426,12 +423,13 @@ private: void Done(CURL* handle, CURLcode result) { TEasyCurl::TPtr easy; + long httpResponseCode = 0L; { const std::unique_lock lock(Sync); if (const auto it = Allocated.find(handle); Allocated.cend() != it) { - long httpResponseCode = 0; easy = std::move(it->second); - curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); + if (CURLE_OK == result) + curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); if (auto buffer = std::dynamic_pointer_cast<TEasyCurlBuffer>(easy)) { if (const auto& nextRetryDelay = buffer->GetNextRetryDelay(httpResponseCode)) { @@ -447,7 +445,7 @@ private: Requests.clear(); } if (easy) { - easy->Done(result); + easy->Done(result, httpResponseCode); } } diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt index fdeab19adc1..910d8cab69a 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt @@ -38,6 +38,7 @@ target_link_libraries(providers-s3-actors PUBLIC ) target_sources(providers-s3-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 67ef5f0a8c3..71439673161 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -20,9 +20,10 @@ #endif #include "yql_s3_read_actor.h" +#include "yql_s3_retry_policy.h" #include <ydb/library/yql/minikql/mkql_string_util.h> -#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> #include <ydb/library/yql/minikql/mkql_function_registry.h> @@ -89,11 +90,6 @@ struct TEvPrivate { const size_t PathIndex; }; - struct TEvRetryEvent : public NActors::TEventLocal<TEvRetryEvent, EvRetry> { - explicit TEvRetryEvent(size_t pathIndex) : PathIndex(pathIndex) {} - const size_t PathIndex; - }; - struct TEvRetryEventFunc : public NActors::TEventLocal<TEvRetryEventFunc, EvRetry> { explicit TEvRetryEventFunc(std::function<void()> functor) : Functor(std::move(functor)) {} const std::function<void()> Functor; @@ -144,18 +140,19 @@ private: double Epsilon; }; +using namespace NKikimr::NMiniKQL; + class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput { public: TS3ReadActor(ui64 inputIndex, IHTTPGateway::TPtr gateway, - const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const THolderFactory& holderFactory, const TString& url, const TString& token, TPathList&& paths, bool addPathIndex, ui64 startPathIndex, - const NActors::TActorId& computeActorId, - const std::shared_ptr<NS3::TRetryConfig>& retryConfig + const NActors::TActorId& computeActorId ) : Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) @@ -166,17 +163,15 @@ public: , Paths(std::move(paths)) , AddPathIndex(addPathIndex) , StartPathIndex(startPathIndex) - , RetryConfig(retryConfig) {} void Bootstrap() { Become(&TS3ReadActor::StateFunc); - RetriesPerPath.resize(Paths.size(), TRetryParams(RetryConfig)); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; Gateway->Download(Url + std::get<TString>(path), Headers, std::get<size_t>(path), - std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd + StartPathIndex)); + std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd + StartPathIndex), {}, GetS3RetryPolicy()); }; } @@ -191,7 +186,6 @@ private: STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvReadResult, Handle); hFunc(TEvPrivate::TEvReadError, Handle); - hFunc(TEvPrivate::TEvRetryEvent, HandleRetry); ) static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result, size_t pathInd) { @@ -207,14 +201,14 @@ private: } } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { + i64 GetAsyncInputData(TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { i64 total = 0LL; if (!Blocks.empty()) { buffer.reserve(buffer.size() + Blocks.size()); do { auto& content = std::get<IHTTPGateway::TContent>(Blocks.front()); const auto size = content.size(); - auto value = NKikimr::NMiniKQL::MakeString(std::string_view(content)); + auto value = MakeString(std::string_view(content)); if (AddPathIndex) { NUdf::TUnboxedValue* tupleItems = nullptr; auto tuple = ContainerCache.NewArray(HolderFactory, 2, tupleItems); @@ -245,20 +239,7 @@ private: Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } - void HandleRetry(TEvPrivate::TEvRetryEvent::TPtr& ev) { - const auto pathInd = ev->Get()->PathIndex; - Gateway->Download(Url + std::get<TString>(Paths[pathInd]), - Headers, std::get<size_t>(Paths[pathInd]), - std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd)); - } - void Handle(TEvPrivate::TEvReadError::TPtr& result) { - const auto pathInd = result->Get()->PathIndex; - Y_VERIFY(pathInd < RetriesPerPath.size()); - if (auto nextDelayMs = RetriesPerPath[pathInd].GetNextDelay()) { - Schedule(nextDelayMs, new TEvPrivate::TEvRetryEvent(pathInd)); - return; - } ++IsDoneCounter; Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, true)); } @@ -276,8 +257,8 @@ private: size_t IsDoneCounter = 0U; const IHTTPGateway::TPtr Gateway; - const NKikimr::NMiniKQL::THolderFactory& HolderFactory; - NKikimr::NMiniKQL::TPlainContainerCache ContainerCache; + const THolderFactory& HolderFactory; + TPlainContainerCache ContainerCache; const ui64 InputIndex; const NActors::TActorId ComputeActorId; @@ -291,9 +272,6 @@ private: const ui64 StartPathIndex; std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks; - - std::vector<TRetryParams> RetriesPerPath; - const std::shared_ptr<NS3::TRetryConfig> RetryConfig; }; struct TReadSpec { @@ -451,7 +429,7 @@ public: TS3StreamReadActor( ui64 inputIndex, IHTTPGateway::TPtr gateway, - const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const THolderFactory& holderFactory, const TString& url, const TString& token, TPathList&& paths, @@ -486,9 +464,11 @@ public: static constexpr char ActorName[] = "S3_READ_ACTOR"; private: - class TBoxedBlock : public NUdf::TBoxedValueBase { + class TBoxedBlock : public TComputationValue<TBoxedBlock> { public: - TBoxedBlock(NDB::Block& block) { + TBoxedBlock(TMemoryUsageInfo* memInfo, NDB::Block& block) + : TComputationValue(memInfo) + { Block.swap(block); } private: @@ -508,13 +488,13 @@ private: void CommitState(const NDqProto::TCheckpoint&) final {} ui64 GetInputIndex() const final { return InputIndex; } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& output, bool& finished, i64 free) final { + i64 GetAsyncInputData(TUnboxedValueVector& output, bool& finished, i64 free) final { i64 total = 0LL; if (!Blocks.empty()) do { auto& block = std::get<NDB::Block>(Blocks.front()); const i64 s = block.bytes(); - auto value = NUdf::TUnboxedValuePod(new TBoxedBlock(block)); + auto value = HolderFactory.Create<TBoxedBlock>(block); if (AddPathIndex) { NUdf::TUnboxedValue* tupleItems = nullptr; auto tuple = ContainerCache.NewArray(HolderFactory, 2, tupleItems); @@ -570,8 +550,8 @@ private: } const IHTTPGateway::TPtr Gateway; - const NKikimr::NMiniKQL::THolderFactory& HolderFactory; - NKikimr::NMiniKQL::TPlainContainerCache ContainerCache; + const THolderFactory& HolderFactory; + TPlainContainerCache ContainerCache; const ui64 InputIndex; const NActors::TActorId ComputeActorId; @@ -660,7 +640,7 @@ using namespace NKikimr::NMiniKQL; std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const TTypeEnvironment& typeEnv, - const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const THolderFactory& holderFactory, IHTTPGateway::TPtr gateway, NS3::TSource&& params, ui64 inputIndex, @@ -730,7 +710,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( return {actor, actor}; } else { const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken, - std::move(paths), addPathIndex, startPathIndex, computeActorId, retryConfig); + std::move(paths), addPathIndex, startPathIndex, computeActorId); return {actor, actor}; } } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.cpp new file mode 100644 index 00000000000..9e6b4e5a073 --- /dev/null +++ b/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.cpp @@ -0,0 +1,15 @@ +#include "yql_s3_retry_policy.h" + +namespace NYql { + +IRetryPolicy<long>::TPtr GetS3RetryPolicy() { + return IRetryPolicy<long>::GetExponentialBackoffPolicy([](long httpCode) { + switch (httpCode) { + case 0: return ERetryErrorClass::ShortRetry; + case 503: return ERetryErrorClass::LongRetry; + default: return ERetryErrorClass::NoRetry; + } + }); +} + +} diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.h b/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.h new file mode 100644 index 00000000000..729985331fa --- /dev/null +++ b/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.h @@ -0,0 +1,9 @@ +#pragma once + +#include <library/cpp/retry/retry_policy.h> + +namespace NYql { + +IRetryPolicy<long>::TPtr GetS3RetryPolicy(); + +} diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 503f35d80c7..f4f5952e37f 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -1,4 +1,5 @@ #include "yql_s3_write_actor.h" +#include "yql_s3_retry_policy.h" #include <ydb/library/yql/utils/yql_panic.h> @@ -26,10 +27,6 @@ using namespace NActors; namespace { -ERetryErrorClass RetryS3SlowDown(long httpResponseCode) { - return httpResponseCode == 503 ? ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503 -} - struct TEvPrivate { // Event ids enum EEv : ui32 { @@ -68,46 +65,6 @@ struct TEvPrivate { }; }; -using TPath = std::tuple<TString, size_t>; -using TPathList = std::vector<TPath>; - -class TRetryParams { -public: - TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig) - : MaxRetries(retryConfig && retryConfig->GetMaxRetriesPerPath() ? retryConfig->GetMaxRetriesPerPath() : 3U) - , InitDelayMs(retryConfig && retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : TDuration::MilliSeconds(100)) - , InitEpsilon(retryConfig && retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : 0.1) - { - Y_VERIFY(0. < InitEpsilon && InitEpsilon < 1.); - Reset(); - } - - void Reset() { - Retries = 0U; - DelayMs = InitDelayMs; - Epsilon = InitEpsilon; - } - - TDuration GetNextDelay() { - if (++Retries > MaxRetries) - return TDuration::Zero(); - return DelayMs = GenerateNextDelay(); - } -private: - TDuration GenerateNextDelay() { - const auto low = 1. - Epsilon; - const auto jitter = low + std::rand() / (RAND_MAX / (2. * Epsilon)); - return DelayMs * jitter; - } - - const ui32 MaxRetries; - const TDuration InitDelayMs; - const double InitEpsilon; - - ui32 Retries; - TDuration DelayMs; - double Epsilon; -}; using namespace NKikimr::NMiniKQL; @@ -126,7 +83,7 @@ public: void Bootstrap(const TActorId& parentId) { ParentId = parentId; Become(&TS3FileWriteActor::InitialStateFunc); - Gateway->Upload(Url + "?uploads", MakeHeader(), "", std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown)); + Gateway->Upload(Url + "?uploads", MakeHeader(), "", std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, GetS3RetryPolicy()); } static constexpr char ActorName[] = "S3_FILE_WRITE_ACTOR"; @@ -256,7 +213,7 @@ private: const auto index = Tags.size(); Tags.emplace_back(); InFlight += size; - Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true, IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown)); + Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true, GetS3RetryPolicy()); } } @@ -269,7 +226,7 @@ private: for (const auto& tag : Tags) xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl; xml << "</CompleteMultipartUpload>" << Endl; - Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, ParentId, Key, std::placeholders::_1), false, IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown)); + Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, ParentId, Key, std::placeholders::_1), false, GetS3RetryPolicy()); } IHTTPGateway::THeaders MakeHeader() const { @@ -295,8 +252,6 @@ private: std::queue<TString> Parts; std::vector<TString> Tags; - std::vector<TRetryParams> RetriesPerPath; - TString UploadId; }; @@ -401,8 +356,6 @@ private: const TString Path; const std::vector<TString> Keys; - std::vector<TRetryParams> RetriesPerPath; - std::unordered_map<TString, TS3FileWriteActor*> FileWriteActors; }; |