diff options
author | a-romanov <[email protected]> | 2022-08-01 19:00:47 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2022-08-01 19:00:47 +0300 |
commit | 102d954c33ca380bca270945028c545723e69e11 (patch) | |
tree | 1e5ca8b64d8fc010d1189c87c1180f3501ab79bb | |
parent | 41e73b0260fb91fbeadbdd05d6ab2a8e5dd1587b (diff) |
Use the common retry policy for the S3 read stream.
5 files changed, 60 insertions, 75 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 1ea067cfcc2..4933e2fcf30 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -229,12 +229,12 @@ private: return OnFinish(TIssues{error}); } - void Done(CURLcode result, long) final { + void Done(CURLcode result, long httpResponseCode) final { if (CURLE_OK != result) return Fail(TIssue(curl_easy_strerror(result))); Working = false; - return OnFinish(std::nullopt); + return OnFinish(httpResponseCode); } size_t Write(void* contents, size_t size, size_t nmemb) final { diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 373fa86c04a..7f0fbb3c00b 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -95,7 +95,7 @@ public: }; using TOnNewDataPart = std::function<void(TCountedContent&&)>; - using TOnDownloadFinish = std::function<void(std::optional<TIssues>)>; + using TOnDownloadFinish = std::function<void(std::variant<long, TIssues>)>; // http code or issues. 
virtual void Download( TString url, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 77ff5012b2d..25ab825ea9a 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -82,7 +82,10 @@ struct TEvPrivate { IHTTPGateway::TCountedContent Result; }; - struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {}; + struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> { + explicit TEvReadFinished(long httpResponseCode) : HttpResponseCode(httpResponseCode) {} + const long HttpResponseCode; + }; struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { TEvReadError(TIssues&& error, size_t pathInd = std::numeric_limits<size_t>::max()) : Error(std::move(error)), PathIndex(pathInd) {} @@ -102,44 +105,6 @@ struct TEvPrivate { }; }; -class TRetryParams { -public: - explicit TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig) - : MaxRetries(retryConfig && retryConfig->GetMaxRetriesPerPath() ? retryConfig->GetMaxRetriesPerPath() : 3U) - , InitDelayMs(retryConfig && retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : TDuration::MilliSeconds(100)) - , InitEpsilon(retryConfig && retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : 0.1) - { - Y_VERIFY(0. < InitEpsilon && InitEpsilon < 1.); - Reset(); - } - - void Reset() { - Retries = 0U; - DelayMs = InitDelayMs; - Epsilon = InitEpsilon; - } - - TDuration GetNextDelay() { - if (++Retries > MaxRetries) - return TDuration::Zero(); - return DelayMs = GenerateNextDelay(); - } -private: - TDuration GenerateNextDelay() { - const auto low = 1. - Epsilon; - const auto jitter = low + std::rand() / (RAND_MAX / (2. 
* Epsilon)); - return DelayMs * jitter; - } - - const ui32 MaxRetries; - const TDuration InitDelayMs; - const double InitEpsilon; - - ui32 Retries; - TDuration DelayMs; - double Epsilon; -}; - using namespace NKikimr::NMiniKQL; class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput { @@ -308,15 +273,16 @@ public: {} bool Next(TString& value) { - if (Finished) + if (HttpResponseCode) return false; const auto ev = WaitForSpecificEvent<TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); switch (const auto etype = ev->GetTypeRewrite()) { case TEvPrivate::TEvReadFinished::EventType: - Finished = true; + HttpResponseCode = ev->Get<TEvPrivate::TEvReadFinished>()->HttpResponseCode; return false; case TEvPrivate::TEvReadError::EventType: + HttpResponseCode = 0L; Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true)); return false; case TEvPrivate::TEvDataPart::EventType: @@ -328,6 +294,24 @@ public: } } private: + void WaitFinish() { + if (HttpResponseCode) + return; + + const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); + switch (const auto etype = ev->GetTypeRewrite()) { + case TEvPrivate::TEvReadFinished::EventType: + HttpResponseCode = ev->Get<TEvPrivate::TEvReadFinished>()->HttpResponseCode; + break; + case TEvPrivate::TEvReadError::EventType: + HttpResponseCode = 0L; + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true)); + break; + default: + break; + } + } + void Run() final try { TReadBufferFromStream buffer(this); const auto decompress(MakeDecompressor(buffer, ReadSpec->Compression)); @@ -337,7 +321,10 @@ private: while (auto block = stream.read()) Send(SourceActorId, new TEvPrivate::TEvNextBlock(block, PathIndex)); - Send(SourceActorId, new TEvPrivate::TEvReadFinished); + WaitFinish(); + + if 
(*HttpResponseCode) + Send(SourceActorId, new TEvPrivate::TEvReadFinished(*HttpResponseCode)); } catch (const TDtorException&) { throw; } catch (const std::exception& err) { @@ -356,7 +343,7 @@ private: const NActors::TActorId ComputeActorId; const size_t PathIndex; const TString Path; - bool Finished = false; + std::optional<long> HttpResponseCode; }; class TS3ReadCoroActor : public TActorCoro { @@ -367,9 +354,8 @@ class TS3ReadCoroActor : public TActorCoro { IHTTPGateway::TPtr gateway, TString url, const IHTTPGateway::THeaders& headers, - const std::shared_ptr<NS3::TRetryConfig>& retryConfig, std::size_t expectedSize - ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryParams(retryConfig) + ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState()) {} const IHTTPGateway::TPtr Gateway; @@ -378,7 +364,7 @@ class TS3ReadCoroActor : public TActorCoro { const std::size_t ExpectedSize; std::size_t Offset = 0U; - TRetryParams RetryParams; + const IRetryPolicy<long>::IRetryState::TPtr RetryState; }; public: TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, @@ -386,26 +372,31 @@ public: const TString& url, const IHTTPGateway::THeaders& headers, const TString& path, - const std::size_t expectedSize, - const std::shared_ptr<NS3::TRetryConfig>& retryConfig) + const std::size_t expectedSize) : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) - , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, retryConfig, expectedSize)) + , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, expectedSize)) {} private: static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TCountedContent&& data) { retryStuff->Offset += data.size(); - retryStuff->RetryParams.Reset(); 
actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data)))); } - static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::optional<TIssues> result) { - if (result) - if (const auto nextDelayMs = retryStuff->RetryParams.GetNextDelay()) - actorSystem->Schedule(nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent)))); - else - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(TIssues{*result}))); - else - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished)); + static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::variant<long, TIssues> result) { + switch (result.index()) { + case 0U: + if (const auto httpCode = std::get<long>(result); const auto nextDelayMs = retryStuff->RetryState->GetNextRetryDelay(httpCode)) + actorSystem->Schedule(*nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent)))); + else + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(httpCode))); + break; + case 1U: + if (const auto nextDelayMs = retryStuff->RetryState->GetNextRetryDelay(0L)) + actorSystem->Schedule(*nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent)))); + else + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result))))); + break; + } } static void DownloadStart(const TRetryStuff::TPtr& retryStuff, const TActorId& self, const TActorId& parent) { @@ -439,8 +430,7 @@ public: bool addPathIndex, ui64 startPathIndex, const TReadSpec::TPtr& readSpec, - 
const NActors::TActorId& computeActorId, - const std::shared_ptr<NS3::TRetryConfig>& retryConfig + const NActors::TActorId& computeActorId ) : Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) @@ -451,7 +441,6 @@ public: , AddPathIndex(addPathIndex) , StartPathIndex(startPathIndex) , ReadSpec(readSpec) - , RetryConfig(retryConfig) , Count(Paths.size()) {} @@ -460,7 +449,7 @@ public: for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, SelfId(), ComputeActorId, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path)); - RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), RetryConfig).Release()); + RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path)).Release()); } } @@ -566,7 +555,6 @@ private: const bool AddPathIndex; const ui64 StartPathIndex; const TReadSpec::TPtr ReadSpec; - const std::shared_ptr<NS3::TRetryConfig> RetryConfig; std::deque<std::tuple<NDB::Block, size_t>> Blocks; ui32 Count; }; @@ -650,8 +638,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId, - ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const std::shared_ptr<NS3::TRetryConfig>& retryConfig) + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry(); @@ -709,7 +696,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( #undef SUPPORTED_FLAGS const auto actor = new TS3StreamReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken, - std::move(paths), 
addPathIndex, startPathIndex, readSpec, computeActorId, retryConfig); + std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId); return {actor, actor}; } else { const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index d8b153ccbcc..e8103ff2b07 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -18,7 +18,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId, - ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr); + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index fa2ce935761..f5817ad55f9 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -12,18 +12,17 @@ void RegisterS3ReadActorFactory( TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, - const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) { + const std::shared_ptr<NYql::NS3::TRetryConfig>&) { #ifdef __linux__ NDB::registerFormats(); factory.RegisterSource<NS3::TSource>("S3Source", - [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { - return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.SecureParams, 
args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig); + [credentialsFactory, gateway](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { + return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory); }); #else Y_UNUSED(factory); Y_UNUSED(credentialsFactory); Y_UNUSED(gateway); - Y_UNUSED(retryConfig); #endif } |