summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <[email protected]>2022-08-01 19:00:47 +0300
committera-romanov <[email protected]>2022-08-01 19:00:47 +0300
commit102d954c33ca380bca270945028c545723e69e11 (patch)
tree1e5ca8b64d8fc010d1189c87c1180f3501ab79bb
parent41e73b0260fb91fbeadbdd05d6ab2a8e5dd1587b (diff)
Common retry policy for read stream from S3.
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp4
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp119
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp7
5 files changed, 60 insertions, 75 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 1ea067cfcc2..4933e2fcf30 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -229,12 +229,12 @@ private:
return OnFinish(TIssues{error});
}
- void Done(CURLcode result, long) final {
+ void Done(CURLcode result, long httpResponseCode) final {
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
Working = false;
- return OnFinish(std::nullopt);
+ return OnFinish(httpResponseCode);
}
size_t Write(void* contents, size_t size, size_t nmemb) final {
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 373fa86c04a..7f0fbb3c00b 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -95,7 +95,7 @@ public:
};
using TOnNewDataPart = std::function<void(TCountedContent&&)>;
- using TOnDownloadFinish = std::function<void(std::optional<TIssues>)>;
+ using TOnDownloadFinish = std::function<void(std::variant<long, TIssues>)>; // http code or issues.
virtual void Download(
TString url,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 77ff5012b2d..25ab825ea9a 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -82,7 +82,10 @@ struct TEvPrivate {
IHTTPGateway::TCountedContent Result;
};
- struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {};
+ struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {
+ explicit TEvReadFinished(long httpResponseCode) : HttpResponseCode(httpResponseCode) {}
+ const long HttpResponseCode;
+ };
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
TEvReadError(TIssues&& error, size_t pathInd = std::numeric_limits<size_t>::max()) : Error(std::move(error)), PathIndex(pathInd) {}
@@ -102,44 +105,6 @@ struct TEvPrivate {
};
};
-class TRetryParams {
-public:
- explicit TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
- : MaxRetries(retryConfig && retryConfig->GetMaxRetriesPerPath() ? retryConfig->GetMaxRetriesPerPath() : 3U)
- , InitDelayMs(retryConfig && retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : TDuration::MilliSeconds(100))
- , InitEpsilon(retryConfig && retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : 0.1)
- {
- Y_VERIFY(0. < InitEpsilon && InitEpsilon < 1.);
- Reset();
- }
-
- void Reset() {
- Retries = 0U;
- DelayMs = InitDelayMs;
- Epsilon = InitEpsilon;
- }
-
- TDuration GetNextDelay() {
- if (++Retries > MaxRetries)
- return TDuration::Zero();
- return DelayMs = GenerateNextDelay();
- }
-private:
- TDuration GenerateNextDelay() {
- const auto low = 1. - Epsilon;
- const auto jitter = low + std::rand() / (RAND_MAX / (2. * Epsilon));
- return DelayMs * jitter;
- }
-
- const ui32 MaxRetries;
- const TDuration InitDelayMs;
- const double InitEpsilon;
-
- ui32 Retries;
- TDuration DelayMs;
- double Epsilon;
-};
-
using namespace NKikimr::NMiniKQL;
class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput {
@@ -308,15 +273,16 @@ public:
{}
bool Next(TString& value) {
- if (Finished)
+ if (HttpResponseCode)
return false;
const auto ev = WaitForSpecificEvent<TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
switch (const auto etype = ev->GetTypeRewrite()) {
case TEvPrivate::TEvReadFinished::EventType:
- Finished = true;
+ HttpResponseCode = ev->Get<TEvPrivate::TEvReadFinished>()->HttpResponseCode;
return false;
case TEvPrivate::TEvReadError::EventType:
+ HttpResponseCode = 0L;
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));
return false;
case TEvPrivate::TEvDataPart::EventType:
@@ -328,6 +294,24 @@ public:
}
}
private:
+ void WaitFinish() {
+ if (HttpResponseCode)
+ return;
+
+ const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
+ switch (const auto etype = ev->GetTypeRewrite()) {
+ case TEvPrivate::TEvReadFinished::EventType:
+ HttpResponseCode = ev->Get<TEvPrivate::TEvReadFinished>()->HttpResponseCode;
+ break;
+ case TEvPrivate::TEvReadError::EventType:
+ HttpResponseCode = 0L;
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));
+ break;
+ default:
+ break;
+ }
+ }
+
void Run() final try {
TReadBufferFromStream buffer(this);
const auto decompress(MakeDecompressor(buffer, ReadSpec->Compression));
@@ -337,7 +321,10 @@ private:
while (auto block = stream.read())
Send(SourceActorId, new TEvPrivate::TEvNextBlock(block, PathIndex));
- Send(SourceActorId, new TEvPrivate::TEvReadFinished);
+ WaitFinish();
+
+ if (*HttpResponseCode)
+ Send(SourceActorId, new TEvPrivate::TEvReadFinished(*HttpResponseCode));
} catch (const TDtorException&) {
throw;
} catch (const std::exception& err) {
@@ -356,7 +343,7 @@ private:
const NActors::TActorId ComputeActorId;
const size_t PathIndex;
const TString Path;
- bool Finished = false;
+ std::optional<long> HttpResponseCode;
};
class TS3ReadCoroActor : public TActorCoro {
@@ -367,9 +354,8 @@ class TS3ReadCoroActor : public TActorCoro {
IHTTPGateway::TPtr gateway,
TString url,
const IHTTPGateway::THeaders& headers,
- const std::shared_ptr<NS3::TRetryConfig>& retryConfig,
std::size_t expectedSize
- ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryParams(retryConfig)
+ ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState())
{}
const IHTTPGateway::TPtr Gateway;
@@ -378,7 +364,7 @@ class TS3ReadCoroActor : public TActorCoro {
const std::size_t ExpectedSize;
std::size_t Offset = 0U;
- TRetryParams RetryParams;
+ const IRetryPolicy<long>::IRetryState::TPtr RetryState;
};
public:
TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl,
@@ -386,26 +372,31 @@ public:
const TString& url,
const IHTTPGateway::THeaders& headers,
const TString& path,
- const std::size_t expectedSize,
- const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
+ const std::size_t expectedSize)
: TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
- , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, retryConfig, expectedSize))
+ , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, expectedSize))
{}
private:
static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TCountedContent&& data) {
retryStuff->Offset += data.size();
- retryStuff->RetryParams.Reset();
actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data))));
}
- static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::optional<TIssues> result) {
- if (result)
- if (const auto nextDelayMs = retryStuff->RetryParams.GetNextDelay())
- actorSystem->Schedule(nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent))));
- else
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(TIssues{*result})));
- else
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished));
+ static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::variant<long, TIssues> result) {
+ switch (result.index()) {
+ case 0U:
+ if (const auto httpCode = std::get<long>(result); const auto nextDelayMs = retryStuff->RetryState->GetNextRetryDelay(httpCode))
+ actorSystem->Schedule(*nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent))));
+ else
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(httpCode)));
+ break;
+ case 1U:
+ if (const auto nextDelayMs = retryStuff->RetryState->GetNextRetryDelay(0L))
+ actorSystem->Schedule(*nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent))));
+ else
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result)))));
+ break;
+ }
}
static void DownloadStart(const TRetryStuff::TPtr& retryStuff, const TActorId& self, const TActorId& parent) {
@@ -439,8 +430,7 @@ public:
bool addPathIndex,
ui64 startPathIndex,
const TReadSpec::TPtr& readSpec,
- const NActors::TActorId& computeActorId,
- const std::shared_ptr<NS3::TRetryConfig>& retryConfig
+ const NActors::TActorId& computeActorId
) : Gateway(std::move(gateway))
, HolderFactory(holderFactory)
, InputIndex(inputIndex)
@@ -451,7 +441,6 @@ public:
, AddPathIndex(addPathIndex)
, StartPathIndex(startPathIndex)
, ReadSpec(readSpec)
- , RetryConfig(retryConfig)
, Count(Paths.size())
{}
@@ -460,7 +449,7 @@ public:
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, SelfId(), ComputeActorId, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path));
- RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), RetryConfig).Release());
+ RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path)).Release());
}
}
@@ -566,7 +555,6 @@ private:
const bool AddPathIndex;
const ui64 StartPathIndex;
const TReadSpec::TPtr ReadSpec;
- const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
std::deque<std::tuple<NDB::Block, size_t>> Blocks;
ui32 Count;
};
@@ -650,8 +638,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const THashMap<TString, TString>& secureParams,
const THashMap<TString, TString>& taskParams,
const NActors::TActorId& computeActorId,
- ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory)
{
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();
@@ -709,7 +696,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
#undef SUPPORTED_FLAGS
const auto actor = new TS3StreamReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken,
- std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryConfig);
+ std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId);
return {actor, actor};
} else {
const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index d8b153ccbcc..e8103ff2b07 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -18,7 +18,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
const THashMap<TString, TString>& secureParams,
const THashMap<TString, TString>& taskParams,
const NActors::TActorId& computeActorId,
- ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr);
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index fa2ce935761..f5817ad55f9 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -12,18 +12,17 @@ void RegisterS3ReadActorFactory(
TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway,
- const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) {
+ const std::shared_ptr<NYql::NS3::TRetryConfig>&) {
#ifdef __linux__
NDB::registerFormats();
factory.RegisterSource<NS3::TSource>("S3Source",
- [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
- return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig);
+ [credentialsFactory, gateway](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
+ return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory);
});
#else
Y_UNUSED(factory);
Y_UNUSED(credentialsFactory);
Y_UNUSED(gateway);
- Y_UNUSED(retryConfig);
#endif
}