aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2022-09-14 19:08:20 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2022-09-14 19:08:20 +0300
commit3527ca0991c82f384df1b3aa6bb8b3c809dc7ef0 (patch)
treec037fcbabd86cf71f6133774baeb8ef06df91f9d
parent9d5c457703952e82b9139f2601fa2353dabb89c4 (diff)
downloadydb-3527ca0991c82f384df1b3aa6bb8b3c809dc7ef0.tar.gz
Retry several other http codes by default. Fix S3 read actor
-rw-r--r--ydb/core/yq/libs/init/init.cpp6
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp27
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp159
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp8
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp6
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp28
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h4
-rw-r--r--ydb/library/yql/providers/s3/proto/retry_config.proto7
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp2
13 files changed, 154 insertions, 104 deletions
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index c1d8ff91da..0df3120a4f 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -44,6 +44,7 @@
#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_factory.h>
#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_dq_transform.h>
#include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <util/stream/file.h>
#include <util/system/hostname.h>
@@ -154,12 +155,13 @@ void Init(
}
if (protoConfig.GetPrivateApi().GetEnabled()) {
+ auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::MilliSeconds(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig().GetMaxRetryTimeMs())); // if MaxRetryTimeMs is not set, default http gateway will use the default one
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory,
- httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig()));
+ httpGateway, s3HttpRetryPolicy);
RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
- httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig()));
+ httpGateway, s3HttpRetryPolicy);
RegisterClickHouseReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway);
RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp
index 498833ec3f..be50dc2c72 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp
@@ -2,14 +2,31 @@
namespace NYql {
-IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy() {
+IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime) {
+ if (!maxTime) {
+ maxTime = TDuration::Minutes(5);
+ }
return IRetryPolicy<long>::GetExponentialBackoffPolicy([](long httpCode) {
switch (httpCode) {
- case 0: return ERetryErrorClass::ShortRetry;
- case 503:return ERetryErrorClass::LongRetry;
- default: return ERetryErrorClass::NoRetry;
+ case 0:
+ return ERetryErrorClass::ShortRetry;
+ case 408: // Request Timeout
+ case 425: // Too Early
+ case 429: // Too Many Requests
+ case 500: // Internal Server Error
+ case 502: // Bad Gateway
+ case 503: // Service Unavailable
+ case 504: // Gateway Timeout
+ return ERetryErrorClass::LongRetry;
+ default:
+ return ERetryErrorClass::NoRetry;
}
- });
+ },
+ TDuration::MilliSeconds(10), // minDelay
+ TDuration::MilliSeconds(200), // minLongRetryDelay
+ TDuration::Seconds(30), // maxDelay
+ std::numeric_limits<size_t>::max(), // maxRetries
+ maxTime); // maxTime
}
}
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h
index 14057228da..425f1a55e0 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h
@@ -4,6 +4,6 @@
namespace NYql {
-IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy();
+IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime = TDuration::Zero()); // Zero means default maxTime
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 06847584c7..7ee9dd2af8 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -89,6 +89,10 @@ using namespace ::NYql::NS3Details;
namespace {
+struct TS3ReadError : public yexception {
+ using yexception::yexception;
+};
+
struct TEvPrivate {
// Event ids
enum EEv : ui32 {
@@ -124,7 +128,16 @@ struct TEvPrivate {
const long HttpResponseCode;
};
- struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {};
+ struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {
+ TEvReadFinished() = default;
+
+ TEvReadFinished(TIssues&& issues)
+ : Issues(std::move(issues))
+ {
+ }
+
+ TIssues Issues;
+ };
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
TEvReadError(TIssues&& error, size_t pathInd = std::numeric_limits<size_t>::max()) : Error(std::move(error)), PathIndex(pathInd) {}
@@ -158,12 +171,14 @@ public:
bool addPathIndex,
ui64 startPathIndex,
const NActors::TActorId& computeActorId,
- ui64 sizeLimit
+ ui64 sizeLimit,
+ const IRetryPolicy<long>::TPtr& retryPolicy
) : Gateway(std::move(gateway))
, HolderFactory(holderFactory)
, InputIndex(inputIndex)
, TxId(txId)
, ComputeActorId(computeActorId)
+ , RetryPolicy(retryPolicy)
, ActorSystem(TActivationContext::ActorSystem())
, Url(url)
, Headers(MakeHeader(token))
@@ -182,7 +197,7 @@ public:
auto id = pathInd + StartPathIndex;
LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id);
Gateway->Download(url, Headers, std::min(std::get<size_t>(path), SizeLimit),
- std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, id), {}, GetHTTPDefaultRetryPolicy());
+ std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, id), {}, RetryPolicy);
};
}
@@ -284,6 +299,7 @@ private:
const ui64 InputIndex;
const TTxId TxId;
const NActors::TActorId ComputeActorId;
+ const IRetryPolicy<long>::TPtr RetryPolicy;
TActorSystem* const ActorSystem;
@@ -314,8 +330,9 @@ struct TRetryStuff {
TString url,
const IHTTPGateway::THeaders& headers,
std::size_t sizeLimit,
- const TTxId& txId
- ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), Offset(0U), SizeLimit(sizeLimit), TxId(txId), RetryState(GetHTTPDefaultRetryPolicy()->CreateRetryState()), Cancelled(false)
+ const TTxId& txId,
+ const IRetryPolicy<long>::TPtr& retryPolicy
+ ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), Offset(0U), SizeLimit(sizeLimit), TxId(txId), RetryState(retryPolicy->CreateRetryState()), Cancelled(false)
{}
const IHTTPGateway::TPtr Gateway;
@@ -341,6 +358,26 @@ struct TRetryStuff {
}
};
+void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, long httpResponseCode) {
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(httpResponseCode)));
+}
+
+void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, IHTTPGateway::TCountedContent&& data) {
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data))));
+}
+
+void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, TIssues issues) {
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(std::move(issues))));
+}
+
+void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent) {
+ retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url,
+ retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit,
+ std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1),
+ std::bind(&OnNewData, actorSystem, self, parent, std::placeholders::_1),
+ std::bind(&OnDownloadFinished, actorSystem, self, parent, std::placeholders::_1));
+}
+
class TS3ReadCoroImpl : public TActorCoroImpl {
private:
class TReadBufferFromStream : public NDB::ReadBuffer {
@@ -350,11 +387,12 @@ private:
{}
private:
bool nextImpl() final {
- if (Coro->Next(Value)) {
- working_buffer = NDB::BufferBase::Buffer(const_cast<char*>(Value.data()), const_cast<char*>(Value.data()) + Value.size());
- return true;
+ while (Coro->Next(Value)) {
+ if (!Value.empty()) {
+ working_buffer = NDB::BufferBase::Buffer(const_cast<char*>(Value.data()), const_cast<char*>(Value.data()) + Value.size());
+ return true;
+ }
}
-
return false;
}
@@ -372,24 +410,37 @@ public:
if (InputFinished)
return false;
- const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
+ const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>();
switch (const auto etype = ev->GetTypeRewrite()) {
case TEvPrivate::TEvReadStarted::EventType:
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadStarted, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset);
ErrorText.clear();
Issues.Clear();
value.clear();
RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode);
+ LOG_CORO_D("TS3ReadCoroImpl", "TEvReadStarted, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", Http code: " << HttpResponseCode << ", retry after: " << RetryStuff->NextRetryDelay);
return true;
case TEvPrivate::TEvReadFinished::EventType:
- InputFinished = true;
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
- return false;
- case TEvPrivate::TEvReadError::EventType:
- InputFinished = true;
- Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error);
- LOG_CORO_W("TS3ReadCoroImpl", "TEvReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
- return false;
+ Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues);
+ if (Issues) {
+ if (RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(0L); !RetryStuff->NextRetryDelay) {
+ InputFinished = true;
+ LOG_CORO_W("TS3ReadCoroImpl", "ReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
+ throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
+ }
+ }
+
+ if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) {
+ LOG_CORO_D("TS3ReadCoroImpl", "TS3ReadCoroActor" << ": " << SelfActorId << ", TxId: " << RetryStuff->TxId << ". Retry Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset);
+ GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId))));
+ } else {
+ LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", LastData: " << GetLastDataAsText());
+ InputFinished = true;
+ if (ServerReturnedError) {
+ throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
+ }
+ return false; // end of data (real data, not an error)
+ }
+ return true;
case TEvPrivate::TEvDataPart::EventType:
if (200L == HttpResponseCode || 206L == HttpResponseCode) {
value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract();
@@ -400,6 +451,7 @@ public:
LOG_CORO_T("TS3ReadCoroImpl", "TEvDataPart, size: " << value.size() << ", Url: " << RetryStuff->Url << ", Offset (updated): " << RetryStuff->Offset);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
} else if (HttpResponseCode && !RetryStuff->IsCancelled() && !RetryStuff->NextRetryDelay) {
+ ServerReturnedError = true;
if (ErrorText.size() < 256_KB)
ErrorText.append(ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract());
else if (!ErrorText.EndsWith(TruncatedSuffix))
@@ -419,22 +471,15 @@ private:
return;
while (true) {
- const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadError, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>();
+ const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>();
const auto etype = ev->GetTypeRewrite();
- if (etype == TEvPrivate::TEvDataPart::EventType) {
- // just ignore all data parts event to drain event queue
- continue;
- }
switch (etype) {
case TEvPrivate::TEvReadFinished::EventType:
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished");
- break;
- case TEvPrivate::TEvReadError::EventType:
- Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error);
- LOG_CORO_W("TS3ReadCoroImpl", "TEvReadError: " << Issues.ToOneLineString());
+ Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues);
+ LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Issues.ToOneLineString());
break;
default:
- break;
+ continue;
}
InputFinished = true;
return;
@@ -457,9 +502,12 @@ private:
while (auto block = stream.read()) {
Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex));
}
+ } catch (const TS3ReadError&) {
+ // Finish reading. Add error from server to issues
} catch (const std::exception& err) {
exceptIssue.Message = TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what();
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
+ RetryStuff->Cancel();
}
WaitFinish();
@@ -535,6 +583,7 @@ private:
bool InputFinished = false;
long HttpResponseCode = 0L;
+ bool ServerReturnedError = false;
TString ErrorText;
TIssues Issues;
@@ -549,42 +598,9 @@ public:
, RetryStuff(std::move(retryStuff))
{}
private:
- static void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, long httpResponseCode) {
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(httpResponseCode)));
- }
-
- static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, IHTTPGateway::TCountedContent&& data) {
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data))));
- }
-
- static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, TIssues issues) {
- if (issues) {
- if (retryStuff->NextRetryDelay = retryStuff->RetryState->GetNextRetryDelay(0L); !retryStuff->NextRetryDelay) {
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(std::move(issues))));
- return;
- }
- }
-
- auto nextRetryDelay = retryStuff->NextRetryDelay;
- if (!retryStuff->IsCancelled() && nextRetryDelay && retryStuff->SizeLimit > 0ULL) {
- LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << self << ", TxId: " << retryStuff->TxId << ". " << "Retry Download, Url: " << retryStuff->Url << ", Offset: " << retryStuff->Offset);
- actorSystem->Schedule(*nextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, actorSystem, self, parent))));
- } else {
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished));
- }
- }
-
- static void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent) {
- retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url,
- retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit,
- std::bind(&TS3ReadCoroActor::OnDownloadStart, actorSystem, self, parent, std::placeholders::_1),
- std::bind(&TS3ReadCoroActor::OnNewData, actorSystem, self, parent, std::placeholders::_1),
- std::bind(&TS3ReadCoroActor::OnDownloadFinished, actorSystem, self, parent, retryStuff, std::placeholders::_1));
- }
-
void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.
- LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Retry Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset);
+ LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset);
DownloadStart(RetryStuff, actorSystem, SelfId(), parent);
}
@@ -608,12 +624,14 @@ public:
bool addPathIndex,
ui64 startPathIndex,
const TReadSpec::TPtr& readSpec,
- const NActors::TActorId& computeActorId
+ const NActors::TActorId& computeActorId,
+ const IRetryPolicy<long>::TPtr& retryPolicy
) : Gateway(std::move(gateway))
, HolderFactory(holderFactory)
, InputIndex(inputIndex)
, TxId(txId)
, ComputeActorId(computeActorId)
+ , RetryPolicy(retryPolicy)
, Url(url)
, Headers(MakeHeader(token))
, Paths(std::move(paths))
@@ -628,7 +646,7 @@ public:
Become(&TS3StreamReadActor::StateFunc);
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
- auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path), TxId);
+ auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path), TxId, RetryPolicy);
RetryStuffForFile.push_back(stuff);
auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path));
RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release());
@@ -736,7 +754,7 @@ private:
const ui64 InputIndex;
const TTxId TxId;
const NActors::TActorId ComputeActorId;
-
+ const IRetryPolicy<long>::TPtr RetryPolicy;
const TString Url;
const IHTTPGateway::THeaders Headers;
@@ -863,7 +881,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const THashMap<TString, TString>& secureParams,
const THashMap<TString, TString>& taskParams,
const NActors::TActorId& computeActorId,
- ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory)
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ const IRetryPolicy<long>::TPtr& retryPolicy)
{
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();
@@ -921,7 +940,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
#undef SUPPORTED_FLAGS
const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken,
- std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId);
+ std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy);
return {actor, actor};
} else {
ui64 sizeLimit = std::numeric_limits<ui64>::max();
@@ -929,7 +948,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
sizeLimit = FromString<ui64>(it->second);
const auto actor = new TS3ReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken,
- std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit);
+ std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit, retryPolicy);
return {actor, actor};
}
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index 60df4432bc..099740d829 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -19,6 +19,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
const THashMap<TString, TString>& secureParams,
const THashMap<TString, TString>& taskParams,
const NActors::TActorId& computeActorId,
- ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ const IRetryPolicy<long>::TPtr& retryPolicy);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
index bd31304e34..5de1839338 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
@@ -5,9 +5,9 @@
namespace NYql::NDq {
-void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const std::shared_ptr<NYql::NS3::TRetryConfig>&) {
+void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const IRetryPolicy<long>::TPtr& retryPolicy) {
factory.RegisterSink<NS3::TSink>("S3Sink",
- [credentialsFactory, gateway](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) {
+ [credentialsFactory, gateway, retryPolicy](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) {
TStringBuilder prefixBuilder;
@@ -16,12 +16,12 @@ void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAcco
prefixBuilder << jobId << "_";
}
- auto restartCount = args.TaskParams.Value("fq.restart_count", "");
+ auto restartCount = args.TaskParams.Value("fq.restart_count", "");
if (restartCount) {
prefixBuilder << restartCount << "_";
}
- return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.TxId, prefixBuilder, args.SecureParams, args.Callback, credentialsFactory);
+ return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.TxId, prefixBuilder, args.SecureParams, args.Callback, credentialsFactory, retryPolicy);
});
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h
index 80b7fbf7ca..99522d7815 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h
@@ -6,6 +6,7 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
namespace NYql::NDq {
@@ -14,5 +15,5 @@ void RegisterS3WriteActorFactory(
TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway,
- const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr);
+ const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy());
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index 3e22af2c6b..b795c701b3 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -14,12 +14,12 @@ void RegisterS3ReadActorFactory(
TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway,
- const std::shared_ptr<NYql::NS3::TRetryConfig>&) {
+ const IRetryPolicy<long>::TPtr& retryPolicy) {
#if defined(_linux_) || defined(_darwin_)
NDB::registerFormats();
factory.RegisterSource<NS3::TSource>("S3Source",
- [credentialsFactory, gateway](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
- return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory);
+ [credentialsFactory, gateway, retryPolicy](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
+ return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy);
});
#else
Y_UNUSED(factory);
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index 65a07b1b2e..83eb05d427 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -6,6 +6,7 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
namespace NYql::NDq {
@@ -14,6 +15,6 @@ void RegisterS3ReadActorFactory(
TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway = IHTTPGateway::Make(),
- const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr);
+ const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy());
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 90fbc2870a..ef7658508a 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -94,10 +94,12 @@ public:
const TTxId& txId,
IHTTPGateway::TPtr gateway,
NYdb::TCredentialsProviderPtr credProvider,
- const TString& key, const TString& url, size_t sizeLimit, const std::string_view& compression)
+ const TString& key, const TString& url, size_t sizeLimit, const std::string_view& compression,
+ const IRetryPolicy<long>::TPtr& retryPolicy)
: TxId(txId)
, Gateway(std::move(gateway))
, CredProvider(std::move(credProvider))
+ , RetryPolicy(retryPolicy)
, ActorSystem(TActivationContext::ActorSystem())
, Key(key), Url(url), SizeLimit(sizeLimit), Parts(MakeCompressorQueue(compression))
{
@@ -111,10 +113,10 @@ public:
const auto size = Parts->Volume();
InFlight += size;
SentSize += size;
- Gateway->Upload(Url, MakeHeader(), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), true, GetHTTPDefaultRetryPolicy());
+ Gateway->Upload(Url, MakeHeader(), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), true, RetryPolicy);
} else {
Become(&TS3FileWriteActor::InitialStateFunc);
- Gateway->Upload(Url + "?uploads", MakeHeader(), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, GetHTTPDefaultRetryPolicy());
+ Gateway->Upload(Url + "?uploads", MakeHeader(), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, RetryPolicy);
}
}
@@ -278,7 +280,7 @@ private:
Tags.emplace_back();
InFlight += size;
SentSize += size;
- Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(part), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true, GetHTTPDefaultRetryPolicy());
+ Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(part), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true, RetryPolicy);
}
}
@@ -291,7 +293,7 @@ private:
for (const auto& tag : Tags)
xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl;
xml << "</CompleteMultipartUpload>" << Endl;
- Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), false, GetHTTPDefaultRetryPolicy());
+ Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), false, RetryPolicy);
}
IHTTPGateway::THeaders MakeHeader() const {
@@ -307,6 +309,7 @@ private:
const TTxId TxId;
const IHTTPGateway::TPtr Gateway;
const NYdb::TCredentialsProviderPtr CredProvider;
+ const IRetryPolicy<long>::TPtr RetryPolicy;
TActorSystem* const ActorSystem;
TActorId ParentId;
@@ -335,10 +338,12 @@ public:
const size_t memoryLimit,
const size_t maxFileSize,
const TString& compression,
- IDqComputeActorAsyncOutput::ICallbacks* callbacks)
+ IDqComputeActorAsyncOutput::ICallbacks* callbacks,
+ const IRetryPolicy<long>::TPtr& retryPolicy)
: Gateway(std::move(gateway))
, CredProvider(std::move(credProvider))
, RandomProvider(randomProvider)
+ , RetryPolicy(retryPolicy)
, OutputIndex(outputIndex)
, TxId(txId)
, Prefix(prefix)
@@ -402,7 +407,7 @@ private:
const auto& key = MakePartitionKey(v);
const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) {
- auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName(), MaxFileSize, Compression);
+ auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName(), MaxFileSize, Compression, RetryPolicy);
ins.first->second.emplace_back(fileWrite.get());
RegisterWithSameMailbox(fileWrite.release());
}
@@ -464,8 +469,9 @@ private:
const IHTTPGateway::TPtr Gateway;
const NYdb::TCredentialsProviderPtr CredProvider;
- IRandomProvider * RandomProvider;
+ IRandomProvider* RandomProvider;
TIntrusivePtr<IRandomProvider> DefaultRandomProvider;
+ const IRetryPolicy<long>::TPtr RetryPolicy;
const ui64 OutputIndex;
const TTxId TxId;
@@ -497,7 +503,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
const TString& prefix,
const THashMap<TString, TString>& secureParams,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
- ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory)
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ const IRetryPolicy<long>::TPtr& retryPolicy)
{
const auto token = secureParams.Value(params.GetToken(), TString{});
const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
@@ -513,7 +520,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB,
params.HasMaxFileSize() ? params.GetMaxFileSize() : 50_MB,
params.HasCompression() ? params.GetCompression() : "",
- callbacks);
+ callbacks,
+ retryPolicy);
return {actor, actor};
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
index df19730ba4..47eaafc414 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
@@ -20,7 +20,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
const TString& prefix,
const THashMap<TString, TString>& secureParams,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
- ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ const IRetryPolicy<long>::TPtr& retryPolicy);
} // namespace NYql::NDq
-
diff --git a/ydb/library/yql/providers/s3/proto/retry_config.proto b/ydb/library/yql/providers/s3/proto/retry_config.proto
index 3477562bb6..4232be91fc 100644
--- a/ydb/library/yql/providers/s3/proto/retry_config.proto
+++ b/ydb/library/yql/providers/s3/proto/retry_config.proto
@@ -4,7 +4,8 @@ option cc_enable_arenas = true;
package NYql.NS3;
message TRetryConfig {
- uint64 InitialDelayMs = 1;
- uint32 MaxRetriesPerPath = 2;
- double Epsilon = 3;
+ uint64 InitialDelayMs = 1; // Not used
+ uint32 MaxRetriesPerPath = 2; // Not used
+ double Epsilon = 3; // Not used
+ uint64 MaxRetryTimeMs = 4;
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
index 155f4078ef..bbcb5a695b 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
@@ -36,7 +36,7 @@ static void skipTSVRow(ReadBuffer & in, const size_t num_columns)
*/
static void checkForCarriageReturn(ReadBuffer & in)
{
- if (in.position()[0] == '\r' || (in.position() != in.buffer().begin() && in.position()[-1] == '\r'))
+ if (!in.eof() && (in.position()[0] == '\r' || (in.position() != in.buffer().begin() && in.position()[-1] == '\r')))
throw Exception("\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
"\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
" You must transform your file to Unix format."