diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-09-14 19:08:20 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-09-14 19:08:20 +0300 |
commit | 3527ca0991c82f384df1b3aa6bb8b3c809dc7ef0 (patch) | |
tree | c037fcbabd86cf71f6133774baeb8ef06df91f9d | |
parent | 9d5c457703952e82b9139f2601fa2353dabb89c4 (diff) | |
download | ydb-3527ca0991c82f384df1b3aa6bb8b3c809dc7ef0.tar.gz |
Retry several other http codes by default. Fix S3 read actor
13 files changed, 154 insertions, 104 deletions
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index c1d8ff91da..0df3120a4f 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -44,6 +44,7 @@ #include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_factory.h> #include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_dq_transform.h> #include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> #include <util/stream/file.h> #include <util/system/hostname.h> @@ -154,12 +155,13 @@ void Init( } if (protoConfig.GetPrivateApi().GetEnabled()) { + auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::MilliSeconds(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig().GetMaxRetryTimeMs())); // if MaxRetryTimeMs is not set, default http gateway will use the default one RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, - httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig())); + httpGateway, s3HttpRetryPolicy); RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory, - httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig())); + httpGateway, s3HttpRetryPolicy); RegisterClickHouseReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway); RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp index 498833ec3f..be50dc2c72 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp @@ -2,14 +2,31 @@ namespace NYql { -IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy() { +IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime) { + if (!maxTime) { + maxTime = TDuration::Minutes(5); + } return IRetryPolicy<long>::GetExponentialBackoffPolicy([](long httpCode) { switch (httpCode) { - case 0: return ERetryErrorClass::ShortRetry; - case 503:return ERetryErrorClass::LongRetry; - default: return ERetryErrorClass::NoRetry; + case 0: + return ERetryErrorClass::ShortRetry; + case 408: // Request Timeout + case 425: // Too Early + case 429: // Too Many Requests + case 500: // Internal Server Error + case 502: // Bad Gateway + case 503: // Service Unavailable + case 504: // Gateway Timeout + return ERetryErrorClass::LongRetry; + default: + return ERetryErrorClass::NoRetry; } - }); + }, + TDuration::MilliSeconds(10), // minDelay + TDuration::MilliSeconds(200), // minLongRetryDelay + TDuration::Seconds(30), // maxDelay + std::numeric_limits<size_t>::max(), // maxRetries + maxTime); // maxTime } } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h index 14057228da..425f1a55e0 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h @@ -4,6 +4,6 @@ namespace NYql { -IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy(); +IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime = TDuration::Zero()); // Zero means default maxTime } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 06847584c7..7ee9dd2af8 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -89,6 +89,10 @@ using namespace ::NYql::NS3Details; namespace { +struct TS3ReadError : public yexception { + using yexception::yexception; +}; + struct TEvPrivate { // Event ids enum EEv : ui32 { @@ -124,7 +128,16 @@ struct TEvPrivate { const long HttpResponseCode; }; - struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {}; + struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> { + TEvReadFinished() = default; + + TEvReadFinished(TIssues&& issues) + : Issues(std::move(issues)) + { + } + + TIssues Issues; + }; struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { TEvReadError(TIssues&& error, size_t pathInd = std::numeric_limits<size_t>::max()) : Error(std::move(error)), PathIndex(pathInd) {} @@ -158,12 +171,14 @@ public: bool addPathIndex, ui64 startPathIndex, const NActors::TActorId& computeActorId, - ui64 sizeLimit + ui64 sizeLimit, + const IRetryPolicy<long>::TPtr& retryPolicy ) : Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) , TxId(txId) , ComputeActorId(computeActorId) + , RetryPolicy(retryPolicy) , ActorSystem(TActivationContext::ActorSystem()) , Url(url) , Headers(MakeHeader(token)) @@ -182,7 +197,7 @@ public: auto id = pathInd + StartPathIndex; LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id); Gateway->Download(url, Headers, std::min(std::get<size_t>(path), SizeLimit), - std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, id), {}, GetHTTPDefaultRetryPolicy()); + std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, id), {}, RetryPolicy); }; } @@ -284,6 +299,7 @@ private: const ui64 InputIndex; const TTxId TxId; const NActors::TActorId ComputeActorId; + const IRetryPolicy<long>::TPtr RetryPolicy; TActorSystem* const ActorSystem; @@ -314,8 +330,9 @@ struct TRetryStuff { TString url, const IHTTPGateway::THeaders& headers, std::size_t sizeLimit, - const TTxId& txId - ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), Offset(0U), SizeLimit(sizeLimit), TxId(txId), RetryState(GetHTTPDefaultRetryPolicy()->CreateRetryState()), Cancelled(false) + const TTxId& txId, + const IRetryPolicy<long>::TPtr& retryPolicy + ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), Offset(0U), SizeLimit(sizeLimit), TxId(txId), RetryState(retryPolicy->CreateRetryState()), Cancelled(false) {} const IHTTPGateway::TPtr Gateway; @@ -341,6 +358,26 @@ struct TRetryStuff { } }; +void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, long httpResponseCode) { + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(httpResponseCode))); +} + +void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, IHTTPGateway::TCountedContent&& data) { + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data)))); +} + +void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, TIssues issues) { + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(std::move(issues)))); +} + +void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent) { + retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url, + retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit, + std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1), + std::bind(&OnNewData, actorSystem, self, parent, std::placeholders::_1), + std::bind(&OnDownloadFinished, actorSystem, self, parent, std::placeholders::_1)); +} + class TS3ReadCoroImpl : public TActorCoroImpl { private: class TReadBufferFromStream : public NDB::ReadBuffer { @@ -350,11 +387,12 @@ private: {} private: bool nextImpl() final { - if (Coro->Next(Value)) { - working_buffer = NDB::BufferBase::Buffer(const_cast<char*>(Value.data()), const_cast<char*>(Value.data()) + Value.size()); - return true; + while (Coro->Next(Value)) { + if (!Value.empty()) { + working_buffer = NDB::BufferBase::Buffer(const_cast<char*>(Value.data()), const_cast<char*>(Value.data()) + Value.size()); + return true; + } } - return false; } @@ -372,24 +410,37 @@ public: if (InputFinished) return false; - const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); + const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>(); switch (const auto etype = ev->GetTypeRewrite()) { case TEvPrivate::TEvReadStarted::EventType: - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadStarted, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset); ErrorText.clear(); Issues.Clear(); value.clear(); RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode); + LOG_CORO_D("TS3ReadCoroImpl", "TEvReadStarted, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", Http code: " << HttpResponseCode << ", retry after: " << RetryStuff->NextRetryDelay); return true; case TEvPrivate::TEvReadFinished::EventType: - InputFinished = true; - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText()); - return false; - case TEvPrivate::TEvReadError::EventType: - InputFinished = true; - Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error); - LOG_CORO_W("TS3ReadCoroImpl", "TEvReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText()); - return false; + Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues); + if (Issues) { + if (RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(0L); !RetryStuff->NextRetryDelay) { + InputFinished = true; + LOG_CORO_W("TS3ReadCoroImpl", "ReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText()); + throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format + } + } + + if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) { + LOG_CORO_D("TS3ReadCoroImpl", "TS3ReadCoroActor" << ": " << SelfActorId << ", TxId: " << RetryStuff->TxId << ". Retry Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset); + GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId)))); + } else { + LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", LastData: " << GetLastDataAsText()); + InputFinished = true; + if (ServerReturnedError) { + throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format + } + return false; // end of data (real data, not an error) + } + return true; case TEvPrivate::TEvDataPart::EventType: if (200L == HttpResponseCode || 206L == HttpResponseCode) { value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract(); @@ -400,6 +451,7 @@ public: LOG_CORO_T("TS3ReadCoroImpl", "TEvDataPart, size: " << value.size() << ", Url: " << RetryStuff->Url << ", Offset (updated): " << RetryStuff->Offset); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); } else if (HttpResponseCode && !RetryStuff->IsCancelled() && !RetryStuff->NextRetryDelay) { + ServerReturnedError = true; if (ErrorText.size() < 256_KB) ErrorText.append(ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract()); else if (!ErrorText.EndsWith(TruncatedSuffix)) @@ -419,22 +471,15 @@ private: return; while (true) { - const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadError, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>(); + const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>(); const auto etype = ev->GetTypeRewrite(); - if (etype == TEvPrivate::TEvDataPart::EventType) { - // just ignore all data parts event to drain event queue - continue; - } switch (etype) { case TEvPrivate::TEvReadFinished::EventType: - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished"); - break; - case TEvPrivate::TEvReadError::EventType: - Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error); - LOG_CORO_W("TS3ReadCoroImpl", "TEvReadError: " << Issues.ToOneLineString()); + Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues); + LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Issues.ToOneLineString()); break; default: - break; + continue; } InputFinished = true; return; @@ -457,9 +502,12 @@ private: while (auto block = stream.read()) { Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex)); } + } catch (const TS3ReadError&) { + // Finish reading. Add error from server to issues } catch (const std::exception& err) { exceptIssue.Message = TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what(); fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; + RetryStuff->Cancel(); } WaitFinish(); @@ -535,6 +583,7 @@ private: bool InputFinished = false; long HttpResponseCode = 0L; + bool ServerReturnedError = false; TString ErrorText; TIssues Issues; @@ -549,42 +598,9 @@ public: , RetryStuff(std::move(retryStuff)) {} private: - static void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, long httpResponseCode) { - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(httpResponseCode))); - } - - static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, IHTTPGateway::TCountedContent&& data) { - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data)))); - } - - static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, TIssues issues) { - if (issues) { - if (retryStuff->NextRetryDelay = retryStuff->RetryState->GetNextRetryDelay(0L); !retryStuff->NextRetryDelay) { - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(std::move(issues)))); - return; - } - } - - auto nextRetryDelay = retryStuff->NextRetryDelay; - if (!retryStuff->IsCancelled() && nextRetryDelay && retryStuff->SizeLimit > 0ULL) { - LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << self << ", TxId: " << retryStuff->TxId << ". " << "Retry Download, Url: " << retryStuff->Url << ", Offset: " << retryStuff->Offset); - actorSystem->Schedule(*nextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, actorSystem, self, parent)))); - } else { - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished)); - } - } - - static void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent) { - retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url, - retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit, - std::bind(&TS3ReadCoroActor::OnDownloadStart, actorSystem, self, parent, std::placeholders::_1), - std::bind(&TS3ReadCoroActor::OnNewData, actorSystem, self, parent, std::placeholders::_1), - std::bind(&TS3ReadCoroActor::OnDownloadFinished, actorSystem, self, parent, retryStuff, std::placeholders::_1)); - } - void Registered(TActorSystem* actorSystem, const TActorId& parent) override { TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself. - LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Retry Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset); + LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset); DownloadStart(RetryStuff, actorSystem, SelfId(), parent); } @@ -608,12 +624,14 @@ public: bool addPathIndex, ui64 startPathIndex, const TReadSpec::TPtr& readSpec, - const NActors::TActorId& computeActorId + const NActors::TActorId& computeActorId, + const IRetryPolicy<long>::TPtr& retryPolicy ) : Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) , TxId(txId) , ComputeActorId(computeActorId) + , RetryPolicy(retryPolicy) , Url(url) , Headers(MakeHeader(token)) , Paths(std::move(paths)) @@ -628,7 +646,7 @@ public: Become(&TS3StreamReadActor::StateFunc); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; - auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path), TxId); + auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path), TxId, RetryPolicy); RetryStuffForFile.push_back(stuff); auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path)); RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release()); @@ -736,7 +754,7 @@ private: const ui64 InputIndex; const TTxId TxId; const NActors::TActorId ComputeActorId; - + const IRetryPolicy<long>::TPtr RetryPolicy; const TString Url; const IHTTPGateway::THeaders Headers; @@ -863,7 +881,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId, - ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const IRetryPolicy<long>::TPtr& retryPolicy) { const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry(); @@ -921,7 +940,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( #undef SUPPORTED_FLAGS const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, - std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId); + std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy); return {actor, actor}; } else { ui64 sizeLimit = std::numeric_limits<ui64>::max(); @@ -929,7 +948,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( sizeLimit = FromString<ui64>(it->second); const auto actor = new TS3ReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, - std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit); + std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit, retryPolicy); return {actor, actor}; } } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 60df4432bc..099740d829 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -19,6 +19,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId, - ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const IRetryPolicy<long>::TPtr& retryPolicy); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp index bd31304e34..5de1839338 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp @@ -5,9 +5,9 @@ namespace NYql::NDq { -void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const std::shared_ptr<NYql::NS3::TRetryConfig>&) { +void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const IRetryPolicy<long>::TPtr& retryPolicy) { factory.RegisterSink<NS3::TSink>("S3Sink", - [credentialsFactory, gateway](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { + [credentialsFactory, gateway, retryPolicy](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { TStringBuilder prefixBuilder; @@ -16,12 +16,12 @@ void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAcco prefixBuilder << jobId << "_"; } - auto restartCount = args.TaskParams.Value("fq.restart_count", ""); + auto restartCount = args.TaskParams.Value("fq.restart_count", ""); if (restartCount) { prefixBuilder << restartCount << "_"; } - return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.TxId, prefixBuilder, args.SecureParams, args.Callback, credentialsFactory); + return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.TxId, prefixBuilder, args.SecureParams, args.Callback, credentialsFactory, retryPolicy); }); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h index 80b7fbf7ca..99522d7815 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h @@ -6,6 +6,7 @@ #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> namespace NYql::NDq { @@ -14,5 +15,5 @@ void RegisterS3WriteActorFactory( TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, - const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr); + const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy()); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index 3e22af2c6b..b795c701b3 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -14,12 +14,12 @@ void RegisterS3ReadActorFactory( TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, - const std::shared_ptr<NYql::NS3::TRetryConfig>&) { + const IRetryPolicy<long>::TPtr& retryPolicy) { #if defined(_linux_) || defined(_darwin_) NDB::registerFormats(); factory.RegisterSource<NS3::TSource>("S3Source", - [credentialsFactory, gateway](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { - return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory); + [credentialsFactory, gateway, retryPolicy](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { + return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy); }); #else Y_UNUSED(factory); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 65a07b1b2e..83eb05d427 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -6,6 +6,7 @@ #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> namespace NYql::NDq { @@ -14,6 +15,6 @@ void RegisterS3ReadActorFactory( TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway = IHTTPGateway::Make(), - const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr); + const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy()); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 90fbc2870a..ef7658508a 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -94,10 +94,12 @@ public: const TTxId& txId, IHTTPGateway::TPtr gateway, NYdb::TCredentialsProviderPtr credProvider, - const TString& key, const TString& url, size_t sizeLimit, const std::string_view& compression) + const TString& key, const TString& url, size_t sizeLimit, const std::string_view& compression, + const IRetryPolicy<long>::TPtr& retryPolicy) : TxId(txId) , Gateway(std::move(gateway)) , CredProvider(std::move(credProvider)) + , RetryPolicy(retryPolicy) , ActorSystem(TActivationContext::ActorSystem()) , Key(key), Url(url), SizeLimit(sizeLimit), Parts(MakeCompressorQueue(compression)) { @@ -111,10 +113,10 @@ public: const auto size = Parts->Volume(); InFlight += size; SentSize += size; - Gateway->Upload(Url, MakeHeader(), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), true, GetHTTPDefaultRetryPolicy()); + Gateway->Upload(Url, MakeHeader(), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), true, RetryPolicy); } else { Become(&TS3FileWriteActor::InitialStateFunc); - Gateway->Upload(Url + "?uploads", MakeHeader(), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, GetHTTPDefaultRetryPolicy()); + Gateway->Upload(Url + "?uploads", MakeHeader(), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, RetryPolicy); } } @@ -278,7 +280,7 @@ private: Tags.emplace_back(); InFlight += size; SentSize += size; - Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(part), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true, GetHTTPDefaultRetryPolicy()); + Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(part), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true, RetryPolicy); } } @@ -291,7 +293,7 @@ private: for (const auto& tag : Tags) xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl; xml << "</CompleteMultipartUpload>" << Endl; - Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), false, GetHTTPDefaultRetryPolicy()); + Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), false, RetryPolicy); } IHTTPGateway::THeaders MakeHeader() const { @@ -307,6 +309,7 @@ private: const TTxId TxId; const IHTTPGateway::TPtr Gateway; const NYdb::TCredentialsProviderPtr CredProvider; + const IRetryPolicy<long>::TPtr RetryPolicy; TActorSystem* const ActorSystem; TActorId ParentId; @@ -335,10 +338,12 @@ public: const size_t memoryLimit, const size_t maxFileSize, const TString& compression, - IDqComputeActorAsyncOutput::ICallbacks* callbacks) + IDqComputeActorAsyncOutput::ICallbacks* callbacks, + const IRetryPolicy<long>::TPtr& retryPolicy) : Gateway(std::move(gateway)) , CredProvider(std::move(credProvider)) , RandomProvider(randomProvider) + , RetryPolicy(retryPolicy) , OutputIndex(outputIndex) , TxId(txId) , Prefix(prefix) @@ -402,7 +407,7 @@ private: const auto& key = MakePartitionKey(v); const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>()); if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) { - auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName(), MaxFileSize, Compression); + auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName(), MaxFileSize, Compression, RetryPolicy); ins.first->second.emplace_back(fileWrite.get()); RegisterWithSameMailbox(fileWrite.release()); } @@ -464,8 +469,9 @@ private: const IHTTPGateway::TPtr Gateway; const NYdb::TCredentialsProviderPtr CredProvider; - IRandomProvider * RandomProvider; + IRandomProvider* RandomProvider; TIntrusivePtr<IRandomProvider> DefaultRandomProvider; + const IRetryPolicy<long>::TPtr RetryPolicy; const ui64 OutputIndex; const TTxId TxId; @@ -497,7 +503,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( const TString& prefix, const THashMap<TString, TString>& secureParams, IDqComputeActorAsyncOutput::ICallbacks* callbacks, - ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const IRetryPolicy<long>::TPtr& retryPolicy) { const auto token = secureParams.Value(params.GetToken(), TString{}); const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); @@ -513,7 +520,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB, params.HasMaxFileSize() ? params.GetMaxFileSize() : 50_MB, params.HasCompression() ? params.GetCompression() : "", - callbacks); + callbacks, + retryPolicy); return {actor, actor}; } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h index df19730ba4..47eaafc414 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h @@ -20,7 +20,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( const TString& prefix, const THashMap<TString, TString>& secureParams, IDqComputeActorAsyncOutput::ICallbacks* callbacks, - ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const IRetryPolicy<long>::TPtr& retryPolicy); } // namespace NYql::NDq - diff --git a/ydb/library/yql/providers/s3/proto/retry_config.proto b/ydb/library/yql/providers/s3/proto/retry_config.proto index 3477562bb6..4232be91fc 100644 --- a/ydb/library/yql/providers/s3/proto/retry_config.proto +++ b/ydb/library/yql/providers/s3/proto/retry_config.proto @@ -4,7 +4,8 @@ option cc_enable_arenas = true; package NYql.NS3; message TRetryConfig { - uint64 InitialDelayMs = 1; - uint32 MaxRetriesPerPath = 2; - double Epsilon = 3; + uint64 InitialDelayMs = 1; // Not used + uint32 MaxRetriesPerPath = 2; // Not used + double Epsilon = 3; // Not used + uint64 MaxRetryTimeMs = 4; } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp index 155f4078ef..bbcb5a695b 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp @@ -36,7 +36,7 @@ static void skipTSVRow(ReadBuffer & in, const size_t num_columns) */ static void checkForCarriageReturn(ReadBuffer & in) { - if (in.position()[0] == '\r' || (in.position() != in.buffer().begin() && in.position()[-1] == '\r')) + if (!in.eof() && (in.position()[0] == '\r' || (in.position() != in.buffer().begin() && in.position()[-1] == '\r'))) throw Exception("\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row." "\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format." " You must transform your file to Unix format." |