summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <[email protected]>2022-08-01 12:45:10 +0300
committera-romanov <[email protected]>2022-08-01 12:45:10 +0300
commit8a1625f555c78bae20a416825e7d0adffa7cc279 (patch)
treeec455c66c176f6d10ebd71c1f25f1fd079dd8c3b
parent4d31f887a737b905d8d686a56771951d8d169f60 (diff)
Slice up on small output objects.
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp78
-rw-r--r--ydb/library/yql/providers/s3/proto/sink.proto4
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp6
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h1
5 files changed, 69 insertions, 21 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 003a9384a95..263cc55f235 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -44,8 +44,8 @@ struct TEvPrivate {
// Events
struct TEvUploadFinished : public TEventLocal<TEvUploadFinished, EvUploadFinished> {
- explicit TEvUploadFinished(const TString& key) : Key(key) {}
- const TString Key;
+ TEvUploadFinished(const TString& key, const TString& url) : Key(key), Url(url) {}
+ const TString Key, Url;
};
struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> {
@@ -73,17 +73,17 @@ public:
TS3FileWriteActor(
IHTTPGateway::TPtr gateway,
NYdb::TCredentialsProviderPtr credProvider,
- const TString& key, const TString& url)
+ const TString& key, const TString& url, size_t sizeLimit)
: Gateway(std::move(gateway))
, CredProvider(std::move(credProvider))
, ActorSystem(TActivationContext::ActorSystem())
- , Key(key), Url(url)
+ , Key(key), Url(url), SizeLimit(sizeLimit)
{}
void Bootstrap(const TActorId& parentId) {
ParentId = parentId;
if (InputFinished && 1U == Parts.size()) {
- Gateway->Upload(Url, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, std::placeholders::_1), true, GetS3RetryPolicy());
+ Gateway->Upload(Url, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), true, GetS3RetryPolicy());
Parts.pop();
} else {
Become(&TS3FileWriteActor::InitialStateFunc);
@@ -95,7 +95,12 @@ public:
void SendData(TString&& data) {
InQueue += data.size();
+ TotalSize += data.size();
Parts.emplace(std::move(data));
+
+ if (1000U == ++PartsCount || TotalSize >= SizeLimit)
+ InputFinished = true;
+
if (!UploadId.empty())
StartUploadParts();
}
@@ -106,6 +111,10 @@ public:
CommitUploadedParts();
}
+ bool IsFinishing() const { return InputFinished; }
+
+ const TString& GetUrl() const { return Url; }
+
i64 GetMemoryUsed() const {
return InFlight + InQueue;
}
@@ -166,7 +175,7 @@ private:
}
}
- static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, IHTTPGateway::TResult&& result) {
+ static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, IHTTPGateway::TResult&& result) {
switch (result.index()) {
case 0U: try {
const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
@@ -177,7 +186,7 @@ private:
} else if (root.Name() != "CompleteMultipartUploadResult")
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")})));
else
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key)));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));
break;
} catch (const std::exception& ex) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")})));
@@ -192,10 +201,10 @@ private:
}
}
- static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, IHTTPGateway::TResult&& result) {
+ static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, IHTTPGateway::TResult&& result) {
switch (result.index()) {
case 0U:
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key)));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));
break;
case 1U:
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));
@@ -244,7 +253,7 @@ private:
for (const auto& tag : Tags)
xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl;
xml << "</CompleteMultipartUpload>" << Endl;
- Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, std::placeholders::_1), false, GetS3RetryPolicy());
+ Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), false, GetS3RetryPolicy());
}
IHTTPGateway::THeaders MakeHeader() const {
@@ -257,6 +266,8 @@ private:
bool InputFinished = false;
size_t InQueue = 0ULL;
size_t InFlight = 0ULL;
+ size_t TotalSize = 0ULL;
+ ui16 PartsCount = 0U;
const IHTTPGateway::TPtr Gateway;
const NYdb::TCredentialsProviderPtr CredProvider;
@@ -266,6 +277,7 @@ private:
const TString Key;
const TString Url;
+ const size_t SizeLimit;
std::queue<TString> Parts;
std::vector<TString> Tags;
@@ -282,6 +294,8 @@ public:
const TString& url,
const TString& path,
const std::vector<TString>& keys,
+ const size_t memoryLimit,
+ const size_t maxFileSize,
IDqComputeActorAsyncOutput::ICallbacks* callbacks)
: Gateway(std::move(gateway))
, CredProvider(std::move(credProvider))
@@ -291,6 +305,8 @@ public:
, Url(url)
, Path(path)
, Keys(keys)
+ , MemoryLimit(memoryLimit)
+ , MaxFileSize(maxFileSize)
{}
void Bootstrap() {
@@ -303,7 +319,10 @@ private:
void LoadState(const NDqProto::TSinkState&) final {};
ui64 GetOutputIndex() const final { return OutputIndex; }
i64 GetFreeSpace() const final {
- return std::accumulate(FileWriteActors.cbegin(), FileWriteActors.cend(), i64(1_GB), [](i64 free, const std::pair<const TString, TS3FileWriteActor*>& item){ return free - item.second->GetMemoryUsed(); });
+ return std::accumulate(FileWriteActors.cbegin(), FileWriteActors.cend(), i64(MemoryLimit),
+ [](i64 free, const std::pair<const TString, std::vector<TS3FileWriteActor*>>& item) {
+ return free - std::accumulate(item.second.cbegin(), item.second.cend(), i64(0), [](i64 sum, TS3FileWriteActor* actor) { return sum += actor->GetMemoryUsed(); });
+ });
}
TString MakeKey(const NUdf::TUnboxedValuePod v) const {
@@ -336,18 +355,18 @@ private:
void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
for (const auto& v : data) {
const auto& key = MakeKey(v);
- const auto ins = FileWriteActors.emplace(key, nullptr);
- if (ins.second) {
- auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path + key + MakeSuffix());
- ins.first->second = fileWrite.get();
+ const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
+ if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) {
+ auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path + key + MakeSuffix(), MaxFileSize);
+ ins.first->second.emplace_back(fileWrite.get());
RegisterWithSameMailbox(fileWrite.release());
}
- ins.first->second->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef()));
+ ins.first->second.back()->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef()));
}
if (finished)
- std::for_each(FileWriteActors.cbegin(), FileWriteActors.cend(), [](const std::pair<const TString, TS3FileWriteActor*>& item){ item.second->Finish(); });
+ std::for_each(FileWriteActors.cbegin(), FileWriteActors.cend(), [](const std::pair<const TString, std::vector<TS3FileWriteActor*>>& item){ item.second.back()->Finish(); });
data.clear();
}
@@ -356,7 +375,14 @@ private:
}
void Handle(TEvPrivate::TEvUploadFinished::TPtr& result) {
- FileWriteActors.erase(result->Get()->Key);
+ if (const auto it = FileWriteActors.find(result->Get()->Key); FileWriteActors.cend() != it) {
+ if (const auto ft = std::find_if(it->second.cbegin(), it->second.cend(), [&](TS3FileWriteActor* actor){ return result->Get()->Url == actor->GetUrl(); }); it->second.cend() != ft) {
+ it->second.erase(ft);
+ if (it->second.empty())
+ FileWriteActors.erase(it);
+ }
+ }
+
if (FileWriteActors.empty())
Callbacks->OnAsyncOutputFinished(OutputIndex);
}
@@ -377,7 +403,10 @@ private:
const TString Path;
const std::vector<TString> Keys;
- std::unordered_map<TString, TS3FileWriteActor*> FileWriteActors;
+ const size_t MemoryLimit;
+ const size_t MaxFileSize;
+
+ std::unordered_map<TString, std::vector<TS3FileWriteActor*>> FileWriteActors;
};
} // namespace
@@ -396,7 +425,16 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
const auto token = secureParams.Value(params.GetToken(), TString{});
const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
const auto authToken = credentialsProviderFactory->CreateProvider();
- const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), credentialsProviderFactory->CreateProvider(), randomProvider, params.GetUrl(), params.GetPath(), std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()), callbacks);
+ const auto actor = new TS3WriteActor(
+ outputIndex,
+ std::move(gateway),
+ credentialsProviderFactory->CreateProvider(),
+ randomProvider, params.GetUrl(),
+ params.GetPath(),
+ std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()),
+ params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB,
+ params.HasMaxFileSize() ? params.GetMaxFileSize() : 50_MB,
+ callbacks);
return {actor, actor};
}
diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto
index ab3ad50adfa..f18ca717e83 100644
--- a/ydb/library/yql/providers/s3/proto/sink.proto
+++ b/ydb/library/yql/providers/s3/proto/sink.proto
@@ -8,5 +8,7 @@ message TSink {
string Token = 2;
string Path = 3;
repeated string Keys = 4;
+ optional string Compression = 5;
+ optional uint64 MemoryLimit = 6;
+ optional uint64 MaxFileSize = 7;
}
-
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 064d056b14f..428ec9bbb17 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -235,6 +235,12 @@ public:
for (const auto& key : GetKeys(maySettings.Settings().Ref()))
sinkDesc.MutableKeys()->Add(TString(key->Content()));
+ if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get())
+ sinkDesc.SetMaxFileSize(*maxObjectSize);
+
+ if (const auto& memoryLimit = State_->Configuration->InFlightMemoryLimit.Get())
+ sinkDesc.SetMemoryLimit(*memoryLimit);
+
protoSettings.PackFrom(sinkDesc);
sinkType = "S3Sink";
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index 993dd909253..0e20428dd40 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -13,6 +13,7 @@ TS3Configuration::TS3Configuration()
REGISTER_SETTING(*this, UniqueKeysCountLimit);
REGISTER_SETTING(*this, BlockSizeMemoryLimit);
REGISTER_SETTING(*this, SerializeMemoryLimit);
+ REGISTER_SETTING(*this, InFlightMemoryLimit);
}
TS3Settings::TConstPtr TS3Configuration::Snapshot() const {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index 8a557d15095..7e583367fb1 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -15,6 +15,7 @@ struct TS3Settings {
NCommon::TConfSetting<ui64, false> UniqueKeysCountLimit;
NCommon::TConfSetting<ui64, false> BlockSizeMemoryLimit;
NCommon::TConfSetting<ui64, false> SerializeMemoryLimit; // Total serialization memory limit for all current blocks for all patition keys. Reachable in case of many but small partitions.
+ NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink.
};
struct TS3ClusterSettings {