diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-04-14 11:53:59 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-04-14 11:53:59 +0300 |
commit | 2f8e8eb079c25dfe7caab049c7cd43959dde5408 (patch) | |
tree | 19de4d7c880ea7374032ff931ac6f2b3efea5b86 | |
parent | 0e003a271468efc31f26aa2995d245b526f8096f (diff) | |
download | ydb-2f8e8eb079c25dfe7caab049c7cd43959dde5408.tar.gz |
YQ-727 S3 retry download in streaming mode.
ref:1fb768f57503ba60b8764f8b5b2b910c2fa2724e
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 185 |
1 files changed, 111 insertions, 74 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 717d126796e..9f3330f4a99 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -46,63 +46,72 @@ struct TEvPrivate { // Events struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> { - TEvReadResult(IHTTPGateway::TContent&& result, size_t pathInd): Result(std::move(result)), PathIndex(pathInd) {} + TEvReadResult(IHTTPGateway::TContent&& result, size_t pathInd = 0U): Result(std::move(result)), PathIndex(pathInd) {} IHTTPGateway::TContent Result; - const size_t PathIndex = 0; + const size_t PathIndex; }; - struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> { - TEvReadFinished(size_t pathInd) : PathIndex(pathInd) {} - const size_t PathIndex = 0; - }; + struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {}; struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { - TEvReadError(TIssues&& error, size_t pathInd) : Error(std::move(error)), PathIndex(pathInd) {} + TEvReadError(TIssues&& error, size_t pathInd = 0U) : Error(std::move(error)), PathIndex(pathInd) {} const TIssues Error; - const size_t PathIndex = 0; + const size_t PathIndex; }; struct TEvRetryEvent : public NActors::TEventLocal<TEvRetryEvent, EvRetry> { explicit TEvRetryEvent(size_t pathIndex) : PathIndex(pathIndex) {} - const size_t PathIndex = 0; + const size_t PathIndex; + }; + + struct TEvRetryEventFunc : public NActors::TEventLocal<TEvRetryEventFunc, EvRetry> { + explicit TEvRetryEventFunc(std::function<void()> functor) : Functor(std::move(functor)) {} + const std::function<void()> Functor; }; }; using TPath = std::tuple<TString, size_t>; using TPathList = std::vector<TPath>; -class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqSourceActor { -private: - class TRetryParams { - public: - TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig) { - if (retryConfig) { - DelayMs = retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : DelayMs; - Epsilon = retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : Epsilon; - Y_VERIFY(0.0 < Epsilon && Epsilon < 1.0); - } - } +class TRetryParams { +public: + TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig) + : MaxRetries(retryConfig && retryConfig->GetMaxRetriesPerPath() ? retryConfig->GetMaxRetriesPerPath() : 3U) + , InitDelayMs(retryConfig && retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : TDuration::MilliSeconds(100)) + , InitEpsilon(retryConfig && retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : 0.1) + { + Y_VERIFY(0. < InitEpsilon && InitEpsilon < 1.); + Reset(); + } - TDuration GetNextDelay(ui32 maxRetries) { - if (Retries > maxRetries) return TDuration::Zero(); - return DelayMs = GenerateNextDelay(); - }; + void Reset() { + Retries = 0U; + DelayMs = InitDelayMs; + Epsilon = InitEpsilon; + } - void IncRetries() { - ++Retries; - } + TDuration GetNextDelay() { + if (++Retries > MaxRetries) + return TDuration::Zero(); + return DelayMs = GenerateNextDelay(); + } +private: + TDuration GenerateNextDelay() { + const auto low = 1. - Epsilon; + const auto jitter = low + std::rand() / (RAND_MAX / (2. * Epsilon)); + return DelayMs * jitter; + } - private: - TDuration GenerateNextDelay() { - double low = 1 - Epsilon; - auto jitter = low + std::rand() / (RAND_MAX / (2 * Epsilon)); - return DelayMs * jitter; - } + const ui32 MaxRetries; + const TDuration InitDelayMs; + const double InitEpsilon; - ui32 Retries = 0; - TDuration DelayMs = TDuration::MilliSeconds(100); - double Epsilon = 0.1; - }; + ui32 Retries; + TDuration DelayMs; + double Epsilon; +}; + +class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqSourceActor { public: TS3ReadActor(ui64 inputIndex, IHTTPGateway::TPtr gateway, @@ -119,11 +128,7 @@ public: , Headers(MakeHeader(token)) , Paths(std::move(paths)) , RetryConfig(retryConfig) - { - if (RetryConfig) { - MaxRetriesPerPath = RetryConfig->GetMaxRetriesPerPath() ? RetryConfig->GetMaxRetriesPerPath() : MaxRetriesPerPath; - } - } + {} void Bootstrap() { Become(&TS3ReadActor::StateFunc); @@ -192,7 +197,6 @@ private: void HandleRetry(TEvPrivate::TEvRetryEvent::TPtr& ev) { const auto pathInd = ev->Get()->PathIndex; - RetriesPerPath[pathInd].IncRetries(); Gateway->Download(Url + std::get<TString>(Paths[pathInd]), Headers, std::get<size_t>(Paths[pathInd]), std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd)); @@ -201,7 +205,7 @@ private: void Handle(TEvPrivate::TEvReadError::TPtr& result) { const auto pathInd = result->Get()->PathIndex; Y_VERIFY(pathInd < RetriesPerPath.size()); - if (auto nextDelayMs = RetriesPerPath[pathInd].GetNextDelay(MaxRetriesPerPath)) { + if (auto nextDelayMs = RetriesPerPath[pathInd].GetNextDelay()) { Schedule(nextDelayMs, new TEvPrivate::TEvRetryEvent(pathInd)); return; } @@ -236,7 +240,6 @@ private: std::vector<TRetryParams> RetriesPerPath; const std::shared_ptr<NS3::TRetryConfig> RetryConfig; - ui32 MaxRetriesPerPath = 3; }; using namespace NKikimr::NMiniKQL; @@ -286,7 +289,7 @@ private: public: TS3ReadCoroImpl(const TTypeEnvironment& typeEnv, const IFunctionRegistry& functionRegistry, ui64 inputIndex, const NActors::TActorId& computeActorId, ui64, TString format, TString rowType, TOutput::TPtr outputs) - : TActorCoroImpl(256_KB), TypeEnv(typeEnv), FunctionRegistry(functionRegistry), InputIndex(inputIndex), Format(std::move(format)), RowType(std::move(rowType)), ComputeActorId(computeActorId), Outputs(std::move(outputs)) + : TActorCoroImpl(512_KB), TypeEnv(typeEnv), FunctionRegistry(functionRegistry), InputIndex(inputIndex), Format(std::move(format)), RowType(std::move(rowType)), ComputeActorId(computeActorId), Outputs(std::move(outputs)) {} bool Next(NUdf::TUnboxedValue& value) { @@ -305,6 +308,7 @@ public: return false; case TEvPrivate::TEvReadResult::EventType: value = MakeString(NUdf::TStringRef(std::string_view(ev->Get<TEvPrivate::TEvReadResult>()->Result))); + Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex)); return true; default: return false; @@ -343,6 +347,8 @@ private: Outputs->Data.emplace_back(std::move(v)); Outputs.reset(); + Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex)); + } catch (const yexception& err) { Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue(err.what())}, true)); return; @@ -362,6 +368,26 @@ private: }; class TS3ReadCoroActor : public TActorCoro { + struct TRetryStuff { + using TPtr = std::shared_ptr<TRetryStuff>; + + TRetryStuff( + IHTTPGateway::TPtr gateway, + TString url, + const IHTTPGateway::THeaders& headers, + const std::shared_ptr<NS3::TRetryConfig>& retryConfig, + std::size_t expectedSize + ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryParams(retryConfig) + {} + + const IHTTPGateway::TPtr Gateway; + const TString Url; + const IHTTPGateway::THeaders Headers; + const std::size_t ExpectedSize; + + std::size_t Offset = 0U; + TRetryParams RetryParams; + }; public: TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, IHTTPGateway::TPtr gateway, @@ -369,33 +395,38 @@ public: const IHTTPGateway::THeaders& headers, const TString& path, const std::size_t expectedSize, + const std::shared_ptr<NS3::TRetryConfig>& retryConfig, TOutput::TPtr outputs) : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) - , Gateway(std::move(gateway)) - , Url(url) - , Headers(headers) - , Path(path) - , ExpectedSize(expectedSize) + , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, retryConfig, expectedSize)) , Outputs(std::move(outputs)) {} private: - static void OnNewData(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TContent&& data) { - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(data), 0))); + static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TContent&& data) { + retryStuff->Offset += data.size(); + retryStuff->RetryParams.Reset(); + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadResult(std::move(data)))); } - static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, std::optional<TIssues> result) { + static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::optional<TIssues> result) { if (result) - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(TIssues{*result}, 0))); + if (const auto nextDelayMs = retryStuff->RetryParams.GetNextDelay()) + actorSystem->Schedule(nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent)))); + else + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(TIssues{*result}))); else - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadFinished(0))); + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished)); } - TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) { - Gateway->Download(Url + Path, - Headers, 0ULL, ExpectedSize, - std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1), - std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1)); + static void DownloadStart(const TRetryStuff::TPtr& retryStuff, const TActorId& self, const TActorId& parent) { + retryStuff->Gateway->Download(retryStuff->Url, + retryStuff->Headers, retryStuff->Offset, retryStuff->ExpectedSize > retryStuff->Offset ? retryStuff->ExpectedSize - retryStuff->Offset : 0U, + std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1), + std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1)); + } + TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) { + DownloadStart(RetryStuff, self, parent); return TActorCoro::AfterRegister(self, parent); } @@ -403,12 +434,7 @@ private: return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token}; } - const IHTTPGateway::TPtr Gateway; - - const TString Url; - const IHTTPGateway::THeaders Headers; - const TString Path; - const std::size_t ExpectedSize; + const TRetryStuff::TPtr RetryStuff; TOutput::TPtr Outputs; }; @@ -424,7 +450,8 @@ public: TPathList&& paths, TString format, TString rowType, - const NActors::TActorId& computeActorId + const NActors::TActorId& computeActorId, + const std::shared_ptr<NS3::TRetryConfig>& retryConfig ) : TypeEnv(typeEnv) , FunctionRegistry(functionRegistry) , Gateway(std::move(gateway)) @@ -435,15 +462,16 @@ public: , Paths(std::move(paths)) , Format(format) , RowType(rowType) + , RetryConfig(retryConfig) , Outputs(std::make_shared<TOutput>()) {} void Bootstrap() { - for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { - const TPath& path = Paths[pathInd]; + Become(&TS3StreamReadActor::StateFunc); + for (const auto& path : Paths) { auto impl = MakeHolder<TS3ReadCoroImpl>(TypeEnv, FunctionRegistry, InputIndex, ComputeActorId, Paths.size(), Format, RowType, Outputs); - RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), Outputs).Release()); - }; + RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), RetryConfig, Outputs).Release()); + } } static constexpr char ActorName[] = "S3_READ_ACTOR"; @@ -478,7 +506,15 @@ private: static IHTTPGateway::THeaders MakeHeader(const TString& token) { return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token}; } -private: + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry); + ) + + void HandleRetry(TEvPrivate::TEvRetryEventFunc::TPtr& retry) { + return retry->Get()->Functor(); + } + const TTypeEnvironment& TypeEnv; const IFunctionRegistry& FunctionRegistry; @@ -491,6 +527,7 @@ private: const IHTTPGateway::THeaders Headers; const TPathList Paths; const TString Format, RowType, Compression; + const std::shared_ptr<NS3::TRetryConfig> RetryConfig; TOutput::TPtr Outputs; }; @@ -532,7 +569,7 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor( const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); if (params.HasFormat() && params.HasRowType()) { - const auto actor = new TS3StreamReadActor(typeEnv, functionRegistry, inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), params.GetFormat(), params.GetRowType(), computeActorId); + const auto actor = new TS3StreamReadActor(typeEnv, functionRegistry, inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), params.GetFormat(), params.GetRowType(), computeActorId, retryConfig); return {actor, actor}; } else { const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), computeActorId, retryConfig); |