diff options
author | a-romanov <[email protected]> | 2022-08-01 12:45:10 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2022-08-01 12:45:10 +0300 |
commit | 8a1625f555c78bae20a416825e7d0adffa7cc279 (patch) | |
tree | ec455c66c176f6d10ebd71c1f25f1fd079dd8c3b | |
parent | 4d31f887a737b905d8d686a56771951d8d169f60 (diff) |
Slice up on small output objects.
5 files changed, 69 insertions, 21 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 003a9384a95..263cc55f235 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -44,8 +44,8 @@ struct TEvPrivate { // Events struct TEvUploadFinished : public TEventLocal<TEvUploadFinished, EvUploadFinished> { - explicit TEvUploadFinished(const TString& key) : Key(key) {} - const TString Key; + TEvUploadFinished(const TString& key, const TString& url) : Key(key), Url(url) {} + const TString Key, Url; }; struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> { @@ -73,17 +73,17 @@ public: TS3FileWriteActor( IHTTPGateway::TPtr gateway, NYdb::TCredentialsProviderPtr credProvider, - const TString& key, const TString& url) + const TString& key, const TString& url, size_t sizeLimit) : Gateway(std::move(gateway)) , CredProvider(std::move(credProvider)) , ActorSystem(TActivationContext::ActorSystem()) - , Key(key), Url(url) + , Key(key), Url(url), SizeLimit(sizeLimit) {} void Bootstrap(const TActorId& parentId) { ParentId = parentId; if (InputFinished && 1U == Parts.size()) { - Gateway->Upload(Url, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, std::placeholders::_1), true, GetS3RetryPolicy()); + Gateway->Upload(Url, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), true, GetS3RetryPolicy()); Parts.pop(); } else { Become(&TS3FileWriteActor::InitialStateFunc); @@ -95,7 +95,12 @@ public: void SendData(TString&& data) { InQueue += data.size(); + TotalSize += data.size(); Parts.emplace(std::move(data)); + + if (1000U == ++PartsCount || TotalSize >= SizeLimit) + InputFinished = true; + if (!UploadId.empty()) StartUploadParts(); } @@ -106,6 +111,10 @@ public: CommitUploadedParts(); } + bool IsFinishing() const { return InputFinished; } + + const TString& GetUrl() const { return Url; } + i64 GetMemoryUsed() const { return InFlight + InQueue; } @@ -166,7 +175,7 @@ private: } } - static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, IHTTPGateway::TResult&& result) { + static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, IHTTPGateway::TResult&& result) { switch (result.index()) { case 0U: try { const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String); @@ -177,7 +186,7 @@ private: } else if (root.Name() != "CompleteMultipartUploadResult") actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")}))); else - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url))); break; } catch (const std::exception& ex) { actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")}))); @@ -192,10 +201,10 @@ private: } } - static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, IHTTPGateway::TResult&& result) { + static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, IHTTPGateway::TResult&& result) { switch (result.index()) { case 0U: - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url))); break; case 1U: actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result))))); @@ -244,7 +253,7 @@ private: for (const auto& tag : Tags) xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl; xml << "</CompleteMultipartUpload>" << Endl; - Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, std::placeholders::_1), false, GetS3RetryPolicy()); + Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), false, GetS3RetryPolicy()); } IHTTPGateway::THeaders MakeHeader() const { @@ -257,6 +266,8 @@ private: bool InputFinished = false; size_t InQueue = 0ULL; size_t InFlight = 0ULL; + size_t TotalSize = 0ULL; + ui16 PartsCount = 0U; const IHTTPGateway::TPtr Gateway; const NYdb::TCredentialsProviderPtr CredProvider; @@ -266,6 +277,7 @@ private: const TString Key; const TString Url; + const size_t SizeLimit; std::queue<TString> Parts; std::vector<TString> Tags; @@ -282,6 +294,8 @@ public: const TString& url, const TString& path, const std::vector<TString>& keys, + const size_t memoryLimit, + const size_t maxFileSize, IDqComputeActorAsyncOutput::ICallbacks* callbacks) : Gateway(std::move(gateway)) , CredProvider(std::move(credProvider)) @@ -291,6 +305,8 @@ public: , Url(url) , Path(path) , Keys(keys) + , MemoryLimit(memoryLimit) + , MaxFileSize(maxFileSize) {} void Bootstrap() { @@ -303,7 +319,10 @@ private: void LoadState(const NDqProto::TSinkState&) final {}; ui64 GetOutputIndex() const final { return OutputIndex; } i64 GetFreeSpace() const final { - return std::accumulate(FileWriteActors.cbegin(), FileWriteActors.cend(), i64(1_GB), [](i64 free, const std::pair<const TString, TS3FileWriteActor*>& item){ return free - item.second->GetMemoryUsed(); }); + return std::accumulate(FileWriteActors.cbegin(), FileWriteActors.cend(), i64(MemoryLimit), + [](i64 free, const std::pair<const TString, std::vector<TS3FileWriteActor*>>& item) { + return free - std::accumulate(item.second.cbegin(), item.second.cend(), i64(0), [](i64 sum, TS3FileWriteActor* actor) { return sum += actor->GetMemoryUsed(); }); + }); } TString MakeKey(const NUdf::TUnboxedValuePod v) const { @@ -336,18 +355,18 @@ private: void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final { for (const auto& v : data) { const auto& key = MakeKey(v); - const auto ins = FileWriteActors.emplace(key, nullptr); - if (ins.second) { - auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path + key + MakeSuffix()); - ins.first->second = fileWrite.get(); + const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>()); + if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) { + auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path + key + MakeSuffix(), MaxFileSize); + ins.first->second.emplace_back(fileWrite.get()); RegisterWithSameMailbox(fileWrite.release()); } - ins.first->second->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef())); + ins.first->second.back()->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef())); } if (finished) - std::for_each(FileWriteActors.cbegin(), FileWriteActors.cend(), [](const std::pair<const TString, TS3FileWriteActor*>& item){ item.second->Finish(); }); + std::for_each(FileWriteActors.cbegin(), FileWriteActors.cend(), [](const std::pair<const TString, std::vector<TS3FileWriteActor*>>& item){ item.second.back()->Finish(); }); data.clear(); } @@ -356,7 +375,14 @@ private: } void Handle(TEvPrivate::TEvUploadFinished::TPtr& result) { - FileWriteActors.erase(result->Get()->Key); + if (const auto it = FileWriteActors.find(result->Get()->Key); FileWriteActors.cend() != it) { + if (const auto ft = std::find_if(it->second.cbegin(), it->second.cend(), [&](TS3FileWriteActor* actor){ return result->Get()->Url == actor->GetUrl(); }); it->second.cend() != ft) { + it->second.erase(ft); + if (it->second.empty()) + FileWriteActors.erase(it); + } + } + if (FileWriteActors.empty()) Callbacks->OnAsyncOutputFinished(OutputIndex); } @@ -377,7 +403,10 @@ private: const TString Path; const std::vector<TString> Keys; - std::unordered_map<TString, TS3FileWriteActor*> FileWriteActors; + const size_t MemoryLimit; + const size_t MaxFileSize; + + std::unordered_map<TString, std::vector<TS3FileWriteActor*>> FileWriteActors; }; } // namespace @@ -396,7 +425,16 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( const auto token = secureParams.Value(params.GetToken(), TString{}); const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); const auto authToken = credentialsProviderFactory->CreateProvider(); - const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), credentialsProviderFactory->CreateProvider(), randomProvider, params.GetUrl(), params.GetPath(), std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()), callbacks); + const auto actor = new TS3WriteActor( + outputIndex, + std::move(gateway), + credentialsProviderFactory->CreateProvider(), + randomProvider, params.GetUrl(), + params.GetPath(), + std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()), + params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB, + params.HasMaxFileSize() ? params.GetMaxFileSize() : 50_MB, + callbacks); return {actor, actor}; } diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto index ab3ad50adfa..f18ca717e83 100644 --- a/ydb/library/yql/providers/s3/proto/sink.proto +++ b/ydb/library/yql/providers/s3/proto/sink.proto @@ -8,5 +8,7 @@ message TSink { string Token = 2; string Path = 3; repeated string Keys = 4; + optional string Compression = 5; + optional uint64 MemoryLimit = 6; + optional uint64 MaxFileSize = 7; } - diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 064d056b14f..428ec9bbb17 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -235,6 +235,12 @@ public: for (const auto& key : GetKeys(maySettings.Settings().Ref())) sinkDesc.MutableKeys()->Add(TString(key->Content())); + if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get()) + sinkDesc.SetMaxFileSize(*maxObjectSize); + + if (const auto& memoryLimit = State_->Configuration->InFlightMemoryLimit.Get()) + sinkDesc.SetMemoryLimit(*memoryLimit); + protoSettings.PackFrom(sinkDesc); sinkType = "S3Sink"; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index 993dd909253..0e20428dd40 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -13,6 +13,7 @@ TS3Configuration::TS3Configuration() REGISTER_SETTING(*this, UniqueKeysCountLimit); REGISTER_SETTING(*this, BlockSizeMemoryLimit); REGISTER_SETTING(*this, SerializeMemoryLimit); + REGISTER_SETTING(*this, InFlightMemoryLimit); } TS3Settings::TConstPtr TS3Configuration::Snapshot() const { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 8a557d15095..7e583367fb1 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -15,6 +15,7 @@ struct TS3Settings { NCommon::TConfSetting<ui64, false> UniqueKeysCountLimit; NCommon::TConfSetting<ui64, false> BlockSizeMemoryLimit; NCommon::TConfSetting<ui64, false> SerializeMemoryLimit; // Total serialization memory limit for all current blocks for all patition keys. Reachable in case of many but small partitions. + NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink. }; struct TS3ClusterSettings { |