diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-20 12:44:09 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-20 12:44:09 +0300 |
commit | 4aab304cf9fba378a52a0bcfc2b5a9663f2300cf (patch) | |
tree | df067052d8fbe065eee06b1ab5b9c9f26c18674a | |
parent | 4c24c91c583db70673b56f8a2b85f69e6e4fd8a0 (diff) | |
download | ydb-4aab304cf9fba378a52a0bcfc2b5a9663f2300cf.tar.gz |
S3 split on partitions on write. Draft.
7 files changed, 433 insertions, 130 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 24e3997679a..516cf0edefa 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -40,7 +40,10 @@ struct TEvPrivate { static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); // Events - struct TEvUploadFinished : public TEventLocal<TEvUploadFinished, EvUploadFinished> {}; + struct TEvUploadFinished : public TEventLocal<TEvUploadFinished, EvUploadFinished> { + explicit TEvUploadFinished(const TString& key) : Key(key) {} + const TString Key; + }; struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> { explicit TEvUploadError(TIssues&& error) : Error(std::move(error)) {} @@ -102,83 +105,84 @@ private: using namespace NKikimr::NMiniKQL; -class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComputeActorAsyncOutput { +class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> { public: - TS3WriteActor(ui64 outputIndex, + TS3FileWriteActor( IHTTPGateway::TPtr gateway, + NYdb::TCredentialsProviderPtr credProvider, + const TString& key, const TString& url, - const TString& token, - const TString& path, - IDqComputeActorAsyncOutput::ICallbacks* callbacks, const std::shared_ptr<NS3::TRetryConfig>& retryConfig) : Gateway(std::move(gateway)) - , OutputIndex(outputIndex) - , Callbacks(callbacks) + , CredProvider(std::move(credProvider)) , ActorSystem(TActivationContext::ActorSystem()) - , Url(url) - , Headers(MakeHeader(token)) - , Path(path) + , Key(key), Url(url + key) , RetryConfig(retryConfig) {} - void Bootstrap() { - Become(&TS3WriteActor::InitialStateFunc); - Gateway->Upload(Url + Path + "?uploads", Headers, "", std::bind(&TS3WriteActor::OnUploadsCreated, ActorSystem, SelfId(), std::placeholders::_1), false); + void Bootstrap(const TActorId& parentId) { + ParentId = parentId; + Become(&TS3FileWriteActor::InitialStateFunc); + Gateway->Upload(Url + "?uploads", MakeHeader(), "", std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false); } - static constexpr char ActorName[] = "S3_WRITE_ACTOR"; -private: - void CommitState(const NDqProto::TCheckpoint&) final {}; - void LoadState(const NDqProto::TSinkState&) final {}; - ui64 GetOutputIndex() const final { return OutputIndex; } - i64 GetFreeSpace() const final { - return 1_GB - InFlight - InQueue; + static constexpr char ActorName[] = "S3_FILE_WRITE_ACTOR"; + + void SendData(TString&& data) { + InQueue += data.size(); + Parts.emplace(std::move(data)); + if (!UploadId.empty()) + StartUploadParts(); } + void Finish() { + InputFinished = true; + if (!InFlight && Parts.empty()) + CommitUploadedParts(); + } + + i64 GetMemoryUsed() const { + return InFlight + InQueue; + } +private: STRICT_STFUNC(InitialStateFunc, - hFunc(TEvPrivate::TEvUploadError, Handle); hFunc(TEvPrivate::TEvUploadStarted, Handle); ) STRICT_STFUNC(WorkingStateFunc, - hFunc(TEvPrivate::TEvUploadError, Handle); hFunc(TEvPrivate::TEvUploadPartFinished, Handle); ) - STRICT_STFUNC(FinalStateFunc, - hFunc(TEvPrivate::TEvUploadError, Handle); - cFunc(TEvPrivate::EvUploadFinished, HandleFinished); - ) - static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) { + static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, IHTTPGateway::TResult&& result) { switch (result.index()) { case 0U: try { const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String); if (const auto& root = xml.Root(); root.Name() == "Error") { const auto& code = root.Node("Code", true).Value<TString>(); const auto& message = root.Node("Message", true).Value<TString>(); - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)}))); + actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)}))); } else if (root.Name() != "InitiateMultipartUploadResult") - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")}))); + actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")}))); else { const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>()))); } break; } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")}))); + actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")}))); break; } case 1U: - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result))))); + actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result))))); break; default: - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response.")}))); + actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response.")}))); break; } } - static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, size_t size, size_t index, IHTTPGateway::TResult&& response) { + static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, IHTTPGateway::TResult&& response) { switch (response.index()) { case 0U: { const auto str = std::get<IHTTPGateway::TContent>(std::move(response)).Extract(); @@ -189,16 +193,16 @@ private: break; } } - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)}))); + actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)}))); break; } case 1U: - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(response))))); + actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(response))))); break; } } - static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) { + static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, const TString& key, IHTTPGateway::TResult&& result) { switch (result.index()) { case 0U: try { const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String); @@ -209,7 +213,7 @@ private: } else if (root.Name() != "CompleteMultipartUploadResult") actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")}))); else - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadFinished())); + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadFinished(key))); break; } catch (const std::exception& ex) { actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")}))); @@ -224,26 +228,9 @@ private: } } - void SendData(TUnboxedValueVector&& data, i64 size, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final { - InputFinished = finished; - for (const auto& v : data) - Parts.emplace(v.AsStringRef()); - data.clear(); - InQueue += size; - if (!UploadId.empty()) - StartUploadParts(); - } - - void Handle(TEvPrivate::TEvUploadError::TPtr& result) { - Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, true); - if (!UploadId.empty()) { - // TODO: Send delete. - } - } - void Handle(TEvPrivate::TEvUploadStarted::TPtr& result) { UploadId = result->Get()->UploadId; - Become(&TS3WriteActor::WorkingStateFunc); + Become(&TS3FileWriteActor::WorkingStateFunc); StartUploadParts(); } @@ -251,30 +238,13 @@ private: InFlight -= result->Get()->Size; Tags[result->Get()->Index] = std::move(result->Get()->ETag); - if (!InFlight && InputFinished && Parts.empty()) { - Become(&TS3WriteActor::FinalStateFunc); - TStringBuilder xml; - xml << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" << Endl; - xml << "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" << Endl; - size_t i = 0U; - for (const auto& tag : Tags) - xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl; - xml << "</CompleteMultipartUpload>" << Endl; - Gateway->Upload(Url + Path + "?uploadId=" + UploadId, Headers, xml, std::bind(&TS3WriteActor::OnUploadFinish, ActorSystem, SelfId(), std::placeholders::_1), false); - } - } - - void HandleFinished() { - return Callbacks->OnAsyncOutputFinished(OutputIndex); + if (!InFlight && InputFinished && Parts.empty()) + CommitUploadedParts(); } // IActor & IDqComputeActorAsyncOutput void PassAway() override { // Is called from Compute Actor - TActorBootstrapped<TS3WriteActor>::PassAway(); - } - - static IHTTPGateway::THeaders MakeHeader(const TString& token) { - return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token}; + TActorBootstrapped<TS3FileWriteActor>::PassAway(); } void StartUploadParts() { @@ -283,24 +253,41 @@ private: const auto index = Tags.size(); Tags.emplace_back(); InFlight += size; - Gateway->Upload(Url + Path + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, Headers, std::move(Parts.front()), std::bind(&TS3WriteActor::OnPartUploadFinish, ActorSystem, SelfId(), size, index, std::placeholders::_1), true); + Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true); } } + void CommitUploadedParts() { + Become(nullptr); + TStringBuilder xml; + xml << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" << Endl; + xml << "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" << Endl; + size_t i = 0U; + for (const auto& tag : Tags) + xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl; + xml << "</CompleteMultipartUpload>" << Endl; + Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, ParentId, Key, std::placeholders::_1), false); + } + + IHTTPGateway::THeaders MakeHeader() const { + if (const auto& token = CredProvider->GetAuthInfo(); token.empty()) + return IHTTPGateway::THeaders(); + else + return IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token}; + } + bool InputFinished = false; size_t InQueue = 0ULL; size_t InFlight = 0ULL; const IHTTPGateway::TPtr Gateway; - - const ui64 OutputIndex; - IDqComputeActorAsyncOutput::ICallbacks *const Callbacks; + const NYdb::TCredentialsProviderPtr CredProvider; TActorSystem* const ActorSystem; + TActorId ParentId; + const TString Key; const TString Url; - const IHTTPGateway::THeaders Headers; - const TString Path; std::queue<TString> Parts; std::vector<TString> Tags; @@ -311,6 +298,107 @@ private: TString UploadId; }; +class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComputeActorAsyncOutput { +public: + TS3WriteActor(ui64 outputIndex, + IHTTPGateway::TPtr gateway, + NYdb::TCredentialsProviderPtr credProvider, + const TString& url, + const TString& path, + const ui32 keyWidth, + IDqComputeActorAsyncOutput::ICallbacks* callbacks, + const std::shared_ptr<NS3::TRetryConfig>& retryConfig) + : Gateway(std::move(gateway)) + , CredProvider(std::move(credProvider)) + , OutputIndex(outputIndex) + , Callbacks(callbacks) + , Url(url) + , Path(path) + , KeyWidth(keyWidth) + , RetryConfig(retryConfig) + {} + + void Bootstrap() { + Become(&TS3WriteActor::StateFunc); + } + + static constexpr char ActorName[] = "S3_WRITE_ACTOR"; +private: + void CommitState(const NDqProto::TCheckpoint&) final {}; + void LoadState(const NDqProto::TSinkState&) final {}; + ui64 GetOutputIndex() const final { return OutputIndex; } + i64 GetFreeSpace() const final { + return std::accumulate(FileWriteActors.cbegin(), FileWriteActors.cend(), i64(1_GB), [](i64 free, const std::pair<const TString, TS3FileWriteActor*>& item){ return free - item.second->GetMemoryUsed(); }); + } + + TString MakeKey(const NUdf::TUnboxedValuePod v) const { + if (!KeyWidth) + return {}; + + auto elements = v.GetElements(); + TStringBuilder key; + for (auto k = KeyWidth; k; --k) { + const std::string_view keyPart = (++elements)->AsStringRef(); + YQL_ENSURE(std::string_view::npos == keyPart.find('/'), "Invalid partition key, contains '/': " << keyPart); + key << keyPart << '/'; + } + return key; + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvUploadError, Handle); + hFunc(TEvPrivate::TEvUploadFinished, Handle); + ) + + void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final { + for (const auto& v : data) { + const auto& key = MakeKey(v); + const auto ins = FileWriteActors.emplace(key, nullptr); + if (ins.second) { + auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path, RetryConfig); + ins.first->second = fileWrite.get(); + RegisterWithSameMailbox(fileWrite.release()); + } + + ins.first->second->SendData(TString((KeyWidth > 0U ? *v.GetElements() : v).AsStringRef())); + } + + if (finished) + std::for_each(FileWriteActors.cbegin(), FileWriteActors.cend(), [](const std::pair<const TString, TS3FileWriteActor*>& item){ item.second->Finish(); }); + data.clear(); + } + + void Handle(TEvPrivate::TEvUploadError::TPtr& result) { + Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, true); + } + + void Handle(TEvPrivate::TEvUploadFinished::TPtr& result) { + FileWriteActors.erase(result->Get()->Key); + if (FileWriteActors.empty()) + Callbacks->OnAsyncOutputFinished(OutputIndex); + } + + // IActor & IDqComputeActorAsyncOutput + void PassAway() override { // Is called from Compute Actor + TActorBootstrapped<TS3WriteActor>::PassAway(); + } + + const IHTTPGateway::TPtr Gateway; + const NYdb::TCredentialsProviderPtr CredProvider; + + const ui64 OutputIndex; + IDqComputeActorAsyncOutput::ICallbacks *const Callbacks; + + const TString Url; + const TString Path; + const ui32 KeyWidth; + + std::vector<TRetryParams> RetriesPerPath; + const std::shared_ptr<NS3::TRetryConfig> RetryConfig; + + std::unordered_map<TString, TS3FileWriteActor*> FileWriteActors; +}; + } // namespace std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( @@ -326,9 +414,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( { const auto token = secureParams.Value(params.GetToken(), TString{}); const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); - const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); - - const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), params.GetUrl(), authToken, params.GetPath(), callbacks, retryConfig); + const auto authToken = credentialsProviderFactory->CreateProvider(); + const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), credentialsProviderFactory->CreateProvider(), params.GetUrl(), params.GetPath(), params.HasKeys() ? params.GetKeys() : 0U, callbacks, retryConfig); return {actor, actor}; } diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index 40be1652ccd..93abdf6b5b1 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -141,7 +141,8 @@ "Children": [ {"Index": 0, "Name": "Input", "Type": "TExprBase"}, {"Index": 1, "Name": "Format", "Type": "TCoAtom"}, - {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + {"Index": 2, "Name": "KeyColumns", "Type": "TCoAtomList"}, + {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} ] } ] diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto index bbc0405af3a..2b995a3cab2 100644 --- a/ydb/library/yql/providers/s3/proto/sink.proto +++ b/ydb/library/yql/providers/s3/proto/sink.proto @@ -7,5 +7,6 @@ message TSink { string Url = 1; string Token = 2; string Path = 3; + optional uint32 Keys = 4; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 9fe5d05bb7b..c67af6e61a9 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -92,7 +92,7 @@ private: } TStatus HandleOutput(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) { + if (!EnsureMinMaxArgsCount(*input, 3U, 4U, ctx)) { return TStatus::Error; } @@ -104,11 +104,40 @@ private: return TStatus::Error; } + if (!EnsureTupleOfAtoms(*input->Child(TS3SinkOutput::idx_KeyColumns), ctx)) { + return TStatus::Error; + } + if (input->ChildrenSize() > TS3SinkOutput::idx_Settings && !EnsureTuple(*input->Child(TS3SinkOutput::idx_Settings), ctx)) { return TStatus::Error; } - input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))); + if (const auto keysCount = input->Child(TS3SinkOutput::idx_KeyColumns)->ChildrenSize()) { + const auto source = input->Child(TS3SinkOutput::idx_Input); + const auto itemType = source->GetTypeAnn()->Cast<TFlowExprType>()->GetItemType(); + if (!EnsureStructType(source->Pos(), *itemType, ctx)) { + return TStatus::Error; + } + + const auto structType = itemType->Cast<TStructExprType>(); + for (auto i = 0U; i < keysCount; ++i) { + const auto key = input->Child(TS3SinkOutput::idx_KeyColumns)->Child(i); + if (const auto keyType = structType->FindItemType(key->Content())) { + if (!EnsureDataType(key->Pos(), *keyType, ctx)) { + return TStatus::Error; + } + } else { + ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), "Missed key column.")); + return TStatus::Error; + } + + TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType<TDataExprType>(EDataSlot::Utf8)); + itemTypes.front() = ctx.MakeType<TDataExprType>(EDataSlot::String); + input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TTupleExprType>(itemTypes))); + } + } else + input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))); + return TStatus::Ok; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 2f3b05faa6f..2343f79ea18 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -19,6 +19,15 @@ using namespace NNodes; namespace { +std::string_view GetKeys(const TExprNode& settings) { + for (auto i = 0U; i < settings.ChildrenSize(); ++i) { + const auto& child = *settings.Child(i); + if (child.Head().IsAtom("keys")) + return child.Tail().Content(); + } + return {}; +} + using namespace NYql::NS3Details; class TS3DqIntegration: public TDqIntegrationBase { @@ -221,6 +230,8 @@ public: sinkDesc.SetUrl(connect.Url); sinkDesc.SetToken(settings.Token().Name().StringValue()); sinkDesc.SetPath(settings.Path().StringValue()); + if (const auto& keys = GetKeys(maySettings.Settings().Ref()); !keys.empty()) + sinkDesc.SetKeys(::FromString<ui32>(keys)); protoSettings.PackFrom(sinkDesc); sinkType = "S3Sink"; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp index 8ffbd2be00d..3956e84a996 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <ydb/library/yql/providers/common/mkql/parser.h> +#include <library/cpp/json/json_writer.h> #include <util/stream/str.h> namespace NYql { @@ -12,11 +13,10 @@ using namespace NNodes; namespace { - TRuntimeNode BuildSerializeCall( TRuntimeNode input, + const std::vector<std::string_view>& keys, const std::string_view& format, - const std::string_view& /*compression*/, TType* inputType, NCommon::TMkqlBuildContext& ctx) { @@ -40,14 +40,62 @@ TRuntimeNode BuildSerializeCall( ); } - const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewStreamType(inputItemType)})}); - return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format), {ctx.ProgramBuilder.FromFlow(input)})); + TString settings; + if (!keys.empty()) { + const std::unordered_set<std::string_view> set(keys.cbegin(), keys.cend()); + const auto structType = AS_TYPE(TStructType, inputItemType); + MKQL_ENSURE(set.size() < structType->GetMembersCount(), "Expected non key columns."); + std::vector<std::pair<std::string_view, TType*>> types(structType->GetMembersCount()); + const auto keyType = ctx.ProgramBuilder.NewDataType(NUdf::TDataType<NUdf::TUtf8>::Id); + for (auto i = 0U; i < types.size(); ++i) { + const auto& name = structType->GetMemberName(i); + types[i].first = name; + types[i].second = set.contains(name) ? keyType : structType->GetMemberType(i); + } + + if (const auto newStructType = static_cast<TStructType*>(ctx.ProgramBuilder.NewStructType(types)); !newStructType->IsSameType(*structType)) { + input = ctx.ProgramBuilder.Map(input, + [&](TRuntimeNode item) { + std::vector<std::pair<std::string_view, TRuntimeNode>> members(types.size()); + for (auto i = 0U; i < members.size(); ++i) { + const auto& name = members[i].first = types[i].first; + members[i].second = ctx.ProgramBuilder.Member(item, name); + if (const auto oldType = structType->GetMemberType(i); !newStructType->GetMemberType(i)->IsSameType(*oldType)) { + const auto dataType = AS_TYPE(TDataType, oldType); + members[i].second = dataType->GetSchemeType() == NUdf::TDataType<const char*>::Id ? + ctx.ProgramBuilder.StrictFromString(members[i].second, newStructType->GetMemberType(i)): + ctx.ProgramBuilder.ToString<true>(members[i].second); + } + } + + return ctx.ProgramBuilder.NewStruct(newStructType, members); + } + ); + } + + TStringOutput stream(settings); + NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig()); + writer.OpenMap(); + writer.WriteKey("keys"); + writer.OpenArray(); + std::for_each(keys.cbegin(), keys.cend(), [&writer](const std::string_view& key){ writer.Write(key); }); + writer.CloseArray(); + writer.CloseMap(); + writer.Flush(); + } + + input = ctx.ProgramBuilder.FromFlow(input); + const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({input.GetStaticType()})}); + return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format + settings), {input})); } TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, NCommon::TMkqlBuildContext& ctx) { const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx); const auto inputItemType = NCommon::BuildType(wrapper.Input().Ref(), *wrapper.Input().Ref().GetTypeAnn(), ctx.ProgramBuilder); - return BuildSerializeCall(input, wrapper.Format().Value(), "TODO", inputItemType, ctx); + std::vector<std::string_view> keys; + keys.reserve(wrapper.KeyColumns().Size()); + wrapper.KeyColumns().Ref().ForEachChild([&](const TExprNode& key){ keys.emplace_back(key.Content()); }); + return BuildSerializeCall(input, keys, wrapper.Format().Value(), inputItemType, ctx); } } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp index 6718f26061d..384e828ea45 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -15,6 +15,18 @@ namespace { using namespace NNodes; using namespace NDq; +TExprNode::TListType GetPartitionBy(const TExprNode& settings) { + for (auto i = 0U; i < settings.ChildrenSize(); ++i) { + if (settings.Child(i)->Head().IsAtom("partitionby")) { + auto children = settings.Child(i)->ChildrenList(); + children.erase(children.cbegin()); + return children; + } + } + + return {}; +} + class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { public: explicit TS3PhysicalOptProposalTransformer(TS3State::TPtr state) @@ -24,74 +36,188 @@ public: #define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TS3PhysicalOptProposalTransformer::name) AddHandler(0, &TS3WriteObject::Match, HNDL(S3WriteObject)); #undef HNDL - - SetGlobal(0); // Stage 0 of this optimizer is global => we can remap nodes. } - TMaybeNode<TExprBase> S3WriteObject(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const { - auto write = node.Cast<TS3WriteObject>(); - if (!TDqCnUnionAll::Match(write.Input().Raw())) { // => this code is not for RTMR mode. - return node; - } - + TMaybeNode<TExprBase> S3WriteObject(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { + const auto& write = node.Cast<TS3WriteObject>(); const auto& targetNode = write.Target(); const auto& cluster = write.DataSink().Cluster().StringValue(); + const auto token = "cluster:default_" + cluster; + const auto keys = GetPartitionBy(write.Target().Settings().Ref()); + + if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) { + YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink."; + return keys.empty() ? + Build<TDqQuery>(ctx, write.Pos()) + .World(write.World()) + .SinkStages() + .Add<TDqStage>() + .Inputs().Build() + .Program<TCoLambda>() + .Args({}) + .Body<TS3SinkOutput>() + .Input<TCoToFlow>() + .Input(write.Input()) + .Build() + .Format(write.Target().Format()) + .KeyColumns().Build() + .Build() + .Build() + .Outputs<TDqStageOutputsList>() + .Add<TDqSink>() + .DataSink(write.DataSink()) + .Index().Value("0").Build() + .Settings<TS3SinkSettings>() + .Path(write.Target().Path()) + .Settings<TCoNameValueTupleList>().Build() + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Build() + .Build() + .Done(): + Build<TDqQuery>(ctx, write.Pos()) + .World(write.World()) + .SinkStages() + .Add<TDqStage>() + .Inputs() + .Add<TDqCnHashShuffle>() + .Output<TDqOutput>() + .Stage<TDqStage>() + .Inputs().Build() + .Program<TCoLambda>() + .Args({}) + .Body<TCoToFlow>() + .Input(write.Input()) + .Build() + .Build() + .Settings().Build() + .Build() + .Index().Value("0", TNodeFlags::Default).Build() + .Build() + .KeyColumns().Add(keys).Build() + .Build() + .Build() + .Program<TCoLambda>() + .Args({"in"}) + .Body<TS3SinkOutput>() + .Input("in") + .Format(write.Target().Format()) + .KeyColumns().Add(keys).Build() + .Build() + .Build() + .Outputs<TDqStageOutputsList>() + .Add<TDqSink>() + .DataSink(write.DataSink()) + .Index().Value("0", TNodeFlags::Default).Build() + .Settings<TS3SinkSettings>() + .Path(write.Target().Path()) + .Settings<TCoNameValueTupleList>() + .Add() + .Name().Build("keys", TNodeFlags::Default) + .Value<TCoAtom>().Build(ToString(keys.size()), TNodeFlags::Default) + .Build() + .Build() + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Build() + .Build() + .Done(); + } + + if (!TDqCnUnionAll::Match(write.Input().Raw())) { + return node; + } const TParentsMap* parentsMap = getParents(); - auto dqUnion = write.Input().Cast<TDqCnUnionAll>(); + const auto dqUnion = write.Input().Cast<TDqCnUnionAll>(); if (!NDq::IsSingleConsumerConnection(dqUnion, *parentsMap)) { return node; } + YQL_CLOG(INFO, ProviderS3) << "Rewrite S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as sink."; - YQL_CLOG(INFO, ProviderS3) << "Optimize S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "`"; - - const auto token = "cluster:default_" + cluster; + const auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>(); - auto dqSink = Build<TDqSink>(ctx, write.Pos()) + const auto sink = Build<TDqSink>(ctx, write.Pos()) .DataSink(write.DataSink()) .Index(dqUnion.Output().Index()) .Settings<TS3SinkSettings>() .Path(write.Target().Path()) - .Settings<TCoNameValueTupleList>().Build() + .Settings<TCoNameValueTupleList>() + .Add() + .Name().Build("keys", TNodeFlags::Default) + .Value<TCoAtom>().Build(ToString(keys.size()), TNodeFlags::Default) + .Build() + .Build() .Token<TCoSecureParam>() .Name().Build(token) .Build() .Build() .Done(); - auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>(); auto outputsBuilder = Build<TDqStageOutputsList>(ctx, targetNode.Pos()); - if (inputStage.Outputs()) { + if (inputStage.Outputs() && keys.empty()) { outputsBuilder.InitFrom(inputStage.Outputs().Cast()); } - outputsBuilder.Add(dqSink); - - auto dqStageWithSink = Build<TDqStage>(ctx, inputStage.Pos()) - .InitFrom(inputStage) - .Program<TCoLambda>() - .Args({"input"}) - .Body<TS3SinkOutput>() - .Input<TExprApplier>() - .Apply(inputStage.Program()).With(0, "input") + outputsBuilder.Add(sink); + + auto dqStageWithSink = keys.empty() ? + Build<TDqStage>(ctx, inputStage.Pos()) + .InitFrom(inputStage) + .Program<TCoLambda>() + .Args({"input"}) + .Body<TS3SinkOutput>() + .Input<TExprApplier>() + .Apply(inputStage.Program()).With(0, "input") + .Build() + .Format(write.Target().Format()) + .KeyColumns() + .Add(std::move(keys)) + .Build() .Build() - .Format(write.Target().Format()) .Build() - .Build() - .Outputs(outputsBuilder.Done()) - .Done(); - - auto dqQueryBuilder = Build<TDqQuery>(ctx, write.Pos()); - dqQueryBuilder.World(write.World()); - dqQueryBuilder.SinkStages().Add(dqStageWithSink).Build(); - - optCtx.RemapNode(inputStage.Ref(), dqStageWithSink.Ptr()); + .Outputs(outputsBuilder.Done()) + .Done(): + Build<TDqStage>(ctx, inputStage.Pos()) + .Inputs() + .Add<TDqCnHashShuffle>() + .Output<TDqOutput>() + .Stage(inputStage) + .Index(dqUnion.Output().Index()) + .Build() + .KeyColumns().Add(keys).Build() + .Build() + .Build() + .Program<TCoLambda>() + .Args({"in"}) + .Body<TS3SinkOutput>() + .Input("in") + .Format(write.Target().Format()) + .KeyColumns().Add(std::move(keys)).Build() + .Build() + .Build() + .Settings().Build() + .Outputs(outputsBuilder.Done()) + .Done(); - return dqQueryBuilder.Done(); + return Build<TDqQuery>(ctx, write.Pos()) + .World(write.World()) + .SinkStages().Add(dqStageWithSink).Build() + .Done(); } private: - TS3State::TPtr State_; + const TS3State::TPtr State_; }; } // namespace |