about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author a-romanov <a-romanov@yandex-team.ru> 2022-04-14 11:53:59 +0300
committer a-romanov <a-romanov@yandex-team.ru> 2022-04-14 11:53:59 +0300
commit 2f8e8eb079c25dfe7caab049c7cd43959dde5408 (patch)
tree 19de4d7c880ea7374032ff931ac6f2b3efea5b86
parent 0e003a271468efc31f26aa2995d245b526f8096f (diff)
download ydb-2f8e8eb079c25dfe7caab049c7cd43959dde5408.tar.gz
YQ-727 S3 retry download in streaming mode.
ref:1fb768f57503ba60b8764f8b5b2b910c2fa2724e
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 185
1 files changed, 111 insertions, 74 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 717d126796e..9f3330f4a99 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -46,63 +46,72 @@ struct TEvPrivate {
// Events
struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> {
- TEvReadResult(IHTTPGateway::TContent&& result, size_t pathInd): Result(std::move(result)), PathIndex(pathInd) {}
+ TEvReadResult(IHTTPGateway::TContent&& result, size_t pathInd = 0U): Result(std::move(result)), PathIndex(pathInd) {}
IHTTPGateway::TContent Result;
- const size_t PathIndex = 0;
+ const size_t PathIndex;
};
- struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {
- TEvReadFinished(size_t pathInd) : PathIndex(pathInd) {}
- const size_t PathIndex = 0;
- };
+ struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {};
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
- TEvReadError(TIssues&& error, size_t pathInd) : Error(std::move(error)), PathIndex(pathInd) {}
+ TEvReadError(TIssues&& error, size_t pathInd = 0U) : Error(std::move(error)), PathIndex(pathInd) {}
const TIssues Error;
- const size_t PathIndex = 0;
+ const size_t PathIndex;
};
struct TEvRetryEvent : public NActors::TEventLocal<TEvRetryEvent, EvRetry> {
explicit TEvRetryEvent(size_t pathIndex) : PathIndex(pathIndex) {}
- const size_t PathIndex = 0;
+ const size_t PathIndex;
+ };
+
+ struct TEvRetryEventFunc : public NActors::TEventLocal<TEvRetryEventFunc, EvRetry> {
+ explicit TEvRetryEventFunc(std::function<void()> functor) : Functor(std::move(functor)) {}
+ const std::function<void()> Functor;
};
};
using TPath = std::tuple<TString, size_t>;
using TPathList = std::vector<TPath>;
-class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqSourceActor {
-private:
- class TRetryParams {
- public:
- TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig) {
- if (retryConfig) {
- DelayMs = retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : DelayMs;
- Epsilon = retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : Epsilon;
- Y_VERIFY(0.0 < Epsilon && Epsilon < 1.0);
- }
- }
+class TRetryParams {
+public:
+ TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
+ : MaxRetries(retryConfig && retryConfig->GetMaxRetriesPerPath() ? retryConfig->GetMaxRetriesPerPath() : 3U)
+ , InitDelayMs(retryConfig && retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : TDuration::MilliSeconds(100))
+ , InitEpsilon(retryConfig && retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : 0.1)
+ {
+ Y_VERIFY(0. < InitEpsilon && InitEpsilon < 1.);
+ Reset();
+ }
- TDuration GetNextDelay(ui32 maxRetries) {
- if (Retries > maxRetries) return TDuration::Zero();
- return DelayMs = GenerateNextDelay();
- };
+ void Reset() {
+ Retries = 0U;
+ DelayMs = InitDelayMs;
+ Epsilon = InitEpsilon;
+ }
- void IncRetries() {
- ++Retries;
- }
+ TDuration GetNextDelay() {
+ if (++Retries > MaxRetries)
+ return TDuration::Zero();
+ return DelayMs = GenerateNextDelay();
+ }
+private:
+ TDuration GenerateNextDelay() {
+ const auto low = 1. - Epsilon;
+ const auto jitter = low + std::rand() / (RAND_MAX / (2. * Epsilon));
+ return DelayMs * jitter;
+ }
- private:
- TDuration GenerateNextDelay() {
- double low = 1 - Epsilon;
- auto jitter = low + std::rand() / (RAND_MAX / (2 * Epsilon));
- return DelayMs * jitter;
- }
+ const ui32 MaxRetries;
+ const TDuration InitDelayMs;
+ const double InitEpsilon;
- ui32 Retries = 0;
- TDuration DelayMs = TDuration::MilliSeconds(100);
- double Epsilon = 0.1;
- };
+ ui32 Retries;
+ TDuration DelayMs;
+ double Epsilon;
+};
+
+class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqSourceActor {
public:
TS3ReadActor(ui64 inputIndex,
IHTTPGateway::TPtr gateway,
@@ -119,11 +128,7 @@ public:
, Headers(MakeHeader(token))
, Paths(std::move(paths))
, RetryConfig(retryConfig)
- {
- if (RetryConfig) {
- MaxRetriesPerPath = RetryConfig->GetMaxRetriesPerPath() ? RetryConfig->GetMaxRetriesPerPath() : MaxRetriesPerPath;
- }
- }
+ {}
void Bootstrap() {
Become(&TS3ReadActor::StateFunc);
@@ -192,7 +197,6 @@ private:
void HandleRetry(TEvPrivate::TEvRetryEvent::TPtr& ev) {
const auto pathInd = ev->Get()->PathIndex;
- RetriesPerPath[pathInd].IncRetries();
Gateway->Download(Url + std::get<TString>(Paths[pathInd]),
Headers, std::get<size_t>(Paths[pathInd]),
std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd));
@@ -201,7 +205,7 @@ private:
void Handle(TEvPrivate::TEvReadError::TPtr& result) {
const auto pathInd = result->Get()->PathIndex;
Y_VERIFY(pathInd < RetriesPerPath.size());
- if (auto nextDelayMs = RetriesPerPath[pathInd].GetNextDelay(MaxRetriesPerPath)) {
+ if (auto nextDelayMs = RetriesPerPath[pathInd].GetNextDelay()) {
Schedule(nextDelayMs, new TEvPrivate::TEvRetryEvent(pathInd));
return;
}
@@ -236,7 +240,6 @@ private:
std::vector<TRetryParams> RetriesPerPath;
const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
- ui32 MaxRetriesPerPath = 3;
};
using namespace NKikimr::NMiniKQL;
@@ -286,7 +289,7 @@ private:
public:
TS3ReadCoroImpl(const TTypeEnvironment& typeEnv, const IFunctionRegistry& functionRegistry, ui64 inputIndex, const NActors::TActorId& computeActorId, ui64, TString format, TString rowType, TOutput::TPtr outputs)
- : TActorCoroImpl(256_KB), TypeEnv(typeEnv), FunctionRegistry(functionRegistry), InputIndex(inputIndex), Format(std::move(format)), RowType(std::move(rowType)), ComputeActorId(computeActorId), Outputs(std::move(outputs))
+ : TActorCoroImpl(512_KB), TypeEnv(typeEnv), FunctionRegistry(functionRegistry), InputIndex(inputIndex), Format(std::move(format)), RowType(std::move(rowType)), ComputeActorId(computeActorId), Outputs(std::move(outputs))
{}
bool Next(NUdf::TUnboxedValue& value) {
@@ -305,6 +308,7 @@ public:
return false;
case TEvPrivate::TEvReadResult::EventType:
value = MakeString(NUdf::TStringRef(std::string_view(ev->Get<TEvPrivate::TEvReadResult>()->Result)));
+ Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex));
return true;
default:
return false;
@@ -343,6 +347,8 @@ private:
Outputs->Data.emplace_back(std::move(v));
Outputs.reset();
+ Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex));
+
} catch (const yexception& err) {
Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue(err.what())}, true));
return;
@@ -362,6 +368,26 @@ private:
};
class TS3ReadCoroActor : public TActorCoro {
+ struct TRetryStuff {
+ using TPtr = std::shared_ptr<TRetryStuff>;
+
+ TRetryStuff(
+ IHTTPGateway::TPtr gateway,
+ TString url,
+ const IHTTPGateway::THeaders& headers,
+ const std::shared_ptr<NS3::TRetryConfig>& retryConfig,
+ std::size_t expectedSize
+ ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryParams(retryConfig)
+ {}
+
+ const IHTTPGateway::TPtr Gateway;
+ const TString Url;
+ const IHTTPGateway::THeaders Headers;
+ const std::size_t ExpectedSize;
+
+ std::size_t Offset = 0U;
+ TRetryParams RetryParams;
+ };
public:
TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl,
IHTTPGateway::TPtr gateway,
@@ -369,33 +395,38 @@ public:
const IHTTPGateway::THeaders& headers,
const TString& path,
const std::size_t expectedSize,
+ const std::shared_ptr<NS3::TRetryConfig>& retryConfig,
TOutput::TPtr outputs)
: TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
- , Gateway(std::move(gateway))
- , Url(url)
- , Headers(headers)
- , Path(path)
- , ExpectedSize(expectedSize)
+ , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, retryConfig, expectedSize))
, Outputs(std::move(outputs))
{}
private:
- static void OnNewData(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TContent&& data) {
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(data), 0)));
+ static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TContent&& data) {
+ retryStuff->Offset += data.size();
+ retryStuff->RetryParams.Reset();
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadResult(std::move(data))));
}
- static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, std::optional<TIssues> result) {
+ static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::optional<TIssues> result) {
if (result)
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(TIssues{*result}, 0)));
+ if (const auto nextDelayMs = retryStuff->RetryParams.GetNextDelay())
+ actorSystem->Schedule(nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent))));
+ else
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(TIssues{*result})));
else
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadFinished(0)));
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished));
}
- TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) {
- Gateway->Download(Url + Path,
- Headers, 0ULL, ExpectedSize,
- std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1),
- std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1));
+ static void DownloadStart(const TRetryStuff::TPtr& retryStuff, const TActorId& self, const TActorId& parent) {
+ retryStuff->Gateway->Download(retryStuff->Url,
+ retryStuff->Headers, retryStuff->Offset, retryStuff->ExpectedSize > retryStuff->Offset ? retryStuff->ExpectedSize - retryStuff->Offset : 0U,
+ std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1),
+ std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1));
+ }
+ TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) {
+ DownloadStart(RetryStuff, self, parent);
return TActorCoro::AfterRegister(self, parent);
}
@@ -403,12 +434,7 @@ private:
return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token};
}
- const IHTTPGateway::TPtr Gateway;
-
- const TString Url;
- const IHTTPGateway::THeaders Headers;
- const TString Path;
- const std::size_t ExpectedSize;
+ const TRetryStuff::TPtr RetryStuff;
TOutput::TPtr Outputs;
};
@@ -424,7 +450,8 @@ public:
TPathList&& paths,
TString format,
TString rowType,
- const NActors::TActorId& computeActorId
+ const NActors::TActorId& computeActorId,
+ const std::shared_ptr<NS3::TRetryConfig>& retryConfig
) : TypeEnv(typeEnv)
, FunctionRegistry(functionRegistry)
, Gateway(std::move(gateway))
@@ -435,15 +462,16 @@ public:
, Paths(std::move(paths))
, Format(format)
, RowType(rowType)
+ , RetryConfig(retryConfig)
, Outputs(std::make_shared<TOutput>())
{}
void Bootstrap() {
- for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
- const TPath& path = Paths[pathInd];
+ Become(&TS3StreamReadActor::StateFunc);
+ for (const auto& path : Paths) {
auto impl = MakeHolder<TS3ReadCoroImpl>(TypeEnv, FunctionRegistry, InputIndex, ComputeActorId, Paths.size(), Format, RowType, Outputs);
- RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), Outputs).Release());
- };
+ RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), RetryConfig, Outputs).Release());
+ }
}
static constexpr char ActorName[] = "S3_READ_ACTOR";
@@ -478,7 +506,15 @@ private:
static IHTTPGateway::THeaders MakeHeader(const TString& token) {
return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token};
}
-private:
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry);
+ )
+
+ void HandleRetry(TEvPrivate::TEvRetryEventFunc::TPtr& retry) {
+ return retry->Get()->Functor();
+ }
+
const TTypeEnvironment& TypeEnv;
const IFunctionRegistry& FunctionRegistry;
@@ -491,6 +527,7 @@ private:
const IHTTPGateway::THeaders Headers;
const TPathList Paths;
const TString Format, RowType, Compression;
+ const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
TOutput::TPtr Outputs;
};
@@ -532,7 +569,7 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
if (params.HasFormat() && params.HasRowType()) {
- const auto actor = new TS3StreamReadActor(typeEnv, functionRegistry, inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), params.GetFormat(), params.GetRowType(), computeActorId);
+ const auto actor = new TS3StreamReadActor(typeEnv, functionRegistry, inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), params.GetFormat(), params.GetRowType(), computeActorId, retryConfig);
return {actor, actor};
} else {
const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), computeActorId, retryConfig);