summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author a-romanov <[email protected]> 2022-07-28 14:15:01 +0300
committer a-romanov <[email protected]> 2022-07-28 14:15:01 +0300
commit b333b9c0b2519d13f1b00518e4ad398b6c06ace5 (patch)
tree 9df212b1572957b9f07baa013ee2e524cad44233
parent ceb7e240f642d3a6c3ba7cd26e9e31a79b289741 (diff)
S3 Common policy for read and write retries and some cleanup code.
-rw-r--r-- ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp 16
-rw-r--r-- ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt 1
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 64
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.cpp 15
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.h 9
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp 55
6 files changed, 58 insertions, 102 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 2defa4ffaee..5eedf136446 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -75,7 +75,7 @@ public:
}
virtual void Fail(const TIssue& error) = 0;
- virtual void Done(CURLcode result) = 0;
+ virtual void Done(CURLcode result, long httpResponseCode) = 0;
virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0;
virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0;
@@ -153,13 +153,10 @@ private:
}
}
- void Done(CURLcode result) final {
+ void Done(CURLcode result, long httpResponseCode) final {
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
- long httpResponseCode = 0;
- curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
-
const std::unique_lock lock(SyncCallbacks);
while (!Callbacks.empty()) {
if (1U == Callbacks.size())
@@ -225,7 +222,7 @@ private:
return OnFinish(TIssues{error});
}
- void Done(CURLcode result) final {
+ void Done(CURLcode result, long) final {
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
@@ -426,12 +423,13 @@ private:
void Done(CURL* handle, CURLcode result) {
TEasyCurl::TPtr easy;
+ long httpResponseCode = 0L;
{
const std::unique_lock lock(Sync);
if (const auto it = Allocated.find(handle); Allocated.cend() != it) {
- long httpResponseCode = 0;
easy = std::move(it->second);
- curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
+ if (CURLE_OK == result)
+ curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
if (auto buffer = std::dynamic_pointer_cast<TEasyCurlBuffer>(easy)) {
if (const auto& nextRetryDelay = buffer->GetNextRetryDelay(httpResponseCode)) {
@@ -447,7 +445,7 @@ private:
Requests.clear();
}
if (easy) {
- easy->Done(result);
+ easy->Done(result, httpResponseCode);
}
}
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
index fdeab19adc1..910d8cab69a 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
@@ -38,6 +38,7 @@ target_link_libraries(providers-s3-actors PUBLIC
)
target_sources(providers-s3-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 67ef5f0a8c3..71439673161 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -20,9 +20,10 @@
#endif
#include "yql_s3_read_actor.h"
+#include "yql_s3_retry_policy.h"
#include <ydb/library/yql/minikql/mkql_string_util.h>
-#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
#include <ydb/library/yql/minikql/mkql_function_registry.h>
@@ -89,11 +90,6 @@ struct TEvPrivate {
const size_t PathIndex;
};
- struct TEvRetryEvent : public NActors::TEventLocal<TEvRetryEvent, EvRetry> {
- explicit TEvRetryEvent(size_t pathIndex) : PathIndex(pathIndex) {}
- const size_t PathIndex;
- };
-
struct TEvRetryEventFunc : public NActors::TEventLocal<TEvRetryEventFunc, EvRetry> {
explicit TEvRetryEventFunc(std::function<void()> functor) : Functor(std::move(functor)) {}
const std::function<void()> Functor;
@@ -144,18 +140,19 @@ private:
double Epsilon;
};
+using namespace NKikimr::NMiniKQL;
+
class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput {
public:
TS3ReadActor(ui64 inputIndex,
IHTTPGateway::TPtr gateway,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ const THolderFactory& holderFactory,
const TString& url,
const TString& token,
TPathList&& paths,
bool addPathIndex,
ui64 startPathIndex,
- const NActors::TActorId& computeActorId,
- const std::shared_ptr<NS3::TRetryConfig>& retryConfig
+ const NActors::TActorId& computeActorId
) : Gateway(std::move(gateway))
, HolderFactory(holderFactory)
, InputIndex(inputIndex)
@@ -166,17 +163,15 @@ public:
, Paths(std::move(paths))
, AddPathIndex(addPathIndex)
, StartPathIndex(startPathIndex)
- , RetryConfig(retryConfig)
{}
void Bootstrap() {
Become(&TS3ReadActor::StateFunc);
- RetriesPerPath.resize(Paths.size(), TRetryParams(RetryConfig));
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
Gateway->Download(Url + std::get<TString>(path),
Headers, std::get<size_t>(path),
- std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd + StartPathIndex));
+ std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd + StartPathIndex), {}, GetS3RetryPolicy());
};
}
@@ -191,7 +186,6 @@ private:
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvReadResult, Handle);
hFunc(TEvPrivate::TEvReadError, Handle);
- hFunc(TEvPrivate::TEvRetryEvent, HandleRetry);
)
static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result, size_t pathInd) {
@@ -207,14 +201,14 @@ private:
}
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
+ i64 GetAsyncInputData(TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
i64 total = 0LL;
if (!Blocks.empty()) {
buffer.reserve(buffer.size() + Blocks.size());
do {
auto& content = std::get<IHTTPGateway::TContent>(Blocks.front());
const auto size = content.size();
- auto value = NKikimr::NMiniKQL::MakeString(std::string_view(content));
+ auto value = MakeString(std::string_view(content));
if (AddPathIndex) {
NUdf::TUnboxedValue* tupleItems = nullptr;
auto tuple = ContainerCache.NewArray(HolderFactory, 2, tupleItems);
@@ -245,20 +239,7 @@ private:
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
- void HandleRetry(TEvPrivate::TEvRetryEvent::TPtr& ev) {
- const auto pathInd = ev->Get()->PathIndex;
- Gateway->Download(Url + std::get<TString>(Paths[pathInd]),
- Headers, std::get<size_t>(Paths[pathInd]),
- std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd));
- }
-
void Handle(TEvPrivate::TEvReadError::TPtr& result) {
- const auto pathInd = result->Get()->PathIndex;
- Y_VERIFY(pathInd < RetriesPerPath.size());
- if (auto nextDelayMs = RetriesPerPath[pathInd].GetNextDelay()) {
- Schedule(nextDelayMs, new TEvPrivate::TEvRetryEvent(pathInd));
- return;
- }
++IsDoneCounter;
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, true));
}
@@ -276,8 +257,8 @@ private:
size_t IsDoneCounter = 0U;
const IHTTPGateway::TPtr Gateway;
- const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
- NKikimr::NMiniKQL::TPlainContainerCache ContainerCache;
+ const THolderFactory& HolderFactory;
+ TPlainContainerCache ContainerCache;
const ui64 InputIndex;
const NActors::TActorId ComputeActorId;
@@ -291,9 +272,6 @@ private:
const ui64 StartPathIndex;
std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;
-
- std::vector<TRetryParams> RetriesPerPath;
- const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
};
struct TReadSpec {
@@ -451,7 +429,7 @@ public:
TS3StreamReadActor(
ui64 inputIndex,
IHTTPGateway::TPtr gateway,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ const THolderFactory& holderFactory,
const TString& url,
const TString& token,
TPathList&& paths,
@@ -486,9 +464,11 @@ public:
static constexpr char ActorName[] = "S3_READ_ACTOR";
private:
- class TBoxedBlock : public NUdf::TBoxedValueBase {
+ class TBoxedBlock : public TComputationValue<TBoxedBlock> {
public:
- TBoxedBlock(NDB::Block& block) {
+ TBoxedBlock(TMemoryUsageInfo* memInfo, NDB::Block& block)
+ : TComputationValue(memInfo)
+ {
Block.swap(block);
}
private:
@@ -508,13 +488,13 @@ private:
void CommitState(const NDqProto::TCheckpoint&) final {}
ui64 GetInputIndex() const final { return InputIndex; }
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& output, bool& finished, i64 free) final {
+ i64 GetAsyncInputData(TUnboxedValueVector& output, bool& finished, i64 free) final {
i64 total = 0LL;
if (!Blocks.empty()) do {
auto& block = std::get<NDB::Block>(Blocks.front());
const i64 s = block.bytes();
- auto value = NUdf::TUnboxedValuePod(new TBoxedBlock(block));
+ auto value = HolderFactory.Create<TBoxedBlock>(block);
if (AddPathIndex) {
NUdf::TUnboxedValue* tupleItems = nullptr;
auto tuple = ContainerCache.NewArray(HolderFactory, 2, tupleItems);
@@ -570,8 +550,8 @@ private:
}
const IHTTPGateway::TPtr Gateway;
- const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
- NKikimr::NMiniKQL::TPlainContainerCache ContainerCache;
+ const THolderFactory& HolderFactory;
+ TPlainContainerCache ContainerCache;
const ui64 InputIndex;
const NActors::TActorId ComputeActorId;
@@ -660,7 +640,7 @@ using namespace NKikimr::NMiniKQL;
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const TTypeEnvironment& typeEnv,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ const THolderFactory& holderFactory,
IHTTPGateway::TPtr gateway,
NS3::TSource&& params,
ui64 inputIndex,
@@ -730,7 +710,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
return {actor, actor};
} else {
const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken,
- std::move(paths), addPathIndex, startPathIndex, computeActorId, retryConfig);
+ std::move(paths), addPathIndex, startPathIndex, computeActorId);
return {actor, actor};
}
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.cpp
new file mode 100644
index 00000000000..9e6b4e5a073
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.cpp
@@ -0,0 +1,15 @@
+#include "yql_s3_retry_policy.h"
+
+namespace NYql {
+
+IRetryPolicy<long>::TPtr GetS3RetryPolicy() {
+ return IRetryPolicy<long>::GetExponentialBackoffPolicy([](long httpCode) {
+ switch (httpCode) {
+ case 0: return ERetryErrorClass::ShortRetry;
+ case 503: return ERetryErrorClass::LongRetry;
+ default: return ERetryErrorClass::NoRetry;
+ }
+ });
+}
+
+}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.h b/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.h
new file mode 100644
index 00000000000..729985331fa
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_retry_policy.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <library/cpp/retry/retry_policy.h>
+
+namespace NYql {
+
+IRetryPolicy<long>::TPtr GetS3RetryPolicy();
+
+}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 503f35d80c7..f4f5952e37f 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -1,4 +1,5 @@
#include "yql_s3_write_actor.h"
+#include "yql_s3_retry_policy.h"
#include <ydb/library/yql/utils/yql_panic.h>
@@ -26,10 +27,6 @@ using namespace NActors;
namespace {
-ERetryErrorClass RetryS3SlowDown(long httpResponseCode) {
- return httpResponseCode == 503 ? ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503
-}
-
struct TEvPrivate {
// Event ids
enum EEv : ui32 {
@@ -68,46 +65,6 @@ struct TEvPrivate {
};
};
-using TPath = std::tuple<TString, size_t>;
-using TPathList = std::vector<TPath>;
-
-class TRetryParams {
-public:
- TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
- : MaxRetries(retryConfig && retryConfig->GetMaxRetriesPerPath() ? retryConfig->GetMaxRetriesPerPath() : 3U)
- , InitDelayMs(retryConfig && retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : TDuration::MilliSeconds(100))
- , InitEpsilon(retryConfig && retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : 0.1)
- {
- Y_VERIFY(0. < InitEpsilon && InitEpsilon < 1.);
- Reset();
- }
-
- void Reset() {
- Retries = 0U;
- DelayMs = InitDelayMs;
- Epsilon = InitEpsilon;
- }
-
- TDuration GetNextDelay() {
- if (++Retries > MaxRetries)
- return TDuration::Zero();
- return DelayMs = GenerateNextDelay();
- }
-private:
- TDuration GenerateNextDelay() {
- const auto low = 1. - Epsilon;
- const auto jitter = low + std::rand() / (RAND_MAX / (2. * Epsilon));
- return DelayMs * jitter;
- }
-
- const ui32 MaxRetries;
- const TDuration InitDelayMs;
- const double InitEpsilon;
-
- ui32 Retries;
- TDuration DelayMs;
- double Epsilon;
-};
using namespace NKikimr::NMiniKQL;
@@ -126,7 +83,7 @@ public:
void Bootstrap(const TActorId& parentId) {
ParentId = parentId;
Become(&TS3FileWriteActor::InitialStateFunc);
- Gateway->Upload(Url + "?uploads", MakeHeader(), "", std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown));
+ Gateway->Upload(Url + "?uploads", MakeHeader(), "", std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, GetS3RetryPolicy());
}
static constexpr char ActorName[] = "S3_FILE_WRITE_ACTOR";
@@ -256,7 +213,7 @@ private:
const auto index = Tags.size();
Tags.emplace_back();
InFlight += size;
- Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true, IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown));
+ Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true, GetS3RetryPolicy());
}
}
@@ -269,7 +226,7 @@ private:
for (const auto& tag : Tags)
xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl;
xml << "</CompleteMultipartUpload>" << Endl;
- Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, ParentId, Key, std::placeholders::_1), false, IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown));
+ Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, ParentId, Key, std::placeholders::_1), false, GetS3RetryPolicy());
}
IHTTPGateway::THeaders MakeHeader() const {
@@ -295,8 +252,6 @@ private:
std::queue<TString> Parts;
std::vector<TString> Tags;
- std::vector<TRetryParams> RetriesPerPath;
-
TString UploadId;
};
@@ -401,8 +356,6 @@ private:
const TString Path;
const std::vector<TString> Keys;
- std::vector<TRetryParams> RetriesPerPath;
-
std::unordered_map<TString, TS3FileWriteActor*> FileWriteActors;
};